Mercurial > repos > bgruening > sklearn_stacking_ensemble_models
diff iraps_classifier.py @ 0:47467890f541 draft
planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/tools/sklearn commit ab963ec9498bd05d2fb2f24f75adb2fccae7958c
| author | bgruening | 
|---|---|
| date | Wed, 15 May 2019 07:03:47 -0400 | 
| parents | |
| children | 
line wrap: on
 line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iraps_classifier.py Wed May 15 07:03:47 2019 -0400 @@ -0,0 +1,569 @@ +""" +class IRAPSCore +class IRAPSClassifier +class BinarizeTargetClassifier +class BinarizeTargetRegressor +class _BinarizeTargetScorer +class _BinarizeTargetProbaScorer + +binarize_auc_scorer +binarize_average_precision_scorer + +binarize_accuracy_scorer +binarize_balanced_accuracy_scorer +binarize_precision_scorer +binarize_recall_scorer +""" + + +import numpy as np +import random +import warnings + +from abc import ABCMeta +from scipy.stats import ttest_ind +from sklearn import metrics +from sklearn.base import BaseEstimator, clone, RegressorMixin +from sklearn.externals import six +from sklearn.feature_selection.univariate_selection import _BaseFilter +from sklearn.metrics.scorer import _BaseScorer +from sklearn.pipeline import Pipeline +from sklearn.utils import as_float_array, check_X_y +from sklearn.utils._joblib import Parallel, delayed +from sklearn.utils.validation import (check_array, check_is_fitted, + check_memory, column_or_1d) + + +VERSION = '0.1.1' + + +class IRAPSCore(six.with_metaclass(ABCMeta, BaseEstimator)): + """ + Base class of IRAPSClassifier + From sklearn BaseEstimator: + get_params() + set_params() + + Parameters + ---------- + n_iter : int + sample count + + positive_thres : float + z_score shreshold to discretize positive target values + + negative_thres : float + z_score threshold to discretize negative target values + + verbose : int + 0 or geater, if not 0, print progress + + n_jobs : int, default=1 + The number of CPUs to use to do the computation. + + pre_dispatch : int, or string. + Controls the number of jobs that get dispatched during parallel + execution. Reducing this number can be useful to avoid an + explosion of memory consumption when more jobs get dispatched + than CPUs can process. This parameter can be: + - None, in which case all the jobs are immediately + created and spawned. Use this for lightweight and + fast-running jobs, to avoid delays due to on-demand + spawning of the jobs + - An int, giving the exact number of total jobs that are + spawned + - A string, giving an expression as a function of n_jobs, + as in '2*n_jobs' + + random_state : int or None + """ + + def __init__(self, n_iter=1000, positive_thres=-1, negative_thres=0, + verbose=0, n_jobs=1, pre_dispatch='2*n_jobs', + random_state=None): + """ + IRAPS turns towwards general Anomaly Detection + It comapares positive_thres with negative_thres, + and decide which portion is the positive target. + e.g.: + (positive_thres=-1, negative_thres=0) + => positive = Z_score of target < -1 + (positive_thres=1, negative_thres=0) + => positive = Z_score of target > 1 + + Note: The positive targets here is always the + abnormal minority group. + """ + self.n_iter = n_iter + self.positive_thres = positive_thres + self.negative_thres = negative_thres + self.verbose = verbose + self.n_jobs = n_jobs + self.pre_dispatch = pre_dispatch + self.random_state = random_state + + def fit(self, X, y): + """ + X: array-like (n_samples x n_features) + y: 1-d array-like (n_samples) + """ + X, y = check_X_y(X, y, ['csr', 'csc'], multi_output=False) + + def _stochastic_sampling(X, y, random_state=None, positive_thres=-1, + negative_thres=0): + # each iteration select a random number of random subset of + # training samples. this is somewhat different from the original + # IRAPS method, but effect is almost the same. + SAMPLE_SIZE = [0.25, 0.75] + n_samples = X.shape[0] + + if random_state is None: + n_select = random.randint(int(n_samples * SAMPLE_SIZE[0]), + int(n_samples * SAMPLE_SIZE[1])) + index = random.sample(list(range(n_samples)), n_select) + else: + n_select = random.Random(random_state).randint( + int(n_samples * SAMPLE_SIZE[0]), + int(n_samples * SAMPLE_SIZE[1])) + index = random.Random(random_state).sample( + list(range(n_samples)), n_select) + + X_selected, y_selected = X[index], y[index] + + # Spliting by z_scores. + y_selected = (y_selected - y_selected.mean()) / y_selected.std() + if positive_thres < negative_thres: + X_selected_positive = X_selected[y_selected < positive_thres] + X_selected_negative = X_selected[y_selected > negative_thres] + else: + X_selected_positive = X_selected[y_selected > positive_thres] + X_selected_negative = X_selected[y_selected < negative_thres] + + # For every iteration, at least 5 responders are selected + if X_selected_positive.shape[0] < 5: + warnings.warn("Warning: fewer than 5 positives were selected!") + return + + # p_values + _, p = ttest_ind(X_selected_positive, X_selected_negative, + axis=0, equal_var=False) + + # fold_change == mean change? + # TODO implement other normalization method + positive_mean = X_selected_positive.mean(axis=0) + negative_mean = X_selected_negative.mean(axis=0) + mean_change = positive_mean - negative_mean + # mean_change = np.select( + # [positive_mean > negative_mean, + # positive_mean < negative_mean], + # [positive_mean / negative_mean, + # -negative_mean / positive_mean]) + # mean_change could be adjusted by power of 2 + # mean_change = 2**mean_change \ + # if mean_change>0 else -2**abs(mean_change) + + return p, mean_change, negative_mean + + parallel = Parallel(n_jobs=self.n_jobs, verbose=self.verbose, + pre_dispatch=self.pre_dispatch) + if self.random_state is None: + res = parallel(delayed(_stochastic_sampling)( + X, y, random_state=None, + positive_thres=self.positive_thres, + negative_thres=self.negative_thres) + for i in range(self.n_iter)) + else: + res = parallel(delayed(_stochastic_sampling)( + X, y, random_state=seed, + positive_thres=self.positive_thres, + negative_thres=self.negative_thres) + for seed in range(self.random_state, + self.random_state+self.n_iter)) + res = [_ for _ in res if _] + if len(res) < 50: + raise ValueError("too few (%d) valid feature lists " + "were generated!" % len(res)) + pvalues = np.vstack([x[0] for x in res]) + fold_changes = np.vstack([x[1] for x in res]) + base_values = np.vstack([x[2] for x in res]) + + self.pvalues_ = np.asarray(pvalues) + self.fold_changes_ = np.asarray(fold_changes) + self.base_values_ = np.asarray(base_values) + + return self + + +def _iraps_core_fit(iraps_core, X, y): + return iraps_core.fit(X, y) + + +class IRAPSClassifier(six.with_metaclass(ABCMeta, _BaseFilter, + BaseEstimator, RegressorMixin)): + """ + Extend the bases of both sklearn feature_selector and classifier. + From sklearn BaseEstimator: + get_params() + set_params() + From sklearn _BaseFilter: + get_support() + fit_transform(X) + transform(X) + From sklearn RegressorMixin: + score(X, y): R2 + New: + predict(X) + predict_label(X) + get_signature() + Properties: + discretize_value + + Parameters + ---------- + iraps_core: object + p_thres: float, threshold for p_values + fc_thres: float, threshold for fold change or mean difference + occurrence: float, occurrence rate selected by set of p_thres and fc_thres + discretize: float, threshold of z_score to discretize target value + memory: None, str or joblib.Memory object + min_signature_features: int, the mininum number of features in a signature + """ + + def __init__(self, iraps_core, p_thres=1e-4, fc_thres=0.1, + occurrence=0.8, discretize=-1, memory=None, + min_signature_features=1): + self.iraps_core = iraps_core + self.p_thres = p_thres + self.fc_thres = fc_thres + self.occurrence = occurrence + self.discretize = discretize + self.memory = memory + self.min_signature_features = min_signature_features + + def fit(self, X, y): + memory = check_memory(self.memory) + cached_fit = memory.cache(_iraps_core_fit) + iraps_core = clone(self.iraps_core) + # allow pre-fitted iraps_core here + if not hasattr(iraps_core, 'pvalues_'): + iraps_core = cached_fit(iraps_core, X, y) + self.iraps_core_ = iraps_core + + pvalues = as_float_array(iraps_core.pvalues_, copy=True) + # why np.nan is here? + pvalues[np.isnan(pvalues)] = np.finfo(pvalues.dtype).max + + fold_changes = as_float_array(iraps_core.fold_changes_, copy=True) + fold_changes[np.isnan(fold_changes)] = 0.0 + + base_values = as_float_array(iraps_core.base_values_, copy=True) + + p_thres = self.p_thres + fc_thres = self.fc_thres + occurrence = self.occurrence + + mask_0 = np.zeros(pvalues.shape, dtype=np.int32) + # mark p_values less than the threashold + mask_0[pvalues <= p_thres] = 1 + # mark fold_changes only when greater than the threashold + mask_0[abs(fold_changes) < fc_thres] = 0 + + # count the occurrence and mask greater than the threshold + counts = mask_0.sum(axis=0) + occurrence_thres = int(occurrence * iraps_core.n_iter) + mask = np.zeros(counts.shape, dtype=bool) + mask[counts >= occurrence_thres] = 1 + + # generate signature + fold_changes[mask_0 == 0] = 0.0 + signature = fold_changes[:, mask].sum(axis=0) / counts[mask] + signature = np.vstack((signature, base_values[:, mask].mean(axis=0))) + # It's not clearn whether min_size could impact prediction + # performance + if signature is None\ + or signature.shape[1] < self.min_signature_features: + raise ValueError("The classifier got None signature or the number " + "of sinature feature is less than minimum!") + + self.signature_ = np.asarray(signature) + self.mask_ = mask + # TODO: support other discretize method: fixed value, upper + # third quater, etc. + self.discretize_value = y.mean() + y.std() * self.discretize + if iraps_core.negative_thres > iraps_core.positive_thres: + self.less_is_positive = True + else: + self.less_is_positive = False + + return self + + def _get_support_mask(self): + """ + return mask of feature selection indices + """ + check_is_fitted(self, 'mask_') + + return self.mask_ + + def get_signature(self): + """ + return signature + """ + check_is_fitted(self, 'signature_') + + return self.signature_ + + def predict(self, X): + """ + compute the correlation coefficient with irpas signature + """ + signature = self.get_signature() + + X = as_float_array(X) + X_transformed = self.transform(X) - signature[1] + corrcoef = np.array( + [np.corrcoef(signature[0], e)[0][1] for e in X_transformed]) + corrcoef[np.isnan(corrcoef)] = np.finfo(np.float32).min + + return corrcoef + + def predict_label(self, X, clf_cutoff=0.4): + return self.predict(X) >= clf_cutoff + + +class BinarizeTargetClassifier(BaseEstimator, RegressorMixin): + """ + Convert continuous target to binary labels (True and False) + and apply a classification estimator. + + Parameters + ---------- + classifier: object + Estimator object such as derived from sklearn `ClassifierMixin`. + + z_score: float, default=-1.0 + Threshold value based on z_score. Will be ignored when + fixed_value is set + + value: float, default=None + Threshold value + + less_is_positive: boolean, default=True + When target is less the threshold value, it will be converted + to True, False otherwise. + + Attributes + ---------- + classifier_: object + Fitted classifier + + discretize_value: float + The threshold value used to discretize True and False targets + """ + + def __init__(self, classifier, z_score=-1, value=None, + less_is_positive=True): + self.classifier = classifier + self.z_score = z_score + self.value = value + self.less_is_positive = less_is_positive + + def fit(self, X, y, sample_weight=None): + """ + Convert y to True and False labels and then fit the classifier + with X and new y + + Returns + ------ + self: object + """ + y = check_array(y, accept_sparse=False, force_all_finite=True, + ensure_2d=False, dtype='numeric') + y = column_or_1d(y) + + if self.value is None: + discretize_value = y.mean() + y.std() * self.z_score + else: + discretize_value = self.Value + self.discretize_value = discretize_value + + if self.less_is_positive: + y_trans = y < discretize_value + else: + y_trans = y > discretize_value + + self.classifier_ = clone(self.classifier) + + if sample_weight is not None: + self.classifier_.fit(X, y_trans, sample_weight=sample_weight) + else: + self.classifier_.fit(X, y_trans) + + if hasattr(self.classifier_, 'feature_importances_'): + self.feature_importances_ = self.classifier_.feature_importances_ + if hasattr(self.classifier_, 'coef_'): + self.coef_ = self.classifier_.coef_ + if hasattr(self.classifier_, 'n_outputs_'): + self.n_outputs_ = self.classifier_.n_outputs_ + if hasattr(self.classifier_, 'n_features_'): + self.n_features_ = self.classifier_.n_features_ + + return self + + def predict(self, X): + """ + Predict class probabilities of X. + """ + check_is_fitted(self, 'classifier_') + proba = self.classifier_.predict_proba(X) + return proba[:, 1] + + def predict_label(self, X): + """Predict class label of X + """ + check_is_fitted(self, 'classifier_') + return self.classifier_.predict(X) + + +class _BinarizeTargetProbaScorer(_BaseScorer): + """ + base class to make binarized target specific scorer + """ + + def __call__(self, clf, X, y, sample_weight=None): + clf_name = clf.__class__.__name__ + # support pipeline object + if isinstance(clf, Pipeline): + main_estimator = clf.steps[-1][-1] + # support stacking ensemble estimators + # TODO support nested pipeline/stacking estimators + elif clf_name in ['StackingCVClassifier', 'StackingClassifier']: + main_estimator = clf.meta_clf_ + elif clf_name in ['StackingCVRegressor', 'StackingRegressor']: + main_estimator = clf.meta_regr_ + else: + main_estimator = clf + + discretize_value = main_estimator.discretize_value + less_is_positive = main_estimator.less_is_positive + + if less_is_positive: + y_trans = y < discretize_value + else: + y_trans = y > discretize_value + + y_pred = clf.predict(X) + if sample_weight is not None: + return self._sign * self._score_func(y_trans, y_pred, + sample_weight=sample_weight, + **self._kwargs) + else: + return self._sign * self._score_func(y_trans, y_pred, + **self._kwargs) + + +# roc_auc +binarize_auc_scorer =\ + _BinarizeTargetProbaScorer(metrics.roc_auc_score, 1, {}) + +# average_precision_scorer +binarize_average_precision_scorer =\ + _BinarizeTargetProbaScorer(metrics.average_precision_score, 1, {}) + +# roc_auc_scorer +iraps_auc_scorer = binarize_auc_scorer + +# average_precision_scorer +iraps_average_precision_scorer = binarize_average_precision_scorer + + +class BinarizeTargetRegressor(BaseEstimator, RegressorMixin): + """ + Extend regression estimator to have discretize_value + + Parameters + ---------- + regressor: object + Estimator object such as derived from sklearn `RegressionMixin`. + + z_score: float, default=-1.0 + Threshold value based on z_score. Will be ignored when + fixed_value is set + + value: float, default=None + Threshold value + + less_is_positive: boolean, default=True + When target is less the threshold value, it will be converted + to True, False otherwise. + + Attributes + ---------- + regressor_: object + Fitted regressor + + discretize_value: float + The threshold value used to discretize True and False targets + """ + + def __init__(self, regressor, z_score=-1, value=None, + less_is_positive=True): + self.regressor = regressor + self.z_score = z_score + self.value = value + self.less_is_positive = less_is_positive + + def fit(self, X, y, sample_weight=None): + """ + Calculate the discretize_value fit the regressor with traning data + + Returns + ------ + self: object + """ + y = check_array(y, accept_sparse=False, force_all_finite=True, + ensure_2d=False, dtype='numeric') + y = column_or_1d(y) + + if self.value is None: + discretize_value = y.mean() + y.std() * self.z_score + else: + discretize_value = self.Value + self.discretize_value = discretize_value + + self.regressor_ = clone(self.regressor) + + if sample_weight is not None: + self.regressor_.fit(X, y, sample_weight=sample_weight) + else: + self.regressor_.fit(X, y) + + # attach classifier attributes + if hasattr(self.regressor_, 'feature_importances_'): + self.feature_importances_ = self.regressor_.feature_importances_ + if hasattr(self.regressor_, 'coef_'): + self.coef_ = self.regressor_.coef_ + if hasattr(self.regressor_, 'n_outputs_'): + self.n_outputs_ = self.regressor_.n_outputs_ + if hasattr(self.regressor_, 'n_features_'): + self.n_features_ = self.regressor_.n_features_ + + return self + + def predict(self, X): + """Predict target value of X + """ + check_is_fitted(self, 'regressor_') + y_pred = self.regressor_.predict(X) + if not np.all((y_pred >= 0) & (y_pred <= 1)): + y_pred = (y_pred - y_pred.min()) / (y_pred.max() - y_pred.min()) + if self.less_is_positive: + y_pred = 1 - y_pred + return y_pred + + +# roc_auc_scorer +regression_auc_scorer = binarize_auc_scorer + +# average_precision_scorer +regression_average_precision_scorer = binarize_average_precision_scorer
