Source code for autogluon.tabular.models.tabular_nn.torch.tabular_nn_torch

import json
import logging
import os
import psutil
import random
import time
import warnings
import numpy as np
import pandas as pd

from autogluon.common.features.types import R_BOOL, R_INT, R_FLOAT, R_CATEGORY, S_TEXT_NGRAM, S_TEXT_AS_CATEGORY
from autogluon.core.constants import BINARY, MULTICLASS, REGRESSION, SOFTCLASS, QUANTILE
from autogluon.core.hpo.constants import RAY_BACKEND
from autogluon.core.utils import try_import_torch
from autogluon.core.utils.exceptions import TimeLimitExceeded
from autogluon.core.models.abstract.abstract_nn_model import AbstractNeuralNetworkModel

from ..hyperparameters.parameters import get_default_param
from ..hyperparameters.searchspaces import get_default_searchspace
from ..utils.data_preprocessor import create_preprocessor, get_feature_arraycol_map, get_feature_type_map
from ..utils.nn_architecture_utils import infer_y_range

logger = logging.getLogger(__name__)


# TODO: QuantileTransformer in pipelines accounts for majority of online inference time
[docs]class TabularNeuralNetTorchModel(AbstractNeuralNetworkModel): """ PyTorch neural network models for classification/regression with tabular data. """ # Constants used throughout this class: unique_category_str = '!missing!' # string used to represent missing values and unknown categories for categorical features. params_file_name = 'net.params' # Stores parameters of final network temp_file_name = 'temp_net.params' # Stores temporary network parameters (eg. during the course of training) def __init__(self, **kwargs): super().__init__(**kwargs) self.feature_arraycol_map = None self.feature_type_map = None self.features_to_drop = [] # may change between different bagging folds. TODO: consider just removing these from self._features_internal self.processor = None # data processor self.num_dataloading_workers = None self._architecture_desc = None self.optimizer = None self.device = None self.max_batch_size = None self._num_cpus_infer = None def _set_default_params(self): """ Specifies hyperparameter values to use by default """ default_params = get_default_param(problem_type=self.problem_type, framework='pytorch') for param, val in default_params.items(): self._set_default_param_value(param, val) def _get_default_auxiliary_params(self) -> dict: default_auxiliary_params = super()._get_default_auxiliary_params() extra_auxiliary_params = dict( valid_raw_types=[R_BOOL, R_INT, R_FLOAT, R_CATEGORY], ignored_type_group_special=[S_TEXT_NGRAM, S_TEXT_AS_CATEGORY], ) default_auxiliary_params.update(extra_auxiliary_params) return default_auxiliary_params def _get_default_searchspace(self): return get_default_searchspace(problem_type=self.problem_type, framework='pytorch') def _get_num_net_outputs(self): if self.problem_type in [MULTICLASS, SOFTCLASS]: return self.num_classes elif self.problem_type == BINARY: return 2 elif self.problem_type == REGRESSION: return 1 elif self.problem_type == QUANTILE: return len(self.quantile_levels) else: raise ValueError(f'Unknown problem_type: {self.problem_type}') def _get_device(self, num_gpus): import torch if num_gpus is not None and num_gpus >= 1: if torch.cuda.is_available(): device = torch.device("cuda") logger.log(15, "Training on GPU") if num_gpus > 1: logger.warning(f"{self.__class__.__name__} not yet able to use more than 1 GPU. 'num_gpus' is set to >1, but we will be using only 1 GPU.") else: device = torch.device("cpu") logger.log(15, "Training on CPU") else: device = torch.device("cpu") logger.log(15, "Training on CPU") return device def _set_net_defaults(self, train_dataset, params): params = params.copy() y_range_extend = params.pop('y_range_extend', None) """ Sets dataset-adaptive default values to use for our neural network """ if self.problem_type in [REGRESSION, QUANTILE]: if params['y_range'] is None: params['y_range'] = infer_y_range(y_vals=train_dataset.data_list[train_dataset.label_index], y_range_extend=y_range_extend) return params def _get_default_loss_function(self): import torch if self.problem_type == REGRESSION: return torch.nn.L1Loss() # or torch.nn.MSELoss() elif self.problem_type in [BINARY, MULTICLASS]: return torch.nn.CrossEntropyLoss() elif self.problem_type == SOFTCLASS: return torch.nn.KLDivLoss() # compares log-probability prediction vs probability target. @staticmethod def _prepare_params(params): params = params.copy() processor_param_keys = {'proc.embed_min_categories', 'proc.impute_strategy', 'proc.max_category_levels', 'proc.skew_threshold', 'use_ngram_features'} processor_kwargs = {k: v for k, v in params.items() if k in processor_param_keys} for key in processor_param_keys: params.pop(key, None) optimizer_param_keys = {'optimizer', 'learning_rate', 'weight_decay'} optimizer_kwargs = {k: v for k, v in params.items() if k in optimizer_param_keys} for key in optimizer_param_keys: params.pop(key, None) fit_param_keys = {'num_epochs', 'epochs_wo_improve'} fit_kwargs = {k: v for k, v in params.items() if k in fit_param_keys} for key in fit_param_keys: params.pop(key, None) loss_param_keys = {'loss_function', 'gamma'} loss_kwargs = {k: v for k, v in params.items() if k in loss_param_keys} for key in loss_param_keys: params.pop(key, None) return processor_kwargs, optimizer_kwargs, fit_kwargs, loss_kwargs, params def _fit(self, X, y, X_val=None, y_val=None, time_limit=None, sample_weight=None, num_cpus=1, num_gpus=0, reporter=None, verbosity=2, **kwargs): try_import_torch() import torch torch.set_num_threads(num_cpus) from .tabular_torch_dataset import TabularTorchDataset start_time = time.time() params = self._get_model_params() processor_kwargs, optimizer_kwargs, fit_kwargs, loss_kwargs, params = self._prepare_params(params=params) seed_value = params.pop('seed_value', 0) self._num_cpus_infer = params.pop('_num_cpus_infer', 1) if seed_value is not None: # Set seeds random.seed(seed_value) np.random.seed(seed_value) torch.manual_seed(seed_value) if sample_weight is not None: # TODO: support logger.log(15, f"sample_weight not yet supported for {self.__class__.__name__}," " this model will ignore them in training.") if num_cpus is not None: self.num_dataloading_workers = max(1, int(num_cpus/2.0)) else: self.num_dataloading_workers = 1 if self.num_dataloading_workers == 1: self.num_dataloading_workers = 0 # TODO: verify 0 is typically faster and uses less memory than 1 in pytorch self.num_dataloading_workers = 0 # TODO: >0 crashes on MacOS self.max_batch_size = params.pop('max_batch_size', 512) batch_size = params.pop('batch_size', None) if batch_size is None: if isinstance(X, TabularTorchDataset): batch_size = min(int(2 ** (3 + np.floor(np.log10(len(X))))), self.max_batch_size) else: batch_size = min(int(2 ** (3 + np.floor(np.log10(X.shape[0])))), self.max_batch_size) train_dataset, val_dataset = self._generate_datasets(X=X, y=y, params=processor_kwargs, X_val=X_val, y_val=y_val) logger.log(15, f"Training data for {self.__class__.__name__} has: " f"{train_dataset.num_examples} examples, {train_dataset.num_features} features " f"({len(train_dataset.feature_groups['vector'])} vector, {len(train_dataset.feature_groups['embed'])} embedding)") self.device = self._get_device(num_gpus=num_gpus) self._get_net(train_dataset, params=params) self.optimizer = self._init_optimizer(**optimizer_kwargs) if time_limit is not None: time_elapsed = time.time() - start_time time_limit_orig = time_limit time_limit = time_limit - time_elapsed # if 60% of time was spent preprocessing, likely not enough time to train model if time_limit <= time_limit_orig * 0.4: raise TimeLimitExceeded # train network self._train_net(train_dataset=train_dataset, loss_kwargs=loss_kwargs, batch_size=batch_size, val_dataset=val_dataset, time_limit=time_limit, reporter=reporter, verbosity=verbosity, **fit_kwargs) def _get_net(self, train_dataset, params): from .torch_network_modules import EmbedNet # set network params params = self._set_net_defaults(train_dataset, params) self.model = EmbedNet(problem_type=self.problem_type, num_net_outputs=self._get_num_net_outputs(), quantile_levels=self.quantile_levels, train_dataset=train_dataset, device=self.device, **params) self.model = self.model.to(self.device) if not os.path.exists(self.path): os.makedirs(self.path) def _train_net(self, train_dataset, loss_kwargs, batch_size, num_epochs, epochs_wo_improve, val_dataset=None, time_limit=None, reporter=None, verbosity=2): import torch start_time = time.time() logging.debug("initializing neural network...") self.model.init_params() logging.debug("initialized") train_dataloader = train_dataset.build_loader(batch_size, self.num_dataloading_workers, is_test=False) if isinstance(loss_kwargs.get('loss_function', 'auto'), str) and loss_kwargs.get('loss_function', 'auto') == 'auto': loss_kwargs['loss_function'] = self._get_default_loss_function() if val_dataset is not None: y_val = val_dataset.get_labels() if y_val.ndim == 2 and y_val.shape[1] == 1: y_val = y_val.flatten() else: y_val = None if verbosity <= 1: verbose_eval = False else: verbose_eval = True logger.log(15, "Neural network architecture:") logger.log(15, str(self.model)) net_filename = self.path + self.temp_file_name if num_epochs == 0: # use dummy training loop that stops immediately # useful for using NN just for data preprocessing / debugging logger.log(20, "Not training Tabular Neural Network since num_updates == 0") # for each batch for batch_idx, data_batch in enumerate(train_dataloader): if batch_idx > 0: break loss = self.model.compute_loss(data_batch, **loss_kwargs) self.optimizer.zero_grad() loss.backward() self.optimizer.step() os.makedirs(os.path.dirname(self.path), exist_ok=True) torch.save(self.model, net_filename) logger.log(15, "Untrained Tabular Neural Network saved to file") return # start training loop: logger.log(15, f"Training tabular neural network for up to {num_epochs} epochs...") total_updates = 0 num_updates_per_epoch = len(train_dataloader) update_to_check_time = min(10, max(1, int(num_updates_per_epoch/10))) do_update = True epoch = 0 best_epoch = 0 best_val_metric = -np.inf # higher = better best_val_update = 0 val_improve_epoch = 0 # most recent epoch where validation-score strictly improved start_fit_time = time.time() if time_limit is not None: time_limit = time_limit - (start_fit_time - start_time) if time_limit <= 0: raise TimeLimitExceeded while do_update: time_start_epoch = time.time() total_train_loss = 0.0 total_train_size = 0.0 for batch_idx, data_batch in enumerate(train_dataloader): # forward loss = self.model.compute_loss(data_batch, **loss_kwargs) total_train_loss += loss.item() total_train_size += 1 # update self.optimizer.zero_grad() loss.backward() self.optimizer.step() total_updates += 1 # time limit if time_limit is not None: time_cur = time.time() update_cur = batch_idx + 1 if epoch == 0 and update_cur == update_to_check_time: time_elapsed_epoch = time_cur - time_start_epoch estimated_time = time_elapsed_epoch / update_cur * num_updates_per_epoch if estimated_time > time_limit: logger.log(30, f"\tNot enough time to train first epoch. " f"(Time Required: {round(estimated_time, 2)}s, Time Left: {round(time_limit, 2)}s)") raise TimeLimitExceeded time_elapsed = time_cur - start_fit_time if time_limit < time_elapsed: logger.log(15, f"\tRan out of time, stopping training early. (Stopped on Update {total_updates} (Epoch {epoch}))") do_update = False break if not do_update: break epoch += 1 # validation if val_dataset is not None: # compute validation score val_metric = self.score(X=val_dataset, y=y_val, metric=self.stopping_metric, _reset_threads=False) if np.isnan(val_metric): if best_epoch == 0: raise RuntimeError(f"NaNs encountered in {self.__class__.__name__} training. " "Features/labels may be improperly formatted, " "or NN weights may have diverged.") else: logger.warning(f"Warning: NaNs encountered in {self.__class__.__name__} training. " "Reverting model to last checkpoint without NaNs.") break # update best validation if (val_metric >= best_val_metric) or best_epoch == 0: if val_metric > best_val_metric: val_improve_epoch = epoch best_val_metric = val_metric os.makedirs(os.path.dirname(self.path), exist_ok=True) torch.save(self.model, net_filename) best_epoch = epoch best_val_update = total_updates if verbose_eval: logger.log(15, f"Epoch {epoch} (Update {total_updates}).\t" f"Train loss: {round(total_train_loss / total_train_size, 4)}, " f"Val {self.stopping_metric.name}: {round(val_metric, 4)}, " f"Best Epoch: {best_epoch}") if reporter is not None: reporter(epoch=total_updates, validation_performance=val_metric, # Higher val_metric = better train_loss=total_train_loss / total_train_size, eval_metric=self.eval_metric.name, greater_is_better=self.eval_metric.greater_is_better) # no improvement if epoch - val_improve_epoch >= epochs_wo_improve: break if epoch >= num_epochs: break if time_limit is not None: time_elapsed = time.time() - start_fit_time time_epoch_average = time_elapsed / (epoch+1) time_left = time_limit - time_elapsed if time_left < time_epoch_average: logger.log(20, f"\tRan out of time, stopping training early. (Stopping on epoch {epoch})") break if epoch == 0: raise AssertionError('0 epochs trained!') # revert back to best model if val_dataset is not None: logger.log(15, f"Best model found on Epoch {best_epoch} (Update {best_val_update}). Val {self.stopping_metric.name}: {best_val_metric}") try: self.model = torch.load(net_filename) os.remove(net_filename) except FileNotFoundError: pass else: logger.log(15, f"Best model found on Epoch {best_epoch} (Update {best_val_update}).") self.params_trained['batch_size'] = batch_size self.params_trained['num_epochs'] = best_epoch # FIXME: torch.set_num_threads(self._num_cpus_infer) is required because XGBoost<=1.5 mutates global OpenMP thread limit # If this isn't here, inference speed is slowed down massively. # Remove once upgraded to XGBoost>=1.6 def _predict_proba(self, X, _reset_threads=True, **kwargs): if _reset_threads: from ..._utils.torch_utils import TorchThreadManager with TorchThreadManager(num_threads=self._num_cpus_infer): pred_proba = self._predict_proba_internal(X=X, **kwargs) else: pred_proba = self._predict_proba_internal(X=X, **kwargs) return pred_proba def _predict_proba_internal(self, X, **kwargs): """ To align predict with abstract_model API. Preprocess here only refers to feature processing steps done by all AbstractModel objects, not tabularNN-specific preprocessing steps. If X is not DataFrame but instead TabularNNDataset object, we can still produce predictions, but cannot use preprocess in this case (needs to be already processed). """ from .tabular_torch_dataset import TabularTorchDataset if isinstance(X, TabularTorchDataset): return self._predict_tabular_data(new_data=X, process=False) elif isinstance(X, pd.DataFrame): X = self.preprocess(X, **kwargs) return self._predict_tabular_data(new_data=X, process=True) else: raise ValueError("X must be of type pd.DataFrame or TabularTorchDataset, not type: %s" % type(X)) def _predict_tabular_data(self, new_data, process=True): from .tabular_torch_dataset import TabularTorchDataset if process: new_data = self._process_test_data(new_data) if not isinstance(new_data, TabularTorchDataset): raise ValueError("new_data must of of type TabularTorchDataset if process=False") val_dataloader = new_data.build_loader(self.max_batch_size, self.num_dataloading_workers, is_test=True) preds_dataset = [] for batch_idx, data_batch in enumerate(val_dataloader): preds_batch = self.model.predict(data_batch) preds_dataset.append(preds_batch) preds_dataset = np.concatenate(preds_dataset, 0) return preds_dataset def _generate_datasets(self, X, y, params, X_val=None, y_val=None): from .tabular_torch_dataset import TabularTorchDataset impute_strategy = params['proc.impute_strategy'] max_category_levels = params['proc.max_category_levels'] skew_threshold = params['proc.skew_threshold'] embed_min_categories = params['proc.embed_min_categories'] use_ngram_features = params['use_ngram_features'] if isinstance(X, TabularTorchDataset): train_dataset = X else: X = self.preprocess(X) train_dataset = self._process_train_data(df=X, labels=y, impute_strategy=impute_strategy, max_category_levels=max_category_levels, skew_threshold=skew_threshold, embed_min_categories=embed_min_categories, use_ngram_features=use_ngram_features) if X_val is not None: if isinstance(X_val, TabularTorchDataset): val_dataset = X_val else: X_val = self.preprocess(X_val) val_dataset = self._process_test_data(df=X_val, labels=y_val) else: val_dataset = None return train_dataset, val_dataset def _process_test_data(self, df, labels=None): """ Process train or test DataFrame into a form fit for neural network models. Args: df (pd.DataFrame): Data to be processed (X) labels (pd.Series): labels to be processed (y) Returns: Dataset object """ from .tabular_torch_dataset import TabularTorchDataset # sklearn processing n_quantiles warning warnings.filterwarnings("ignore", module='sklearn.preprocessing') if labels is not None and len(labels) != len(df): raise ValueError("Number of examples in Dataframe does not match number of labels") if (self.processor is None or self._types_of_features is None or self.feature_arraycol_map is None or self.feature_type_map is None): raise ValueError("Need to process training data before test data") if self.features_to_drop: drop_cols = [col for col in df.columns if col in self.features_to_drop] if drop_cols: df = df.drop(columns=drop_cols) # self.feature_arraycol_map, self.feature_type_map have been previously set while processing training data. df = self.processor.transform(df) return TabularTorchDataset(df, self.feature_arraycol_map, self.feature_type_map, self.problem_type, labels) def _process_train_data(self, df, impute_strategy, max_category_levels, skew_threshold, embed_min_categories, use_ngram_features, labels): from .tabular_torch_dataset import TabularTorchDataset # sklearn processing n_quantiles warning warnings.filterwarnings("ignore", module='sklearn.preprocessing') if labels is None: raise ValueError("Attempting process training data without labels") if len(labels) != len(df): raise ValueError("Number of examples in Dataframe does not match number of labels") # dict with keys: : 'continuous', 'skewed', 'onehot', 'embed', values = column-names of df self._types_of_features, df = self._get_types_of_features(df, skew_threshold=skew_threshold, embed_min_categories=embed_min_categories, use_ngram_features=use_ngram_features) logger.log(15, "Tabular Neural Network treats features as the following types:") logger.log(15, json.dumps(self._types_of_features, indent=4)) logger.log(15, "\n") if self.processor is not None: Warning(f"Attempting to process training data for {self.__class__.__name__}, but previously already did this.") self.processor = create_preprocessor( impute_strategy=impute_strategy, max_category_levels=max_category_levels, unique_category_str=self.unique_category_str, continuous_features=self._types_of_features['continuous'], skewed_features=self._types_of_features['skewed'], onehot_features=self._types_of_features['onehot'], embed_features=self._types_of_features['embed'], bool_features=self._types_of_features['bool'] ) df = self.processor.fit_transform(df) # OrderedDict of feature-name -> list of column-indices in df corresponding to this feature self.feature_arraycol_map = get_feature_arraycol_map(processor=self.processor, max_category_levels=max_category_levels) num_array_cols = np.sum([len(self.feature_arraycol_map[key]) for key in self.feature_arraycol_map]) # should match number of columns in processed array if num_array_cols != df.shape[1]: raise ValueError("Error during one-hot encoding data processing for neural network. " "Number of columns in df array does not match feature_arraycol_map.") # OrderedDict of feature-name -> feature_type string (options: 'vector', 'embed') self.feature_type_map = get_feature_type_map(feature_arraycol_map=self.feature_arraycol_map, types_of_features=self._types_of_features) return TabularTorchDataset(df, self.feature_arraycol_map, self.feature_type_map, self.problem_type, labels) def _init_optimizer(self, optimizer, learning_rate, weight_decay): """ Set up optimizer needed for training. Network must first be initialized before this. """ import torch if optimizer == 'sgd': optimizer = torch.optim.SGD(params=self.model.parameters(), lr=learning_rate, weight_decay=weight_decay) elif optimizer == 'adam': optimizer = torch.optim.Adam(params=self.model.parameters(), lr=learning_rate, weight_decay=weight_decay) else: raise ValueError(f"Unknown optimizer specified: {optimizer}") return optimizer def reduce_memory_size(self, remove_fit=True, requires_save=True, **kwargs): super().reduce_memory_size(remove_fit=remove_fit, requires_save=requires_save, **kwargs) if remove_fit and requires_save: self.optimizer = None def _get_default_stopping_metric(self): return self.eval_metric def _get_default_resources(self): # psutil.cpu_count(logical=False) is faster in training than psutil.cpu_count() num_cpus = psutil.cpu_count(logical=False) num_gpus = 0 return num_cpus, num_gpus def save(self, path: str = None, verbose=True) -> str: if self.model is not None: self._architecture_desc = self.model.architecture_desc temp_model = self.model self.model = None path_final = super().save(path=path, verbose=verbose) self.model = temp_model self._architecture_desc = None # Export model if self.model is not None: import torch params_filepath = path_final + self.params_file_name # TODO: Don't use os.makedirs here, have save_parameters function in tabular_nn_model that checks if local path or S3 path os.makedirs(os.path.dirname(path_final), exist_ok=True) torch.save(self.model, params_filepath) return path_final @classmethod def load(cls, path: str, reset_paths=True, verbose=True): model: TabularNeuralNetTorchModel = super().load(path=path, reset_paths=reset_paths, verbose=verbose) if model._architecture_desc is not None: import torch from .torch_network_modules import EmbedNet # recreate network from architecture description model.model = EmbedNet(problem_type=model.problem_type, num_net_outputs=model._get_num_net_outputs(), quantile_levels=model.quantile_levels, architecture_desc=model._architecture_desc, device=model.device) model._architecture_desc = None model.model = torch.load(model.path + model.params_file_name) return model def _get_hpo_backend(self): """Choose which backend(Ray or Custom) to use for hpo""" return RAY_BACKEND def _more_tags(self): # `can_refit_full=True` because batch_size and num_epochs is communicated at end of `_fit`: # self.params_trained['batch_size'] = batch_size # self.params_trained['num_epochs'] = best_epoch return {'can_refit_full': True}