from __future__ import annotations
from typing import Any, Dict, List, Optional, Set, Union
import numpy as np
import pandas as pd
from autogluon.common.features.infer_types import get_type_group_map_special, get_type_map_raw
from autogluon.common.features.types import R_BOOL, R_CATEGORY, R_FLOAT, R_INT, R_OBJECT
from ..state import AnalysisState
from .base import AbstractAnalysis
from autogluon.core.constants import (
    BINARY,
    MULTICLASS,
    PROBLEM_TYPES_CLASSIFICATION,
    PROBLEM_TYPES_REGRESSION,
    REGRESSION,
)
from autogluon.core.utils import generate_train_test_split_combined, infer_problem_type

__all__ = [
    "DatasetSummary",
    "LabelInsightsAnalysis",
    "ProblemTypeControl",
    "RawTypesAnalysis",
    "Sampler",
    "SpecialTypesAnalysis",
    "TrainValidationSplit",
    "VariableTypeAnalysis",
]
class Sampler(AbstractAnalysis):
"""
Sampler is a wrapper that provides sampling capabilities for the wrapped analyses.
    Sampling is applied to all datasets in `args`; the sampled datasets are passed to all `children` during the `fit` call, shadowing the outer parameters.
Parameters
----------
    sample: Union[None, int, float], default = None
        sample size; if `int`, an absolute number of rows is sampled;
        if `float`, it must be within (0.0, 1.0) and represents the fraction of the dataset to sample;
        `None` means no sampling
parent: Optional[AbstractAnalysis], default = None
parent Analysis
    children: Optional[List[AbstractAnalysis]], default = None
        wrapped analyses; these will receive the sampled `args` during the `fit` call
Examples
--------
>>> from autogluon.eda.analysis.base import BaseAnalysis
>>> from autogluon.eda.analysis import Sampler
>>> import pandas as pd
>>> import numpy as np
>>>
>>> df_train = pd.DataFrame(np.random.randint(0, 100, size=(10, 4)), columns=list('ABCD'))
>>> df_test = pd.DataFrame(np.random.randint(0, 100, size=(20, 4)), columns=list('EFGH'))
>>> analysis = BaseAnalysis(train_data=df_train, test_data=df_test, children=[
>>> Sampler(sample=5, children=[
>>> # Analysis here will be performed on a sample of 5 for both train_data and test_data
>>> ])
>>> ])
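    >>>
    >>> # Fraction-based sampling is also supported (a minimal sketch): a `float` within
    >>> # (0.0, 1.0) samples that fraction of rows from each dataset.
    >>> analysis_frac = BaseAnalysis(train_data=df_train, test_data=df_test, children=[
    >>>     Sampler(sample=0.5, children=[]),
    >>> ])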
"""
def __init__(
self,
sample: Union[None, int, float] = None,
parent: Optional[AbstractAnalysis] = None,
children: Optional[List[AbstractAnalysis]] = None,
**kwargs,
) -> None:
super().__init__(parent, children, **kwargs)
        if isinstance(sample, float):
            assert 0.0 < sample < 1.0, "sample must be within the range (0.0, 1.0)"
        self.sample = sample
def can_handle(self, state: AnalysisState, args: AnalysisState) -> bool:
return True
    def _fit(self, state: AnalysisState, args: AnalysisState, **fit_kwargs) -> None:
        if self.sample is not None:
            for ds, df in self.available_datasets(args):
                # an `int` sample is an absolute number of rows; a `float` is a fraction of rows
                arg = "frac" if isinstance(self.sample, float) else "n"
                if len(df) > self.sample:
                    df = df.sample(**{arg: self.sample}, random_state=0)
                    if state.sample_size is None:
                        state.sample_size = {}
                    state.sample_size[ds] = self.sample
                self.args[ds] = df
class ProblemTypeControl(AbstractAnalysis):
"""
    Helper component to control the problem type. Auto-detects it when `problem_type='auto'`.
Parameters
----------
problem_type: str, default = 'auto'
        problem type to use. Valid `problem_type` values include ['auto', 'binary', 'multiclass', 'regression', 'quantile', 'softclass'].
        `'auto'` means the problem type will be auto-detected from the label using AutoGluon's inference logic.
parent: Optional[AbstractAnalysis], default = None
parent Analysis
    children: Optional[List[AbstractAnalysis]], default = None
        wrapped analyses; these will receive `args` during the `fit` call
kwargs
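    Examples
    --------
    >>> # A minimal sketch: auto-detect the problem type from the label column and
    >>> # expose it to downstream components via `state.problem_type`.
    >>> import autogluon.eda.analysis as eda
    >>> import autogluon.eda.auto as auto
    >>> state = auto.analyze(
    >>>     train_data=..., label=..., return_state=True,
    >>>     anlz_facets=[
    >>>         eda.dataset.ProblemTypeControl(problem_type="auto"),
    >>>     ],
    >>> )
    >>> state.problem_type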
"""
def __init__(
self,
problem_type: str = "auto",
parent: Optional[AbstractAnalysis] = None,
children: Optional[List[AbstractAnalysis]] = None,
**kwargs,
) -> None:
super().__init__(parent, children, **kwargs)
valid_problem_types = ["auto"] + PROBLEM_TYPES_REGRESSION + PROBLEM_TYPES_CLASSIFICATION
assert problem_type in valid_problem_types, f"Valid problem_type values include {valid_problem_types}"
self.problem_type = problem_type
def can_handle(self, state: AnalysisState, args: AnalysisState) -> bool:
return self.all_keys_must_be_present(args, "train_data", "label")
def _fit(self, state: AnalysisState, args: AnalysisState, **fit_kwargs) -> None:
if self.problem_type == "auto":
state.problem_type = infer_problem_type(args.train_data[args.label], silent=True)
else:
state.problem_type = self.problem_type
class TrainValidationSplit(AbstractAnalysis):
"""
This wrapper splits `train_data` into training and validation sets stored in `train_data` and `val_data` for the wrapped analyses.
    The split is performed on `train_data` from `args`; the resulting datasets are passed to all `children` during the `fit` call, shadowing the outer parameters.
    This component requires :py:class:`~autogluon.eda.analysis.dataset.ProblemTypeControl` to be present in the analysis call to set `problem_type`.
Parameters
----------
val_size: float, default = 0.3
fraction of training set to be assigned as validation set during the split.
parent: Optional[AbstractAnalysis], default = None
parent Analysis
    children: Optional[List[AbstractAnalysis]], default = None
        wrapped analyses; these will receive the split `args` (with `train_data` and `val_data`) during the `fit` call
kwargs
Examples
--------
    >>> from autogluon.eda.analysis.base import BaseAnalysis
    >>> from autogluon.eda.analysis import Namespace, ProblemTypeControl, TrainValidationSplit
>>> import pandas as pd
>>> import numpy as np
>>>
>>> df_train = pd.DataFrame(np.random.randint(0, 100, size=(100, 4)), columns=list("ABCD"))
    >>> analysis = BaseAnalysis(train_data=df_train, label="D", children=[
    >>>     Namespace(namespace="ns_val_split_specified", children=[
    >>>         ProblemTypeControl(),
    >>>         TrainValidationSplit(val_size=0.4, children=[
    >>>             # This analysis sees a 60/40 split of df_train between train_data and val_data
    >>>             SomeAnalysis()
    >>>         ])
    >>>     ]),
    >>>     Namespace(namespace="ns_val_split_default", children=[
    >>>         ProblemTypeControl(),
    >>>         TrainValidationSplit(children=[
    >>>             # This analysis sees a 70/30 split (default) of df_train between train_data and val_data
    >>>             SomeAnalysis()
    >>>         ])
    >>>     ]),
    >>>     Namespace(namespace="ns_no_split", children=[
    >>>         # This analysis sees only the original train_data
    >>>         SomeAnalysis()
    >>>     ]),
    >>>     ],
    >>> )
>>>
>>> state = analysis.fit()
>>>
See Also
--------
    :py:class:`~autogluon.eda.analysis.dataset.ProblemTypeControl`
"""
def __init__(
self,
val_size: float = 0.3,
parent: Optional[AbstractAnalysis] = None,
children: Optional[List[AbstractAnalysis]] = None,
**kwargs,
) -> None:
super().__init__(parent, children, **kwargs)
assert 0 < val_size < 1.0, "val_size must be between 0 and 1"
self.val_size = val_size
def can_handle(self, state: AnalysisState, args: AnalysisState) -> bool:
return self.all_keys_must_be_present(state, "problem_type")
def _fit(self, state: AnalysisState, args: AnalysisState, **fit_kwargs) -> None:
train_data, val_data = generate_train_test_split_combined(
args.train_data, args.label, state.problem_type, test_size=self.val_size, **self.args
)
self.args["train_data"] = train_data
self.args["val_data"] = val_data
class DatasetSummary(AbstractAnalysis):
"""
    Generates a dataset summary including counts, number of unique values, most frequent values, dtypes and a seven-figure summary (mean/std/min/max/quartiles).
Examples
--------
>>> import autogluon.eda.analysis as eda
>>> import autogluon.eda.visualization as viz
>>> import autogluon.eda.auto as auto
>>> state = auto.analyze(
>>> train_data=..., label=..., return_state=True,
>>> anlz_facets=[
>>> eda.dataset.DatasetSummary(),
>>> ],
>>> viz_facets=[
>>> viz.dataset.DatasetStatistics()
>>> ]
>>> )
See Also
--------
:py:class:`~autogluon.eda.visualization.dataset.DatasetStatistics`
"""
def can_handle(self, state: AnalysisState, args: AnalysisState) -> bool:
return True
    def _fit(self, state: AnalysisState, args: AnalysisState, **fit_kwargs) -> None:
        s = {}
        for ds, df in self.available_datasets(args):
            summary = df.describe(include="all").T
            summary = summary.join(pd.DataFrame({"dtypes": df.dtypes}))
            # `describe` reports `unique` only for non-numeric columns; recompute it for all columns
            summary["unique"] = df.nunique()
            summary["count"] = summary["count"].astype(int)
            summary = summary.sort_index()
            s[ds] = summary.to_dict()
        state.dataset_stats = s
class RawTypesAnalysis(AbstractAnalysis):
"""
    Infers AutoGluon raw types for each column.
Examples
--------
>>> import autogluon.eda.analysis as eda
>>> import autogluon.eda.visualization as viz
>>> import autogluon.eda.auto as auto
>>> state = auto.analyze(
>>> train_data=..., label=..., return_state=True,
>>> anlz_facets=[
>>> eda.dataset.RawTypesAnalysis(),
>>> ],
>>> viz_facets=[
>>> viz.dataset.DatasetStatistics()
>>> ]
>>> )
See Also
--------
:py:class:`~autogluon.eda.visualization.dataset.DatasetStatistics`
"""
def can_handle(self, state: AnalysisState, args: AnalysisState) -> bool:
return True
def _fit(self, state: AnalysisState, args: AnalysisState, **fit_kwargs) -> None:
state.raw_type = {}
for ds, df in self.available_datasets(args):
state.raw_type[ds] = get_type_map_raw(df)
class VariableTypeAnalysis(AbstractAnalysis):
"""
    Infers variable types for each column: numeric vs. category.
    This analysis depends on :py:class:`~autogluon.eda.analysis.dataset.RawTypesAnalysis`.
Parameters
----------
    numeric_as_categorical_threshold: int, default = 20
        columns with at most this many unique values are treated as categorical, regardless of their raw type
parent: Optional[AbstractAnalysis], default = None
parent Analysis
    children: Optional[List[AbstractAnalysis]], default = None
        wrapped analyses; these will receive `args` during the `fit` call
Examples
--------
>>> import autogluon.eda.analysis as eda
>>> import autogluon.eda.visualization as viz
>>> import autogluon.eda.auto as auto
>>> state = auto.analyze(
>>> train_data=..., label=..., return_state=True,
>>> anlz_facets=[
>>> eda.dataset.RawTypesAnalysis(),
>>> eda.dataset.VariableTypeAnalysis(),
>>> ],
>>> viz_facets=[
>>> viz.dataset.DatasetStatistics()
>>> ]
>>> )
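    >>>
    >>> # The mapping rule itself can be probed directly (a minimal sketch; `df` below is
    >>> # illustrative): low-cardinality columns map to 'category' regardless of raw type.
    >>> import pandas as pd
    >>> df = pd.DataFrame({"a": range(100), "b": ["x", "y"] * 50})
    >>> eda.dataset.VariableTypeAnalysis.map_raw_type_to_feature_type("a", "int", df)
    'numeric'
    >>> eda.dataset.VariableTypeAnalysis.map_raw_type_to_feature_type("b", "object", df)
    'category'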
See Also
--------
:py:class:`~autogluon.eda.analysis.dataset.RawTypesAnalysis`
:py:class:`~autogluon.eda.visualization.dataset.DatasetStatistics`
"""
def __init__(
self,
        parent: Optional[AbstractAnalysis] = None,
children: Optional[List[AbstractAnalysis]] = None,
numeric_as_categorical_threshold: int = 20,
**kwargs,
) -> None:
super().__init__(parent, children, **kwargs)
self.numeric_as_categorical_threshold = numeric_as_categorical_threshold
def can_handle(self, state: AnalysisState, args: AnalysisState) -> bool:
return self.all_keys_must_be_present(state, "raw_type")
def _fit(self, state: AnalysisState, args: AnalysisState, **fit_kwargs) -> None:
state.variable_type = {}
for ds, df in self.available_datasets(args):
state.variable_type[ds] = {
c: self.map_raw_type_to_feature_type(c, t, df, self.numeric_as_categorical_threshold)
for c, t in state.raw_type[ds].items()
}
    @staticmethod
    def map_raw_type_to_feature_type(
        col: Optional[str], raw_type: str, df: pd.DataFrame, numeric_as_categorical_threshold: int = 20
    ) -> Optional[str]:
        if col is None:
            return None
        elif df[col].nunique() <= numeric_as_categorical_threshold:
            # low-cardinality columns are treated as categorical regardless of raw type
            return "category"
        elif raw_type in [R_INT, R_FLOAT]:
            return "numeric"
        elif raw_type in [R_OBJECT, R_CATEGORY, R_BOOL]:
            return "category"
        else:
            return None
class SpecialTypesAnalysis(AbstractAnalysis):
"""
    Infers AutoGluon special types for each column (e.g. text).
Examples
--------
>>> import autogluon.eda.analysis as eda
>>> import autogluon.eda.visualization as viz
>>> import autogluon.eda.auto as auto
>>> state = auto.analyze(
>>> train_data=..., label=..., return_state=True,
>>> anlz_facets=[
>>> eda.dataset.SpecialTypesAnalysis(),
>>> ],
>>> viz_facets=[
>>> viz.dataset.DatasetStatistics()
>>> ]
>>> )
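    >>>
    >>> # The helper can also be called directly on a DataFrame (a minimal sketch; `df` is
    >>> # a hypothetical DataFrame and the result depends on AutoGluon's type inference):
    >>> eda.dataset.SpecialTypesAnalysis.infer_special_types(df)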
See Also
--------
:py:class:`~autogluon.eda.visualization.dataset.DatasetStatistics`
"""
def can_handle(self, state: AnalysisState, args: AnalysisState) -> bool:
return True
def _fit(self, state: AnalysisState, args: AnalysisState, **fit_kwargs) -> None:
state.special_types = {}
for ds, df in self.available_datasets(args):
state.special_types[ds] = self.infer_special_types(df)
    @staticmethod
    def infer_special_types(ds: pd.DataFrame) -> Dict[str, str]:
        # collect the set of special types inferred for each column
        special_types: Dict[str, Set[str]] = {}
        for t, cols in get_type_group_map_special(ds).items():
            for col in cols:
                special_types.setdefault(col, set()).add(t)
        # render each column's special types as a sorted, comma-separated string
        result: Dict[str, str] = {}
        for col, types in special_types.items():
            result[col] = ", ".join(sorted(types))
        return result
class LabelInsightsAnalysis(AbstractAnalysis):
"""
    Analyzes the label column for insights:
    - classification: low-cardinality class detection
    - classification: classes present in test data, but not in the train data
    - regression: out-of-domain label detection
    Note: this analysis requires `problem_type` to be present in `state`.
    It can be detected/set via the :py:class:`~autogluon.eda.analysis.dataset.ProblemTypeControl` component.
Parameters
----------
    low_cardinality_classes_threshold: int, default = 50
        classes with fewer than this many instances in the training data are flagged as low-cardinality
    regression_ood_threshold: float, default = 0.01
        fraction of the training label range used as a margin; test labels outside the training range
        extended by this margin are flagged as out-of-domain (e.g. with training labels in `[0, 100]`
        and the default threshold, test labels outside `[-1, 101]` are flagged).
        This check exists because some algorithms can't extrapolate beyond the training data.
    class_imbalance_ratio_threshold: float, default = 0.4
        minority-to-majority class ratio below which the dataset is flagged as imbalanced
    parent: Optional[AbstractAnalysis], default = None
        parent Analysis
    children: Optional[List[AbstractAnalysis]], default = None
        wrapped analyses; these will receive `args` during the `fit` call
    state: AnalysisState, default = None
        state object to perform the check on
Examples
--------
>>> import autogluon.eda.analysis as eda
>>> import autogluon.eda.visualization as viz
>>> import autogluon.eda.auto as auto
    >>> auto.analyze(train_data=..., test_data=..., label=..., anlz_facets=[
    >>>     eda.dataset.ProblemTypeControl(),
    >>>     eda.dataset.LabelInsightsAnalysis(low_cardinality_classes_threshold=50, regression_ood_threshold=0.01),
    >>> ], viz_facets=[
    >>>     viz.dataset.LabelInsightsVisualization()
    >>> ])
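    >>>
    >>> # When any insight is detected, it is stored under `state.label_insights`; per the
    >>> # implementation below, possible keys are 'low_cardinality_classes',
    >>> # 'minority_class_imbalance', 'not_present_in_train' and 'ood'.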
See Also
--------
:py:class:`~autogluon.eda.analysis.dataset.ProblemTypeControl`
:py:class:`~autogluon.eda.visualization.dataset.LabelInsightsVisualization`
"""
def __init__(
self,
low_cardinality_classes_threshold: int = 50,
regression_ood_threshold: float = 0.01,
class_imbalance_ratio_threshold: float = 0.4,
parent: Optional[AbstractAnalysis] = None,
children: Optional[List[AbstractAnalysis]] = None,
state: Optional[AnalysisState] = None,
**kwargs,
) -> None:
super().__init__(parent, children, state, **kwargs)
        assert low_cardinality_classes_threshold > 0, "low_cardinality_classes_threshold must be greater than 0"
        self.low_cardinality_classes_threshold = low_cardinality_classes_threshold
        assert 0 < class_imbalance_ratio_threshold < 1, "class_imbalance_ratio_threshold must be between 0 and 1"
        self.class_imbalance_ratio_threshold = class_imbalance_ratio_threshold
        assert 0 < regression_ood_threshold < 1, "regression_ood_threshold must be between 0 and 1"
        self.regression_ood_threshold = regression_ood_threshold
def can_handle(self, state: AnalysisState, args: AnalysisState) -> bool:
return self.all_keys_must_be_present(args, "train_data", "label") and self.all_keys_must_be_present(
state, "problem_type"
)
def _fit(self, state: AnalysisState, args: AnalysisState, **fit_kwargs) -> None:
label = args.label
train_data = args.train_data
s: Dict[str, Any] = {}
        if state.problem_type in [BINARY, MULTICLASS]:
            label_counts = train_data[label].value_counts()
            minority_class = label_counts.idxmin()
            majority_class = label_counts.idxmax()
            minority_class_imbalance_ratio = label_counts.min() / label_counts.max()
            # Low-cardinality class detection
            low_cardinality_classes = label_counts[label_counts < self.low_cardinality_classes_threshold].to_dict()
            if len(low_cardinality_classes) > 0:
                s["low_cardinality_classes"] = {
                    "instances": low_cardinality_classes,
                    "threshold": self.low_cardinality_classes_threshold,
                }
# Class imbalance detection
if minority_class_imbalance_ratio < self.class_imbalance_ratio_threshold:
s["minority_class_imbalance"] = {
"majority_class": majority_class,
"minority_class": minority_class,
"ratio": minority_class_imbalance_ratio,
}
            # Classes present in test data but missing from train data
            if self._test_data_with_label_present(args, label):
                train_labels = set(train_data[label].unique())
                test_labels = set(args.test_data[label].unique())
                missing_classes = test_labels.difference(train_labels)
                if len(missing_classes) > 0:
                    s["not_present_in_train"] = missing_classes
        elif (state.problem_type == REGRESSION) and self._test_data_with_label_present(args, label):
            # Out-of-domain range detection: flag test labels beyond the train label range, extended by a margin
            test_data = args.test_data
            label_min, label_max = np.min(train_data[label]), np.max(train_data[label])
            padding = np.abs(label_max - label_min) * self.regression_ood_threshold
            df_ood = test_data[
                (test_data[label] < label_min - padding) | (test_data[label] > label_max + padding)
            ]
if len(df_ood) > 0:
s["ood"] = {
"count": len(df_ood),
"train_range": [label_min, label_max],
"test_range": [np.min(test_data[label]), np.max(test_data[label])],
"threshold": self.regression_ood_threshold,
}
if len(s) > 0:
state.label_insights = s
def _test_data_with_label_present(self, args, label):
return (args.test_data is not None) and (label in args.test_data.columns)