Source code for autogluon.tabular.models.knn.knn_model
import logging
import math
import time

import numpy as np
import psutil
from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor

from autogluon.core.constants import REGRESSION
from autogluon.core.features.types import R_CATEGORY, R_OBJECT, S_TEXT_NGRAM, S_TEXT_SPECIAL, S_DATETIME_AS_INT
from autogluon.core.models import AbstractModel
from autogluon.core.models.abstract.model_trial import skip_hpo
from autogluon.core.utils.exceptions import NotEnoughMemoryError

from .knn_utils import FAISSNeighborsClassifier, FAISSNeighborsRegressor

logger = logging.getLogger(__name__)


# TODO: Normalize data!
class KNNModel(AbstractModel):
    """
    KNearestNeighbors model (scikit-learn): https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.KNeighborsClassifier.html
    """
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._model_type = self._get_model_type()

    def _get_model_type(self):
        if self.problem_type == REGRESSION:
            return KNeighborsRegressor
        else:
            return KNeighborsClassifier

    def _preprocess(self, X, **kwargs):
        X = super()._preprocess(X, **kwargs)
        # KNN requires a fully numeric feature matrix: impute missing values with 0 and cast to float32.
        X = X.fillna(0).to_numpy(dtype=np.float32)
        return X

    def _set_default_params(self):
        default_params = {
            'weights': 'uniform',
            'n_jobs': -1,
        }
        for param, val in default_params.items():
            self._set_default_param_value(param, val)

    def _get_default_auxiliary_params(self) -> dict:
        default_auxiliary_params = super()._get_default_auxiliary_params()
        extra_auxiliary_params = dict(
            ignored_type_group_raw=[R_CATEGORY, R_OBJECT],  # TODO: Eventually use category features
            ignored_type_group_special=[S_TEXT_NGRAM, S_TEXT_SPECIAL, S_DATETIME_AS_INT],
        )
        default_auxiliary_params.update(extra_auxiliary_params)
        return default_auxiliary_params

    # TODO: Enable HPO for KNN
    def _get_default_searchspace(self):
        spaces = {}
        return spaces

    def _fit(self,
             X,
             y,
             time_limit=None,
             sample_weight=None,
             **kwargs):
        time_start = time.time()
        X = self.preprocess(X)
        self._validate_fit_memory_usage(X=X)  # TODO: Can incorporate this into samples, can fit on portion of data to satisfy memory instead of raising exception immediately
        if sample_weight is not None:  # TODO: support
            logger.log(15, "sample_weight is not yet supported for KNNModel; it will be ignored during training.")
        num_rows_max = len(X)
        # FIXME: v0.1 Must store final num rows for refit_full or else will use everything! Worst case refit_full could train far longer than the original model.
        if time_limit is None or num_rows_max <= 10000:
            self.model = self._model_type(**self.params).fit(X, y)
        else:
            self.model = self._fit_with_samples(X=X, y=y, time_limit=time_limit - (time.time() - time_start))

    def _validate_fit_memory_usage(self, X):
        max_memory_usage_ratio = self.params_aux['max_memory_usage_ratio']
        model_size_bytes = 4 * X.shape[0] * X.shape[1]  # Assuming float32 types
        expected_final_model_size_bytes = model_size_bytes * 3.6  # Roughly what can be expected of the final KNN model in memory size
        if expected_final_model_size_bytes > 10000000:  # Only worth checking if expected model size is >10MB
            available_mem = psutil.virtual_memory().available
            model_memory_ratio = expected_final_model_size_bytes / available_mem
            if model_memory_ratio > (0.15 * max_memory_usage_ratio):
                logger.warning(f'\tWarning: Model is expected to require {round(model_memory_ratio * 100, 2)}% of available memory...')
            if model_memory_ratio > (0.20 * max_memory_usage_ratio):
                raise NotEnoughMemoryError  # don't train full model to avoid OOM error
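
    # Worked example of the memory heuristic above (illustrative numbers, not from the source):
    # for X with 1,000,000 rows and 100 float32 features, model_size_bytes = 4 * 1e6 * 100 = 400 MB,
    # so expected_final_model_size_bytes ~= 1.44 GB. Scaled by max_memory_usage_ratio, a warning is
    # logged once that estimate exceeds 15% of available memory, and NotEnoughMemoryError is raised
    # once it exceeds 20%.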

    # TODO: Consider making this fully generic and available to all models
    def _fit_with_samples(self, X, y, time_limit):
        """
        Fit model with samples of the data repeatedly, gradually increasing the amount of data until time_limit is reached or all data is used.

        X and y must already be preprocessed.
        """
        time_start = time.time()
        sample_growth_factor = 2  # Growth factor of each sample in terms of row count
        sample_time_growth_factor = 8  # Assume next sample will take 8x longer than previous (somewhat safe, but there are datasets where it is even >8x)
        num_rows_samples = []
        num_rows_max = len(X)
        num_rows_cur = 10000
        while True:
            num_rows_cur = min(num_rows_cur, num_rows_max)
            num_rows_samples.append(num_rows_cur)
            if num_rows_cur == num_rows_max:
                break
            num_rows_cur *= sample_growth_factor
            num_rows_cur = math.ceil(num_rows_cur)
            if num_rows_cur * 1.5 >= num_rows_max:
                num_rows_cur = num_rows_max
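
        # Illustrative schedule (numbers are hypothetical): with num_rows_max = 100000 the loop above
        # yields num_rows_samples = [10000, 20000, 40000, 100000]; the 80000-row step is skipped
        # because 80000 * 1.5 >= 100000, so the sample snaps directly to the full dataset.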

        def sample_func(chunk, frac):
            # Guarantee at least 1 sample (otherwise log_loss would crash or model would return different column counts in pred_proba)
            n = max(math.ceil(len(chunk) * frac), 1)
            return chunk.sample(n=n, replace=False, random_state=0)
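
        # sample_func is applied per label group below, giving a stratified sample: each class keeps
        # roughly the proportion it has in the full data. For example (illustrative numbers), with
        # frac=0.1 a class of 900 rows contributes ceil(90) = 90 rows, while a class of 5 rows still
        # contributes max(ceil(0.5), 1) = 1 row, so rare classes are never dropped entirely.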

        if self.problem_type != REGRESSION:
            y_df = y.to_frame(name='label').reset_index(drop=True)
        else:
            y_df = None

        time_start_sample_loop = time.time()
        time_limit_left = time_limit - (time_start_sample_loop - time_start)
        for i, samples in enumerate(num_rows_samples):
            if samples != num_rows_max:
                if self.problem_type == REGRESSION:
                    idx = np.random.choice(num_rows_max, size=samples, replace=False)
                else:
                    idx = y_df.groupby('label', group_keys=False).apply(sample_func, frac=samples/num_rows_max).index
                X_samp = X[idx, :]
                y_samp = y.iloc[idx]
            else:
                X_samp = X
                y_samp = y
            self.model = self._model_type(**self.params).fit(X_samp, y_samp)
            time_limit_left_prior = time_limit_left
            time_fit_end_sample = time.time()
            time_limit_left = time_limit - (time_fit_end_sample - time_start)
            time_fit_sample = time_limit_left_prior - time_limit_left
            time_required_for_next = time_fit_sample * sample_time_growth_factor
            logger.log(15, f'\t{round(time_fit_sample, 2)}s \t= Train Time (Using {samples}/{num_rows_max} rows) ({round(time_limit_left, 2)}s remaining time)')
            if time_required_for_next > time_limit_left and i != len(num_rows_samples) - 1:
                logger.log(20, f'\tNot enough time to train KNN model on all training rows. Fit {samples}/{num_rows_max} rows. (Training KNN model on {num_rows_samples[i+1]} rows is expected to take {round(time_required_for_next, 2)}s)')
                break
        return self.model
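
    # Illustrative walk-through of the early-stopping rule above (numbers are hypothetical):
    # if the 20000-row fit took 1.5s, the next (40000-row) fit is assumed to take
    # 1.5 * sample_time_growth_factor = 12s. With only 10s of budget left, the loop stops and keeps
    # the 20000-row model; with 15s left, it proceeds to the larger sample.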

    # TODO: Add HPO
    def _hyperparameter_tune(self, **kwargs):
        return skip_hpo(self, **kwargs)


class FAISSModel(KNNModel):
    def _get_model_type(self):
        if self.problem_type == REGRESSION:
            return FAISSNeighborsRegressor
        else:
            return FAISSNeighborsClassifier

    def _set_default_params(self):
        default_params = {
            'index_factory_string': 'Flat',  # FAISS index_factory string; 'Flat' is an exact (brute-force) index
        }
        for param, val in default_params.items():
            self._set_default_param_value(param, val)
        super()._set_default_params()
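

# A minimal usage sketch (not part of the original module). It assumes AbstractModel's standard
# constructor arguments (path, name, problem_type) and preprocessed pandas inputs; within AutoGluon
# these models are normally constructed and fit by the trainer rather than by hand.
#
# from autogluon.core.constants import BINARY
#
# model = KNNModel(path='models/', name='KNN', problem_type=BINARY)
# model.fit(X=X_train, y=y_train, time_limit=60)  # with a tight time_limit, falls back to _fit_with_samples
# y_pred = model.predict(X_test)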