Source code for autogluon.eda.analysis.shift
import copy
from typing import Any, Dict, List, Optional, Union
import numpy as np
import pandas as pd
from autogluon.core.constants import BINARY
from autogluon.core.metrics import BINARY_METRICS, roc_auc
from autogluon.core.utils import generate_train_test_split
from autogluon.tabular import TabularPredictor
from .. import AnalysisState
from ..state import StateCheckMixin
from .base import AbstractAnalysis
__all__ = ["XShiftDetector"]
class XShiftDetector(AbstractAnalysis, StateCheckMixin):
"""Detect a change in covariate (X) distribution between training and test, which we call XShift. It can tell you
if your training set is not representative of your test set distribution. This is done with a Classifier 2
Sample Test.
Parameters
----------
classifier_class : an AutoGluon predictor, such as autogluon.tabular.TabularPredictor (default)
        The predictor that will be fit on the training set and used to predict the test set
compute_fi : bool, default = True
        Set to True to compute feature importances; this can be computationally intensive
pvalue_thresh : float, default = 0.01
The threshold for the pvalue
    eval_metric : str, default = 'roc_auc'
        The metric used for the C2ST; it must be one of the binary metrics from autogluon.core.metrics
sample_label : str, default = 'i2vkyc0p64'
        The label internally used for the classifier 2 sample test; the only reason to change it is on the off chance
        that the default value is a column in the data.
classifier_kwargs : dict, default = {}
The kwargs passed to the classifier, a member of classifier_class
classifier_fit_kwargs : dict, default = {}
The kwargs passed to the classifier's `fit` call, a member of classifier_class
num_permutations: int, default = 1000
The number of permutations used for any permutation based method
    test_size_2st : float, default = 0.3
        The proportion of data held out as the test set in the train/test split of the 2-sample test
State attributes
----------------
    state.xshift_results: dict with the results of XShift detection, containing
        - 'detection_status': bool, True if a shift was detected
        - 'test_statistic': float, the C2ST statistic
        - 'pvalue': float, the p-value from the permutation test
        - 'pvalue_threshold': float, the decision p-value threshold
        - 'eval_metric': str, the name of the metric used for the C2ST
        - 'feature_importance': DataFrame, the feature importance dataframe, if computed
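
    Examples
    --------
    A minimal usage sketch: `train_df`/`test_df` are hypothetical pandas DataFrames with a `class` label column,
    and it is assumed that the base-class `fit` consumes the keyword arguments below as analysis args and returns
    the populated `AnalysisState`.

    >>> detector = XShiftDetector(train_data=train_df, test_data=test_df, label="class")  # doctest: +SKIP
    >>> state = detector.fit()  # doctest: +SKIP
    >>> state.xshift_results["detection_status"]  # doctest: +SKIP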
"""
def __init__(
self,
classifier_class: Any = TabularPredictor,
compute_fi: bool = True,
pvalue_thresh: float = 0.01,
eval_metric: str = "roc_auc",
sample_label: str = "i2vkyc0p64",
classifier_kwargs: Optional[dict] = None,
classifier_fit_kwargs: Optional[dict] = None,
num_permutations: int = 1000,
test_size_2st: float = 0.3,
parent: Union[None, AbstractAnalysis] = None,
children: Optional[List[AbstractAnalysis]] = None,
**kwargs,
) -> None:
super().__init__(parent, children, **kwargs)
if classifier_kwargs is None:
classifier_kwargs = {}
if classifier_fit_kwargs is None:
classifier_fit_kwargs = {}
self.classifier_kwargs = classifier_kwargs
self.classifier_fit_kwargs = classifier_fit_kwargs
self.classifier_class = classifier_class
self.compute_fi = compute_fi
named_metrics = BINARY_METRICS
assert eval_metric in named_metrics.keys(), (
"eval_metric must be one of [" + ", ".join(named_metrics.keys()) + "]"
)
self.eval_metric = named_metrics[eval_metric]
self.C2ST = Classifier2ST(
classifier_class,
sample_label=sample_label,
eval_metric=self.eval_metric,
compute_fi=compute_fi,
classifier_kwargs=classifier_kwargs,
test_size_2st=test_size_2st,
)
self.fi_scores = None
self.compute_fi = compute_fi
self.pvalue_thresh = pvalue_thresh
self.num_permutations = num_permutations
def can_handle(self, state: AnalysisState, args: AnalysisState) -> bool:
return self.all_keys_must_be_present(args, "train_data", "test_data")
def _fit(self, state: AnalysisState, args: AnalysisState, **fit_kwargs) -> None:
"""Fit method. `args` can contain
- 'train_data': pd.DataFrame, required
- 'test_data': pd.DataFrame, required
- 'label': str, optional
The Y variable that is to be predicted (if it appears in the train/test data then it will be removed)
"""
X = args["train_data"].copy()
X_test = args["test_data"].copy()
assert (
self.C2ST.sample_label not in X.columns
), f"your data columns contain {self.C2ST.sample_label} which is used internally"
if "label" in args:
label = args["label"]
if label in X.columns:
X = X.drop(columns=[label])
if label in X_test.columns:
X_test = X_test.drop(columns=[label])
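        # Fit the classifier two-sample test on the covariates only (the target label, if present, was dropped above)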
self.C2ST.fit((X, X_test), **self.classifier_fit_kwargs, **fit_kwargs)
# Feature importance
if self.C2ST.has_fi and self.compute_fi:
fi_scores = self.C2ST.feature_importance()
else:
fi_scores = None
pvalue = self.C2ST.pvalue(num_permutations=self.num_permutations)
state.xshift_results = {
"detection_status": bool(pvalue <= self.pvalue_thresh), # numpy.bool_ -> bool
"test_statistic": self.C2ST.test_stat,
"pvalue": pvalue,
"pvalue_threshold": self.pvalue_thresh,
"eval_metric": self.eval_metric.name,
"feature_importance": fi_scores,
}
def post_fit(func):
"""decorator for post-fit methods"""
def pff_wrapper(self, *args, **kwargs):
assert self._is_fit, f".fit needs to be called prior to .{func.__name__}"
return func(self, *args, **kwargs)
return pff_wrapper
class Classifier2ST:
"""A classifier 2 sample test, which tests for a difference between a source and target dataset. It fits a
classifier to predict if a sample is in the source and target dataset, then computes an evaluation metric on a
holdout which becomes the test statistic.
Parameters
----------
classifier_class : an AutoGluon predictor, such as autogluon.tabular.TabularPredictor
        The predictor (classifier) class used to distinguish the source from the target dataset; the predictor class
        needs to support binary classification
    sample_label : str, default = 'xshift_label'
        The label column used internally to mark whether a sample comes from the source or the target dataset
    eval_metric : callable, default = autogluon.core.metrics.roc_auc
        Binary classification metric to use for the classifier 2 sample test. The statistic is computed on the
        predicted probability of the positive class, so the metric must accept probability scores, such as roc_auc
compute_fi : bool, default = True
        Set to True to compute feature importances; this can be computationally intensive
split : float, default = 0.5
Training/test split proportion for classifier 2 sample test
classifier_kwargs : dict, default = {}
The kwargs passed to the classifier, a member of classifier_class
    test_size_2st : float, default = 0.3
        The proportion of data held out as the test set in the train/test split of the 2-sample test
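
    Examples
    --------
    A minimal sketch with hypothetical `source_df`/`target_df` DataFrames that share the same columns:

    >>> from autogluon.tabular import TabularPredictor
    >>> c2st = Classifier2ST(TabularPredictor)  # doctest: +SKIP
    >>> c2st.fit((source_df, target_df))  # doctest: +SKIP
    >>> c2st.test_stat, c2st.pvalue(num_permutations=1000)  # doctest: +SKIP
    >>> fi = c2st.feature_importance()  # doctest: +SKIP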
"""
def __init__(
self,
classifier_class,
sample_label="xshift_label",
eval_metric=roc_auc,
split=0.5,
compute_fi=True,
classifier_kwargs: Optional[Dict] = None,
test_size_2st=0.3,
):
if classifier_kwargs is None:
classifier_kwargs = {}
else:
classifier_kwargs = copy.deepcopy(classifier_kwargs)
classifier_kwargs.update({"label": sample_label, "eval_metric": eval_metric})
self.classifier = classifier_class(**classifier_kwargs)
self.classifier_class = classifier_class
self.split = split
self.sample_label = sample_label
self.eval_metric = eval_metric
self._is_fit = False
self._test = None
self.test_stat = None
self.has_fi: Optional[bool] = None
self.compute_fi = compute_fi
self.test_size_2st = test_size_2st
@staticmethod
def _make_source_target_label(data, sample_label):
"""Turn a source, target pair into a single dataframe with label column"""
source, target = data[0].copy(), data[1].copy()
source.loc[:, sample_label] = 0
target.loc[:, sample_label] = 1
data = pd.concat((source, target), ignore_index=True)
return data
def fit(self, data, **kwargs):
"""Fit the classifier for predicting if source or target and compute the 2-sample test statistic.
Parameters
----------
data : pd.DataFrame, or tuple
either
- a dataframe with a label column where 1 = target and 0 = source
- a tuple of source dataframe and target dataframe
"""
if isinstance(data, pd.DataFrame):
sample_label = self.sample_label
assert sample_label in data.columns, "sample_label needs to be a column of data"
assert self.split, "sample_label requires the split parameter"
data = data.copy() # makes a copy
else:
            assert len(data) == 2, "Data must be a tuple/list of (source, target) dataframes"
data = self._make_source_target_label(data, self.sample_label) # makes a copy
if data.index.has_duplicates:
data = data.reset_index(drop=True)
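        # Hold out part of the combined source/target data; the classifier is scored on this holdout
        # and that score becomes the two-sample test statistic.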
train, test, y_train, y_test = generate_train_test_split(
data.drop(columns=[self.sample_label]), data[self.sample_label], BINARY, test_size=self.test_size_2st
)
train[self.sample_label] = y_train
test[self.sample_label] = y_test
self.classifier.fit(train, **kwargs)
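        # Score the holdout: the predicted probability of belonging to the target sample (label 1) is fed to the metric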
yhat = self.classifier.predict_proba(test)[1]
self.test_stat = self.eval_metric(test[self.sample_label], yhat)
self.has_fi = getattr(self.classifier, "feature_importance", None) is not None
if self.has_fi and self.compute_fi:
self._test = test # for feature importance
self._is_fit = True
@post_fit
def _pvalue_half_permutation(self, num_permutations=1000):
"""The half permutation method for computing p-values.
See Section 9.2 of https://arxiv.org/pdf/1602.02210.pdf
"""
perm_stats = [self.test_stat]
yhat = self.classifier.predict_proba(self._test)[1]
for _ in range(num_permutations):
perm_yhat = np.random.permutation(yhat)
perm_test_stat = self.eval_metric(self._test[self.sample_label], perm_yhat) # type: ignore
perm_stats.append(perm_test_stat)
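        # p-value: fraction of statistics (observed included) that are at least as large as the observed statistic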
pval = (self.test_stat <= np.array(perm_stats)).mean()
return pval
@post_fit
def pvalue(self, num_permutations: int = 1000):
"""Compute the p-value which measures the significance level for the test statistic
Parameters
----------
num_permutations: int, default = 1000
The number of permutations used for any permutation based method
Returns
-------
float of the p-value for the 2-sample test
"""
pval = self._pvalue_half_permutation(num_permutations=num_permutations)
return pval
@post_fit
def feature_importance(self):
"""Returns the feature importances for the trained classifier for source v. target
Returns
-------
pd.DataFrame of feature importances
"""
assert self.has_fi, "Classifier class does not have feature_importance method"
assert self.compute_fi, "Set compute_fi to True to compute feature importances"
fi_scores = self.classifier.feature_importance(self._test)
return fi_scores