Source code for autogluon.tabular.models.xgboost.xgboost_model

import time
import logging

import psutil

from autogluon.common.features.types import R_BOOL, R_INT, R_FLOAT, R_CATEGORY
from autogluon.common.utils.pandas_utils import get_approximate_df_mem_usage
from autogluon.core.constants import MULTICLASS, REGRESSION, SOFTCLASS, PROBLEM_TYPES_CLASSIFICATION
from autogluon.core.models import AbstractModel
from autogluon.core.models._utils import get_early_stopping_rounds
from autogluon.core.utils import try_import_xgboost
from autogluon.core.utils.exceptions import NotEnoughMemoryError

from . import xgboost_utils
from .hyperparameters.parameters import get_param_baseline
from .hyperparameters.searchspaces import get_default_searchspace

logger = logging.getLogger(__name__)


class XGBoostModel(AbstractModel):
    """
    XGBoost model: https://xgboost.readthedocs.io/en/latest/

    Hyperparameter options: https://xgboost.readthedocs.io/en/latest/parameter.html
    """
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._ohe_generator = None
        self._xgb_model_type = None

    def _set_default_params(self):
        default_params = get_param_baseline(problem_type=self.problem_type, num_classes=self.num_classes)
        for param, val in default_params.items():
            self._set_default_param_value(param, val)

    def _get_default_searchspace(self):
        return get_default_searchspace(problem_type=self.problem_type, num_classes=self.num_classes)

    def _get_default_auxiliary_params(self) -> dict:
        default_auxiliary_params = super()._get_default_auxiliary_params()
        extra_auxiliary_params = dict(
            valid_raw_types=[R_BOOL, R_INT, R_FLOAT, R_CATEGORY],
        )
        default_auxiliary_params.update(extra_auxiliary_params)
        return default_auxiliary_params

    # Use specialized XGBoost metric if available (fast), otherwise use custom func generator
    def get_eval_metric(self):
        eval_metric = xgboost_utils.convert_ag_metric_to_xgbm(ag_metric_name=self.stopping_metric.name, problem_type=self.problem_type)
        if eval_metric is None:
            eval_metric = xgboost_utils.func_generator(metric=self.stopping_metric, is_higher_better=True, needs_pred_proba=not self.stopping_metric.needs_pred, problem_type=self.problem_type)
        return eval_metric

    def _preprocess(self, X, is_train=False, max_category_levels=None, **kwargs):
        X = super()._preprocess(X=X, **kwargs)
        if self._ohe_generator is None:
            self._ohe_generator = xgboost_utils.OheFeatureGenerator(max_levels=max_category_levels)
        if is_train:
            self._ohe_generator.fit(X)
        X = self._ohe_generator.transform(X)
        return X

    def _fit(self,
             X,
             y,
             X_val=None,
             y_val=None,
             time_limit=None,
             num_gpus=0,
             num_cpus=None,
             sample_weight=None,
             sample_weight_val=None,
             verbosity=2,
             **kwargs):
        # TODO: utilize sample_weight_val in early-stopping if provided
        start_time = time.time()
        ag_params = self._get_ag_params()
        params = self._get_model_params()
        if num_cpus:
            params['n_jobs'] = num_cpus
        max_category_levels = params.pop('proc.max_category_levels', 100)

        if verbosity <= 2:
            verbose = False
            log_period = None
        elif verbosity == 3:
            verbose = True
            log_period = 50
        else:
            verbose = True
            log_period = 1

        X = self.preprocess(X, is_train=True, max_category_levels=max_category_levels)
        num_rows_train = X.shape[0]

        eval_set = []
        eval_metric = self.get_eval_metric()

        if X_val is None:
            early_stopping_rounds = None
            eval_set = None
        else:
            X_val = self.preprocess(X_val, is_train=False)
            eval_set.append((X_val, y_val))
            early_stopping_rounds = ag_params.get('ag.early_stop', 'adaptive')
            if isinstance(early_stopping_rounds, (str, tuple, list)):
                early_stopping_rounds = self._get_early_stopping_rounds(num_rows_train=num_rows_train, strategy=early_stopping_rounds)

        if num_gpus != 0:
            params['tree_method'] = 'gpu_hist'
            if 'gpu_id' not in params:
                params['gpu_id'] = 0
        elif 'tree_method' not in params:
            params['tree_method'] = 'hist'

        try_import_xgboost()
        from .callbacks import EarlyStoppingCustom
        from xgboost.callback import EvaluationMonitor
        callbacks = []
        if eval_set is not None:
            if log_period is not None:
                callbacks.append(EvaluationMonitor(period=log_period))
            callbacks.append(EarlyStoppingCustom(early_stopping_rounds, start_time=start_time, time_limit=time_limit, verbose=verbose))

        from xgboost import XGBClassifier, XGBRegressor
        model_type = XGBClassifier if self.problem_type in PROBLEM_TYPES_CLASSIFICATION else XGBRegressor
        if 'eval_metric' not in params and params.get('objective') == 'binary:logistic':  # avoid unnecessary warning from XGBoost v1.3.0
            params['eval_metric'] = 'logloss'
        self.model = model_type(**params)
        self.model.fit(
            X=X,
            y=y,
            eval_set=eval_set,
            eval_metric=eval_metric,
            verbose=False,
            callbacks=callbacks,
            sample_weight=sample_weight
        )

        bst = self.model.get_booster()
        # TODO: Investigate speed-ups from GPU inference
        # bst.set_param({"predictor": "gpu_predictor"})
        self.params_trained['n_estimators'] = bst.best_ntree_limit

    def _predict_proba(self, X, num_cpus=-1, **kwargs):
        X = self.preprocess(X, **kwargs)
        self.model.set_params(n_jobs=num_cpus)

        if self.problem_type == REGRESSION:
            return self.model.predict(X)

        y_pred_proba = self.model.predict_proba(X)
        return self._convert_proba_to_unified_form(y_pred_proba)

    def _get_early_stopping_rounds(self, num_rows_train, strategy='auto'):
        return get_early_stopping_rounds(num_rows_train=num_rows_train, strategy=strategy)

    def _get_num_classes(self, y):
        if self.problem_type == MULTICLASS:
            if self.num_classes is not None:
                num_classes = self.num_classes
            else:
                num_classes = 10  # Guess if not given, can do better by looking at y
        elif self.problem_type == SOFTCLASS:  # TODO: delete this elif if it's unnecessary.
            num_classes = y.shape[1]
        else:
            num_classes = 1
        return num_classes

    def _ag_params(self) -> set:
        return {'ag.early_stop'}

    def _estimate_memory_usage(self, X, **kwargs):
        num_classes = self.num_classes if self.num_classes else 1  # self.num_classes could be None after initialization if it's a regression problem
        data_mem_usage = get_approximate_df_mem_usage(X).sum()
        approx_mem_size_req = data_mem_usage * 7 + data_mem_usage / 4 * num_classes  # TODO: Extremely crude approximation, can be vastly improved
        return approx_mem_size_req

    def _validate_fit_memory_usage(self, **kwargs):
        max_memory_usage_ratio = self.params_aux['max_memory_usage_ratio']
        approx_mem_size_req = self.estimate_memory_usage(**kwargs)
        if approx_mem_size_req > 1e9:  # > 1 GB
            available_mem = psutil.virtual_memory().available
            ratio = approx_mem_size_req / available_mem
            if ratio > (1 * max_memory_usage_ratio):
                logger.warning('\tWarning: Not enough memory to safely train XGBoost model, roughly requires: %s GB, but only %s GB is available...'
                               % (round(approx_mem_size_req / 1e9, 3), round(available_mem / 1e9, 3)))
                raise NotEnoughMemoryError
            elif ratio > (0.75 * max_memory_usage_ratio):
                logger.warning('\tWarning: Potentially not enough memory to safely train XGBoost model, roughly requires: %s GB, but only %s GB is available...'
                               % (round(approx_mem_size_req / 1e9, 3), round(available_mem / 1e9, 3)))

    def _get_default_resources(self):
        # psutil.cpu_count(logical=False) is faster in training than psutil.cpu_count()
        num_cpus = psutil.cpu_count(logical=False)
        num_gpus = 0
        return num_cpus, num_gpus

    def save(self, path: str = None, verbose=True) -> str:
        _model = self.model
        self.model = None
        if _model is not None:
            self._xgb_model_type = _model.__class__
        path = super().save(path=path, verbose=verbose)
        if _model is not None:
            # Halves disk usage compared to .json / .pkl
            _model.save_model(path + 'xgb.ubj')
        self.model = _model
        return path

    @classmethod
    def load(cls, path: str, reset_paths=True, verbose=True):
        model = super().load(path=path, reset_paths=reset_paths, verbose=verbose)
        if model._xgb_model_type is not None:
            model.model = model._xgb_model_type()
            # Much faster to load using .ubj than .json (10x+ speedup)
            model.model.load_model(path + 'xgb.ubj')
            model._xgb_model_type = None
        return model

    def _more_tags(self):
        # `can_refit_full=True` because n_estimators is communicated at end of `_fit`:
        # self.params_trained['n_estimators'] = bst.best_ntree_limit
        return {'can_refit_full': True}
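In normal use this model is not constructed directly; it is trained by AutoGluon when XGBoost hyperparameters are passed to TabularPredictor. A minimal sketch, assuming the 'XGB' hyperparameter key maps to this model and that 'train.csv' is a placeholder dataset path; the 'ag.early_stop' key corresponds to the ag_params read in _fit above, while the remaining keys are forwarded to XGBoost.

from autogluon.tabular import TabularDataset, TabularPredictor

# Hypothetical training data with a 'class' label column
train_data = TabularDataset('train.csv')

predictor = TabularPredictor(label='class').fit(
    train_data,
    hyperparameters={
        'XGB': {
            'n_estimators': 10000,       # passed through to xgboost.XGBClassifier / XGBRegressor
            'learning_rate': 0.05,
            'ag.early_stop': 20,          # AutoGluon-level early-stopping patience (see _ag_params)
            'proc.max_category_levels': 100,  # cap on one-hot levels used by OheFeatureGenerator
        },
    },
)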