Source code for autogluon.tabular.models.fastainn.tabular_nn_fastai

import copy
import logging
import time
from builtins import classmethod
from pathlib import Path
from typing import Dict, Union

import numpy as np
import pandas as pd
import sklearn

from autogluon.common.features.types import (
    R_BOOL,
    R_CATEGORY,
    R_DATETIME,
    R_FLOAT,
    R_INT,
    R_OBJECT,
    S_TEXT_AS_CATEGORY,
    S_TEXT_NGRAM,
    S_TEXT_SPECIAL,
)
from autogluon.common.utils.pandas_utils import get_approximate_df_mem_usage
from autogluon.common.utils.resource_utils import ResourceManager
from autogluon.common.utils.try_import import try_import_fastai
from autogluon.core.constants import BINARY, QUANTILE, REGRESSION
from autogluon.core.hpo.constants import RAY_BACKEND
from autogluon.core.models import AbstractModel
from autogluon.core.utils.exceptions import TimeLimitExceeded
from autogluon.core.utils.files import make_temp_directory
from autogluon.core.utils.loaders import load_pkl
from autogluon.core.utils.savers import save_pkl

from .hyperparameters.parameters import get_param_baseline
from .hyperparameters.searchspaces import get_default_searchspace

# FIXME: Has a leak somewhere, training additional models in a single python script will slow down training for each additional model. Gets very slow after 20+ models (10x+ slowdown)
#  Slowdown does not appear to impact Mac OS
# Reproduced with raw torch: https://github.com/pytorch/pytorch/issues/31867
# https://forums.fast.ai/t/runtimeerror-received-0-items-of-ancdata/48935
# https://github.com/pytorch/pytorch/issues/973
# https://pytorch.org/docs/master/multiprocessing.html#file-system-file-system
# Slowdown bug not experienced on Linux if 'torch.multiprocessing.set_sharing_strategy('file_system')' commented out
# NOTE: If below line is commented out, Torch uses many file descriptors. If issues arise, increase ulimit through 'ulimit -n 2048' or larger. Default on Linux is 1024.
# torch.multiprocessing.set_sharing_strategy('file_system')

# MacOS issue: torchvision==0.7.0 + torch==1.6.0 can cause segfaults; use torch==1.2.0 torchvision==0.4.0

LABEL = "__label__"

logger = logging.getLogger(__name__)


