Source code for autogluon.tabular.models.knn.knn_model

import logging
import math
import time
from typing import Dict, Union

import numpy as np

from autogluon.common.features.types import R_FLOAT, R_INT, S_BOOL
from autogluon.common.utils.resource_utils import ResourceManager
from autogluon.core.constants import BINARY, MULTICLASS, REGRESSION
from autogluon.core.models import AbstractModel
from autogluon.core.utils.exceptions import NotEnoughMemoryError
from autogluon.core.utils.utils import normalize_pred_probas

logger = logging.getLogger(__name__)


# TODO: Normalize data!
class KNNModel(AbstractModel):
    """
    KNearestNeighbors model (scikit-learn): https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.KNeighborsClassifier.html
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._X_unused_index = None  # Keeps track of unused training data indices, necessary for LOO OOF generation

    def _get_model_type(self):
        if self.params_aux.get("use_daal", True):
            try:
                # TODO: Add more granular switch, currently this affects all future KNN models even if they had `use_daal=False`
                from sklearnex.neighbors import KNeighborsClassifier, KNeighborsRegressor

                # sklearnex backend for KNN seems to be 20-40x+ faster than native sklearn with no downsides.
                logger.log(15, "\tUsing sklearnex KNN backend...")
            except Exception:
                from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
        else:
            from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
        if self.problem_type == REGRESSION:
            return KNeighborsRegressor
        else:
            return KNeighborsClassifier

    def _preprocess(self, X, **kwargs):
        X = super()._preprocess(X, **kwargs)
        X = X.fillna(0).to_numpy(dtype=np.float32)
        return X

    def _set_default_params(self):
        default_params = {
            "weights": "uniform",
        }
        for param, val in default_params.items():
            self._set_default_param_value(param, val)

    def _get_default_auxiliary_params(self) -> dict:
        default_auxiliary_params = super()._get_default_auxiliary_params()
        extra_auxiliary_params = dict(
            valid_raw_types=[R_INT, R_FLOAT],  # TODO: Eventually use category features
            ignored_type_group_special=[S_BOOL],
        )
        default_auxiliary_params.update(extra_auxiliary_params)
        return default_auxiliary_params

    @classmethod
    def _get_default_ag_args(cls) -> dict:
        default_ag_args = super()._get_default_ag_args()
        extra_ag_args = {
            "valid_stacker": False,
            "problem_types": [BINARY, MULTICLASS, REGRESSION],
        }
        default_ag_args.update(extra_ag_args)
        return default_ag_args

    @classmethod
    def _get_default_ag_args_ensemble(cls, **kwargs) -> dict:
        default_ag_args_ensemble = super()._get_default_ag_args_ensemble(**kwargs)
        extra_ag_args_ensemble = {"use_child_oof": True}
        default_ag_args_ensemble.update(extra_ag_args_ensemble)
        return default_ag_args_ensemble

    # TODO: Enable HPO for KNN
    def _get_default_searchspace(self):
        spaces = {}
        return spaces

    def _fit(self, X, y, num_cpus=-1, time_limit=None, sample_weight=None, **kwargs):
        time_start = time.time()
        X = self.preprocess(X)
        params = self._get_model_params()
        if "n_jobs" not in params:
            params["n_jobs"] = num_cpus
        if sample_weight is not None:  # TODO: support
            logger.log(15, "sample_weight not yet supported for KNNModel, this model will ignore it during training.")

        num_rows_max = len(X)
        # FIXME: v0.1 Must store final num rows for refit_full or else will use everything! Worst case refit_full could train far longer than the original model.
        if time_limit is None or num_rows_max <= 10000:
            self.model = self._get_model_type()(**params).fit(X, y)
        else:
            self.model = self._fit_with_samples(X=X, y=y, model_params=params, time_limit=time_limit - (time.time() - time_start))

    def _estimate_memory_usage(self, X, **kwargs):
        model_size_bytes = 4 * X.shape[0] * X.shape[1]  # Assuming float32 types
        expected_final_model_size_bytes = model_size_bytes * 3.6  # Roughly what can be expected of the final KNN model in memory size
        return expected_final_model_size_bytes

    def _validate_fit_memory_usage(self, mem_error_threshold: float = 0.2, mem_warning_threshold: float = 0.15, mem_size_threshold: int = 1e7, **kwargs):
        return super()._validate_fit_memory_usage(
            mem_error_threshold=mem_error_threshold, mem_warning_threshold=mem_warning_threshold, mem_size_threshold=mem_size_threshold, **kwargs
        )

    # TODO: Won't work for RAPIDS without modification
    # TODO: Technically isn't OOF, but can be used in place of OOF. Perhaps rename to something more accurate?
    def predict_proba_oof(self, X, normalize=None, **kwargs):
        """X should be the same X passed to `.fit`"""
        y_oof_pred_proba = self._predict_proba_oof(X=X, **kwargs)
        if normalize is None:
            normalize = self.normalize_pred_probas
        if normalize:
            y_oof_pred_proba = normalize_pred_probas(y_oof_pred_proba, self.problem_type)
        y_oof_pred_proba = y_oof_pred_proba.astype(np.float32)
        return y_oof_pred_proba

    def _predict_proba_oof(self, X, **kwargs):
        from ._knn_loo_variants import KNeighborsClassifierLOOMixin, KNeighborsRegressorLOOMixin

        if self.problem_type in [BINARY, MULTICLASS]:
            y_oof_pred_proba = KNeighborsClassifierLOOMixin.predict_proba_loo(self.model)
        else:
            y_oof_pred_proba = KNeighborsRegressorLOOMixin.predict_loo(self.model)
        y_oof_pred_proba = self._convert_proba_to_unified_form(y_oof_pred_proba)
        if X is not None and self._X_unused_index:
            X_unused = X.iloc[self._X_unused_index]
            y_pred_proba_new = self.predict_proba(X_unused)
            X_unused_index = set(self._X_unused_index)
            num_rows = len(X)
            X_used_index = [i for i in range(num_rows) if i not in X_unused_index]
            oof_pred_shape = y_oof_pred_proba.shape
            if len(oof_pred_shape) == 1:
                y_oof_tmp = np.zeros(num_rows, dtype=np.float32)
                y_oof_tmp[X_used_index] = y_oof_pred_proba
                y_oof_tmp[self._X_unused_index] = y_pred_proba_new
            else:
                y_oof_tmp = np.zeros((num_rows, oof_pred_shape[1]), dtype=np.float32)
                y_oof_tmp[X_used_index, :] = y_oof_pred_proba
                y_oof_tmp[self._X_unused_index, :] = y_pred_proba_new
            y_oof_pred_proba = y_oof_tmp
        return y_oof_pred_proba

    # TODO: Consider making this fully generic and available to all models
    def _fit_with_samples(self, X, y, model_params, time_limit, start_samples=10000, max_samples=None, sample_growth_factor=2, sample_time_growth_factor=8):
        """
        Fit model with samples of the data repeatedly, gradually increasing the amount of data until time_limit is reached or all data is used.

        X and y must already be preprocessed.

        Parameters
        ----------
        X : np.ndarray
            The training data features (preprocessed).
        y : Series
            The training data ground truth labels.
        time_limit : float, default = None
            Time limit in seconds to adhere to when fitting model.
        start_samples : int, default = 10000
            Number of samples to start with. This will be multiplied by sample_growth_factor after each model fit to determine the next number of samples.
            For example, if start_samples=10000, sample_growth_factor=2, then the number of samples per model fit would be [10000, 20000, 40000, 80000, ...]
        max_samples : int, default = None
            The maximum number of samples to use.
            If None or greater than the number of rows in X, then it is set equal to the number of rows in X.
        sample_growth_factor : float, default = 2
            The rate of growth in sample size between each model fit. If 2, then the sample size doubles after each fit.
        sample_time_growth_factor : float, default = 8
            The multiplier to the expected fit time of the next model.
            If `sample_time_growth_factor=8` and a model took 10 seconds to train, the next model fit will be expected to take 80 seconds.
            If an expected time is greater than the remaining time in `time_limit`, the model will not be trained and the method will return early.
        """
        time_start = time.time()

        num_rows_samples = []
        if max_samples is None:
            num_rows_max = len(X)
        else:
            num_rows_max = min(len(X), max_samples)
        num_rows_cur = start_samples
        while True:
            num_rows_cur = min(num_rows_cur, num_rows_max)
            num_rows_samples.append(num_rows_cur)
            if num_rows_cur == num_rows_max:
                break
            num_rows_cur *= sample_growth_factor
            num_rows_cur = math.ceil(num_rows_cur)
            if num_rows_cur * 1.5 >= num_rows_max:
                num_rows_cur = num_rows_max

        def sample_func(chunk, frac):
            # Guarantee at least 1 sample (otherwise log_loss would crash or model would return different column counts in pred_proba)
            n = max(math.ceil(len(chunk) * frac), 1)
            return chunk.sample(n=n, replace=False, random_state=0)

        if self.problem_type != REGRESSION:
            y_df = y.to_frame(name="label").reset_index(drop=True)
        else:
            y_df = None

        time_start_sample_loop = time.time()
        time_limit_left = time_limit - (time_start_sample_loop - time_start)

        model_type = self._get_model_type()
        idx = None
        for i, samples in enumerate(num_rows_samples):
            if samples != num_rows_max:
                if self.problem_type == REGRESSION:
                    idx = np.random.choice(num_rows_max, size=samples, replace=False)
                else:
                    idx = y_df.groupby("label", group_keys=False).apply(sample_func, frac=samples / num_rows_max).index
                X_samp = X[idx, :]
                y_samp = y.iloc[idx]
            else:
                X_samp = X
                y_samp = y
                idx = None
            self.model = model_type(**model_params).fit(X_samp, y_samp)
            time_limit_left_prior = time_limit_left
            time_fit_end_sample = time.time()
            time_limit_left = time_limit - (time_fit_end_sample - time_start)
            time_fit_sample = time_limit_left_prior - time_limit_left
            time_required_for_next = time_fit_sample * sample_time_growth_factor
            logger.log(15, f"\t{round(time_fit_sample, 2)}s \t= Train Time (Using {samples}/{num_rows_max} rows) ({round(time_limit_left, 2)}s remaining time)")
            if time_required_for_next > time_limit_left and i != len(num_rows_samples) - 1:
                logger.log(
                    20,
                    f"\tNot enough time to train KNN model on all training rows. Fit {samples}/{num_rows_max} rows. (Training KNN model on {num_rows_samples[i+1]} rows is expected to take {round(time_required_for_next, 2)}s)",
                )
                break

        if idx is not None:
            idx = set(idx)
            self._X_unused_index = [i for i in range(num_rows_max) if i not in idx]
        return self.model

    def _get_maximum_resources(self) -> Dict[str, Union[int, float]]:
        # use at most 32 cpus to avoid OpenBLAS error: https://github.com/autogluon/autogluon/issues/1020
        return {"num_cpus": 32}

    def _get_default_resources(self):
        # use at most 32 cpus to avoid OpenBLAS error: https://github.com/autogluon/autogluon/issues/1020
        num_cpus = ResourceManager.get_cpu_count()
        num_gpus = 0
        return num_cpus, num_gpus

    def _more_tags(self):
        return {
            "valid_oof": True,
            "can_refit_full": True,
        }
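
# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the AutoGluon source: the subsample
# schedule computed at the top of `_fit_with_samples`, extracted as a
# standalone helper (`_example_sample_schedule` is a hypothetical name).
# With the defaults (start_samples=10000, sample_growth_factor=2), the sample
# size doubles after each fit and snaps to the full dataset once the next
# step would land within 1.5x of it.
def _example_sample_schedule(num_rows_max, start_samples=10000, sample_growth_factor=2):
    schedule = []
    num_rows_cur = start_samples
    while True:
        num_rows_cur = min(num_rows_cur, num_rows_max)
        schedule.append(num_rows_cur)
        if num_rows_cur == num_rows_max:
            break
        num_rows_cur = math.ceil(num_rows_cur * sample_growth_factor)
        if num_rows_cur * 1.5 >= num_rows_max:
            num_rows_cur = num_rows_max
    return schedule


# For example: _example_sample_schedule(95000) == [10000, 20000, 40000, 95000]
# (80000 would be within 1.5x of 95000, so the last step jumps to all rows).
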
class FAISSModel(KNNModel):
    def _get_model_type(self):
        from .knn_utils import FAISSNeighborsClassifier, FAISSNeighborsRegressor

        if self.problem_type == REGRESSION:
            return FAISSNeighborsRegressor
        else:
            return FAISSNeighborsClassifier

    def _set_default_params(self):
        default_params = {
            "index_factory_string": "Flat",
        }
        for param, val in default_params.items():
            self._set_default_param_value(param, val)
        super()._set_default_params()

    @classmethod
    def _get_default_ag_args_ensemble(cls, **kwargs) -> dict:
        default_ag_args_ensemble = super()._get_default_ag_args_ensemble(**kwargs)
        extra_ag_args_ensemble = {"use_child_oof": False}
        default_ag_args_ensemble.update(extra_ag_args_ensemble)
        return default_ag_args_ensemble

    def _more_tags(self):
        return {"valid_oof": False}
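
# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the AutoGluon source: what the
# "index_factory_string" default of "Flat" means at the FAISS level. "Flat"
# requests an exact brute-force index, which faiss.index_factory builds from
# the descriptor string. The FAISSNeighborsClassifier/Regressor wrappers in
# .knn_utils handle this internally; this hypothetical helper exists only to
# clarify the parameter and assumes the `faiss` package is installed.
def _example_faiss_flat_index():
    import faiss

    d = 32  # feature dimensionality
    xb = np.random.rand(1000, d).astype(np.float32)  # database (training) vectors
    xq = np.random.rand(5, d).astype(np.float32)  # query vectors
    index = faiss.index_factory(d, "Flat")  # exact (brute-force) index
    index.add(xb)  # index the training rows
    distances, neighbors = index.search(xq, 5)  # 5 nearest neighbors per query
    return distances, neighbors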