# TODO: Takes extremely long time prior to training start if many (10000) continuous features from ngrams, debug - explore TruncateSVD option to reduce input dimensionality
# TODO: currently fastai automatically detect and use CUDA if available - add code to honor autogluon settings
[docs]class NNFastAiTabularModel(AbstractModel): """Class for fastai v1 neural network models that operate on tabular data. Hyperparameters: y_scaler: on a regression problems, the model can give unreasonable predictions on unseen data. This attribute allows to pass a scaler for y values to address this problem. Please note that intermediate iteration metrics will be affected by this transform and as a result intermediate iteration scores will be different from the final ones (these will be correct). https://scikit-learn.org/stable/modules/classes.html#module-sklearn.preprocessing 'layers': list of hidden layers sizes; None - use model's heuristics; default is None 'emb_drop': embedding layers dropout; default is 0.1 'ps': linear layers dropout - list of values applied to every layer in `layers`; default is [0.1] 'bs': batch size; default is 256 'lr': maximum learning rate for one cycle policy; default is 1e-2; see also https://docs.fast.ai/callback.schedule.html#Learner.fit_one_cycle, One-cycle policy paper: https://arxiv.org/abs/1803.09820 'epochs': number of epochs; default is 30 # Early stopping settings. See more details here: https://docs.fast.ai/callback.tracker.html#EarlyStoppingCallback 'early.stopping.min_delta': 0.0001, 'early.stopping.patience': 10, """ model_internals_file_name = "model-internals.pkl" def __init__(self, **kwargs): super().__init__(**kwargs) self.cat_columns = None self.cont_columns = None self.columns_fills = None self._columns_fills_names = None self.procs = None self.y_scaler = None self._cont_normalization = None self._load_model = None # Whether to load inner model when loading. self._num_cpus_infer = None def _preprocess_train(self, X, y, X_val, y_val): from fastai.data.block import CategoryBlock, RegressionBlock from fastai.data.transforms import IndexSplitter from fastai.tabular.core import TabularPandas from fastcore.basics import range_of X = self.preprocess(X, fit=True) if X_val is not None: X_val = self.preprocess(X_val) from fastai.tabular.core import Categorify self.procs = [Categorify] if self.problem_type in [REGRESSION, QUANTILE] and self.y_scaler is not None: y_norm = pd.Series(self.y_scaler.fit_transform(y.values.reshape(-1, 1)).reshape(-1)) y_val_norm = pd.Series(self.y_scaler.transform(y_val.values.reshape(-1, 1)).reshape(-1)) if y_val is not None else None logger.log(0, f"Training with scaled targets: {self.y_scaler} - !!! NN training metric will be different from the final results !!!") else: y_norm = y y_val_norm = y_val logger.log(15, f"Using {len(self.cont_columns)} cont features") df_train, train_idx, val_idx = self._generate_datasets(X, y_norm, X_val, y_val_norm) y_block = RegressionBlock() if self.problem_type in [REGRESSION, QUANTILE] else CategoryBlock() # Copy cat_columns and cont_columns because TabularList is mutating the list data = TabularPandas( df_train, cat_names=self.cat_columns.copy(), cont_names=self.cont_columns.copy(), procs=self.procs, y_block=y_block, y_names=LABEL, splits=IndexSplitter(val_idx)(range_of(df_train)), ) return data def _preprocess(self, X: pd.DataFrame, fit=False, **kwargs): X = super()._preprocess(X=X, **kwargs) if fit: self.cont_columns = self._feature_metadata.get_features(valid_raw_types=[R_INT, R_FLOAT, R_DATETIME]) self.cat_columns = self._feature_metadata.get_features(valid_raw_types=[R_OBJECT, R_CATEGORY, R_BOOL]) if self.cont_columns: self._cont_normalization = (np.array(X[self.cont_columns].mean()), np.array(X[self.cont_columns].std())) num_cat_cols_og = len(self.cat_columns) if self.cat_columns: try: X_stats = X[self.cat_columns].describe(include="all").T.reset_index() cat_cols_to_drop = list( X_stats[(X_stats["unique"] > self.params.get("max_unique_categorical_values", 10000)) | (X_stats["unique"].isna())]["index"].values ) except: cat_cols_to_drop = [] if len(cat_cols_to_drop) != 0: cat_cols_to_drop = set(cat_cols_to_drop) self.cat_columns = [col for col in self.cat_columns if (col not in cat_cols_to_drop)] num_cat_cols_use = len(self.cat_columns) logger.log(15, f"Using {num_cat_cols_use}/{num_cat_cols_og} categorical features") nullable_numeric_features = self._feature_metadata.get_features(valid_raw_types=[R_FLOAT, R_DATETIME], invalid_special_types=[S_TEXT_SPECIAL]) self.columns_fills = dict() self._columns_fills_names = nullable_numeric_features for c in self._columns_fills_names: # No need to do this for int features, int can't have null self.columns_fills[c] = X[c].mean() X = self._fill_missing(X) if self.cont_columns: cont_mean, cont_std = self._cont_normalization # Creating a new DataFrame is 10x+ faster than assigning results to X[self.cont_columns] X_cont = pd.DataFrame( (X[self.cont_columns].values - cont_mean) / cont_std, columns=self.cont_columns, index=X.index, ) if self.cat_columns: # Creating a new DataFrame via concatenation is faster than editing values in-place X = pd.concat([X_cont, X[self.cat_columns]], axis=1) else: X = X_cont.copy() return X def _fill_missing(self, df: pd.DataFrame) -> pd.DataFrame: # FIXME: Consider representing categories as int if self.columns_fills: # Speed up preprocessing by only filling columns where NaNs are present is_null = df[self._columns_fills_names].isnull().values.max(axis=0) columns_to_fill = [self._columns_fills_names[i] for i in range(len(is_null)) if is_null[i]] column_fills = {k: self.columns_fills[k] for k in columns_to_fill} if column_fills: # TODO: pandas==1.5.3 fillna is 10x+ slower than pandas==1.3.5 with large column count df = df.fillna(column_fills, inplace=False, downcast=False) else: df = df.copy() else: df = df.copy() return df def _fit(self, X, y, X_val=None, y_val=None, time_limit=None, num_cpus=None, num_gpus=0, sample_weight=None, **kwargs): try_import_fastai() import torch from fastai import torch_core from fastai.tabular.learner import tabular_learner from fastai.tabular.model import tabular_config from .callbacks import AgSaveModelCallback, EarlyStoppingCallbackWithTimeLimit from .quantile_helpers import HuberPinballLoss torch.set_num_threads(num_cpus) start_time = time.time() if sample_weight is not None: # TODO: support logger.log(15, "sample_weight not yet supported for NNFastAiTabularModel, this model will ignore them in training.") params = self._get_model_params() self._num_cpus_infer = params.pop("_num_cpus_infer", 1) self.y_scaler = params.get("y_scaler", None) if self.y_scaler is None: if self.problem_type == REGRESSION: self.y_scaler = sklearn.preprocessing.StandardScaler() elif self.problem_type == QUANTILE: self.y_scaler = sklearn.preprocessing.MinMaxScaler() else: self.y_scaler = copy.deepcopy(self.y_scaler) if num_gpus is not None: # TODO: Control CPU vs GPU usage during inference if num_gpus == 0: torch_core.default_device(False) else: # TODO: respect CUDA_VISIBLE_DEVICES to select proper GPU torch_core.default_device(True) logger.log(15, f"Fitting Neural Network with parameters {params}...") data = self._preprocess_train(X, y, X_val, y_val) nn_metric, objective_func_name = self.__get_objective_func_name(self.stopping_metric) objective_func_name_to_monitor = self.__get_objective_func_to_monitor(objective_func_name) objective_optim_mode = ( np.less if objective_func_name in [ "log_loss", "root_mean_squared_error", "mean_squared_error", "mean_absolute_error", "median_absolute_error", # Regression objectives "pinball_loss", # Quantile objective ] else np.greater ) # TODO: calculate max emb concat layer size and use 1st layer as that value and 2nd in between number of classes and the value if params.get("layers", None) is not None: layers = params["layers"] if isinstance(layers, tuple): layers = list(layers) elif self.problem_type in [REGRESSION, BINARY]: layers = [200, 100] elif self.problem_type == QUANTILE: base_size = max(len(self.quantile_levels) * 4, 128) layers = [base_size, base_size, base_size] else: base_size = max(data.c * 2, 100) layers = [base_size * 2, base_size] loss_func = None if self.problem_type == QUANTILE: loss_func = HuberPinballLoss(self.quantile_levels, alpha=self.params["alpha"]) best_epoch_stop = params.get("best_epoch", None) # Use best epoch for refit_full. batch_size = self._get_batch_size(X) dls = data.dataloaders(bs=batch_size) # Make deterministic from fastai.torch_core import set_seed set_seed(0, True) dls.rng.seed(0) if self.problem_type == QUANTILE: dls.c = len(self.quantile_levels) self.model = tabular_learner( dls, layers=layers, metrics=nn_metric, config=tabular_config(ps=params["ps"], embed_p=params["emb_drop"]), loss_func=loss_func, ) logger.log(15, self.model.model) fname = "model" save_callback = AgSaveModelCallback( monitor=objective_func_name_to_monitor, comp=objective_optim_mode, fname=fname, best_epoch_stop=best_epoch_stop, with_opt=True ) if time_limit is not None: time_elapsed = time.time() - start_time time_left = time_limit - time_elapsed if time_left <= time_limit * 0.7: # if 30% of time was spent preprocessing, likely not enough time to train model raise TimeLimitExceeded else: time_left = None early_stopping = EarlyStoppingCallbackWithTimeLimit( monitor=objective_func_name_to_monitor, comp=objective_optim_mode, min_delta=params["early.stopping.min_delta"], patience=params["early.stopping.patience"], time_limit=time_left, best_epoch_stop=best_epoch_stop, ) callbacks = [save_callback, early_stopping] with make_temp_directory() as temp_dir: with self.model.no_bar(): with self.model.no_logging(): original_path = self.model.path self.model.path = Path(temp_dir) len_val = len(X_val) if X_val is not None else 0 epochs = self._get_epochs_number(samples_num=len(X) + len_val, epochs=params["epochs"], batch_size=batch_size, time_left=time_left) if epochs == 0: # Stop early if there is not enough time to train a full epoch raise TimeLimitExceeded self.model.fit_one_cycle(epochs, params["lr"], cbs=callbacks) # Load the best one and export it self.model = self.model.load(fname) if objective_func_name == "log_loss": eval_result = self.model.validate(dl=dls.valid)[0] else: eval_result = self.model.validate(dl=dls.valid)[1] logger.log(15, f"Model validation metrics: {eval_result}") self.model.path = original_path self.params_trained["epochs"] = epochs self.params_trained["best_epoch"] = save_callback.best_epoch def _get_batch_size(self, X, default_batch_size_for_small_inputs=32): bs = self.params["bs"] if bs == "auto": bs = 512 if len(X) >= 200000 else 256 bs = bs if len(X) > bs else default_batch_size_for_small_inputs if self.params["bs"] == "auto": logger.log(15, f"Automated batch size selection: {bs}") return bs def _get_epochs_number(self, samples_num, epochs, batch_size, time_left=None, min_batches_count=30, default_epochs=30): if epochs == "auto": batches_count = int(samples_num / batch_size) + 1 if not time_left: return default_epochs elif batches_count < min_batches_count: return default_epochs else: est_batch_time = self._measure_batch_times(min_batches_count) est_epoch_time = batches_count * est_batch_time * 1.1 est_max_epochs = int(time_left / est_epoch_time) epochs = min(default_epochs, est_max_epochs) epochs = max(epochs, 0) logger.log( 15, f"Automated epochs selection: training for {epochs} epoch(s). Estimated time budget use {epochs * est_epoch_time:.2f} / {time_left:.2f} sec", ) return epochs def _measure_batch_times(self, min_batches_count): from fastai.callback.core import CancelFitException from .callbacks import BatchTimeTracker batch_time_tracker_callback = BatchTimeTracker(batches_to_measure=min_batches_count) try: with self.model.no_bar(): with self.model.no_logging(): self.model.fit(1, lr=0, cbs=[batch_time_tracker_callback]) except CancelFitException: pass # expected early exit batch_time = batch_time_tracker_callback.batch_measured_time return batch_time def _generate_datasets(self, X, y, X_val, y_val): df_train = pd.concat([X, X_val], ignore_index=True) df_train[LABEL] = pd.concat([y, y_val], ignore_index=True) train_idx = np.arange(len(X)) if X_val is None: # use validation set for refit_full case - it's not going to be used for early stopping val_idx = np.array([0, 1]) + len(train_idx) df_train = pd.concat([df_train, df_train[:2]], ignore_index=True) else: val_idx = np.arange(len(X_val)) + len(X) return df_train, train_idx, val_idx def __get_objective_func_name(self, stopping_metric): metrics_map = self.__get_metrics_map() # Unsupported metrics will be replaced by defaults for a given problem type objective_func_name = stopping_metric.name if objective_func_name not in metrics_map.keys(): if self.problem_type == REGRESSION: objective_func_name = "mean_squared_error" elif self.problem_type == QUANTILE: objective_func_name = "pinball_loss" else: objective_func_name = "log_loss" logger.warning(f"Metric {stopping_metric.name} is not supported by this model - using {objective_func_name} instead") nn_metric = metrics_map.get(objective_func_name, None) return nn_metric, objective_func_name def __get_objective_func_to_monitor(self, objective_func_name): monitor_obj_func = { **{k: m.name if hasattr(m, "name") else m.__name__ for k, m in self.__get_metrics_map().items() if m is not None}, "log_loss": "valid_loss", } objective_func_name_to_monitor = objective_func_name if objective_func_name in monitor_obj_func: objective_func_name_to_monitor = monitor_obj_func[objective_func_name] return objective_func_name_to_monitor def _predict_proba(self, X, **kwargs): X = self.preprocess(X, **kwargs) single_row = len(X) == 1 # fastai has issues predicting on a single row, duplicating the row as a workaround if single_row: X = pd.concat([X, X]).reset_index(drop=True) # Copy cat_columns and cont_columns because TabularList is mutating the list # TODO: This call has very high fixed cost with many features (0.7s for a single row with 3k features) # Primarily due to normalization performed on the inputs test_dl = self.model.dls.test_dl(X, inplace=True) with self.model.no_bar(): with self.model.no_logging(): preds, _ = self.model.get_preds(dl=test_dl) if single_row: preds = preds[:1, :] if self.problem_type == REGRESSION: if self.y_scaler is not None: return self.y_scaler.inverse_transform(preds.numpy()).reshape(-1) else: return preds.numpy().reshape(-1) elif self.problem_type == QUANTILE: from .quantile_helpers import isotonic if self.y_scaler is not None: preds = self.y_scaler.inverse_transform(preds.numpy()).reshape(-1, len(self.quantile_levels)) else: preds = preds.numpy().reshape(-1, len(self.quantile_levels)) return isotonic(preds, self.quantile_levels) elif self.problem_type == BINARY: return preds[:, 1].numpy() else: return preds.numpy() def save(self, path: str = None, verbose=True) -> str: from .fastai_helpers import export self._load_model = self.model is not None __model = self.model self.model = None path = super().save(path=path, verbose=verbose) self.model = __model # Export model if self._load_model: save_pkl.save_with_fn(f"{path}{self.model_internals_file_name}", self.model, pickle_fn=lambda m, buffer: export(m, buffer), verbose=verbose) self._load_model = None return path @classmethod def load(cls, path: str, reset_paths=True, verbose=True): from fastai.learner import load_learner model = super().load(path, reset_paths=reset_paths, verbose=verbose) if model._load_model: # Need the following logic to allow cross os loading of fastai model # https://github.com/fastai/fastai/issues/1482 import pathlib import platform plt = platform.system() og_windows_path = None if plt != "Windows": og_windows_path = pathlib.WindowsPath pathlib.WindowsPath = pathlib.PosixPath model.model = load_pkl.load_with_fn(f"{model.path}{model.model_internals_file_name}", lambda p: load_learner(p), verbose=verbose) if og_windows_path is not None: pathlib.WindowsPath = og_windows_path model._load_model = None return model def _set_default_params(self): """Specifies hyperparameter values to use by default""" default_params = get_param_baseline(self.problem_type) for param, val in default_params.items(): self._set_default_param_value(param, val) def _get_default_searchspace(self): return get_default_searchspace(self.problem_type, num_classes=None) def _get_default_auxiliary_params(self) -> dict: default_auxiliary_params = super()._get_default_auxiliary_params() extra_auxiliary_params = dict( valid_raw_types=[R_BOOL, R_INT, R_FLOAT, R_CATEGORY], ignored_type_group_special=[S_TEXT_NGRAM, S_TEXT_AS_CATEGORY], ) default_auxiliary_params.update(extra_auxiliary_params) return default_auxiliary_params def _get_default_resources(self): # logical=False is faster in training num_cpus = ResourceManager.get_cpu_count_psutil(logical=False) num_gpus = 0 return num_cpus, num_gpus def __get_metrics_map(self): from fastai.metrics import FBeta, Precision, R2Score, Recall, RocAucBinary, accuracy, mae, mse, rmse from .fastai_helpers import medae from .quantile_helpers import HuberPinballLoss metrics_map = { # Regression "root_mean_squared_error": rmse, "mean_squared_error": mse, "mean_absolute_error": mae, "r2": R2Score(), "median_absolute_error": medae, # Classification "accuracy": accuracy, "f1": FBeta(beta=1), "f1_macro": FBeta(beta=1, average="macro"), "f1_micro": FBeta(beta=1, average="micro"), "f1_weighted": FBeta(beta=1, average="weighted"), # this one has some issues "roc_auc": RocAucBinary(), "precision": Precision(), "precision_macro": Precision(average="macro"), "precision_micro": Precision(average="micro"), "precision_weighted": Precision(average="weighted"), "recall": Recall(), "recall_macro": Recall(average="macro"), "recall_micro": Recall(average="micro"), "recall_weighted": Recall(average="weighted"), "log_loss": None, "pinball_loss": HuberPinballLoss(quantile_levels=self.quantile_levels) # Not supported: pac_score } return metrics_map def _estimate_memory_usage(self, X, **kwargs): return 10 * get_approximate_df_mem_usage(X).sum() def _get_hpo_backend(self): """Choose which backend(Ray or Custom) to use for hpo""" return RAY_BACKEND def _get_maximum_resources(self) -> Dict[str, Union[int, float]]: # fastai model trains slower when utilizing virtual cores and this issue scale up when the number of cpu cores increases return {"num_cpus": ResourceManager.get_cpu_count_psutil(logical=False)} def get_minimum_resources(self, is_gpu_available=False): minimum_resources = { "num_cpus": 1, } if is_gpu_available: minimum_resources["num_gpus"] = 0.5 return minimum_resources def _more_tags(self): return {"can_refit_full": True}