Source code for autogluon.tabular.models.vowpalwabbit.vowpalwabbit_model
import logging
import time

import numpy as np
import pandas as pd

from autogluon.core.models import AbstractModel
from autogluon.common.features.types import R_INT, R_FLOAT, R_CATEGORY, R_OBJECT, S_IMAGE_PATH, S_TEXT_NGRAM, S_TEXT_AS_CATEGORY, S_TEXT_SPECIAL
from autogluon.core.utils.try_import import try_import_vowpalwabbit
from autogluon.core.constants import BINARY, REGRESSION, MULTICLASS, \
    PROBLEM_TYPES_CLASSIFICATION, PROBLEM_TYPES_REGRESSION
from autogluon.core.utils.exceptions import TimeLimitExceeded

from .vowpalwabbit_utils import VWFeaturesConverter

logger = logging.getLogger(__name__)


class VowpalWabbitModel(AbstractModel):
    """
    VowpalWabbit Model: https://vowpalwabbit.org/
    VowpalWabbit Command Line args: https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Command-line-arguments
    """
    model_internals_file_name = 'model-internals.pkl'

    # Ref: https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Loss-functions
    CLASSIFICATION_LOSS_FUNCTIONS = ['logistic', 'hinge']
    REGRESSION_LOSS_FUNCTIONS = ['squared', 'quantile', 'poisson', 'classic']

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._load_model = None  # Used for saving and loading the internal model file

    # The `_preprocess` method takes the input data and transforms it to the internal representation usable by the model.
    # `_preprocess` is called by `preprocess` and is used during model fit and model inference.
    def _preprocess(self, X: pd.DataFrame, is_train=False, **kwargs) -> pd.Series:
        X = super()._preprocess(X, **kwargs)
        if is_train:
            self._features_converter = VWFeaturesConverter()
            self._feature_metadata_dict = self._feature_metadata.to_dict()
            # self._feature_metadata stores the metadata (raw and special types) of each feature.
        X_series = self._features_converter.convert_features_to_vw_format(X, self._feature_metadata_dict)
        return X_series
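
    # Illustrative sketch only (not from the original source): `convert_features_to_vw_format`
    # is assumed to return a pd.Series with one VW-format string per row, where '|' opens a
    # namespace, numeric features are written as name:value and categorical features as plain
    # tokens, e.g. roughly:
    #   "|features age:32.0 workclass=Private hours_per_week:40.0"
    # The exact namespace naming and escaping are decided by VWFeaturesConverter and may differ;
    # `_fit` then prepends the (shifted) label and a space to each of these strings.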

    # The `_fit` method takes the input training data (and optionally the validation data) and trains the model.
    def _fit(self,
             X: pd.DataFrame,  # training data
             y: pd.Series,  # training labels
             time_limit=None,
             verbosity=2,
             **kwargs):  # kwargs includes many other potential inputs, refer to AbstractModel documentation for details
        time_start = time.time()
        try_import_vowpalwabbit()
        import vowpalwabbit

        seed = 0  # Random seed

        # Valid self.problem_type values include ['binary', 'multiclass', 'regression', 'quantile', 'softclass']
        if self.problem_type not in PROBLEM_TYPES_REGRESSION + PROBLEM_TYPES_CLASSIFICATION:
            raise TypeError(f"Vowpal Wabbit does not support {self.problem_type}")

        # Certain parameters, like `passes`, are passed in as hyperparameters but are not used
        # when initialising the model.
        # passes: used as the number of training epochs
        params = self._get_model_params()
        params['loss_function'] = params.get('loss_function', self._get_default_loss_function())
        passes = params.pop('passes')

        # Make sure to call preprocess on X near the start of `_fit`.
        # This is necessary because the data is converted via preprocess during predict, and needs to be in the same format as during fit.
        X_series = self.preprocess(X, is_train=True)
        self._validate_loss_function(loss_function=params['loss_function'])

        # VW expects labels from 1 to N for binary and multiclass classification problems.
        # AutoGluon label-encodes them from 0 to N-1, hence we increment the value of y by 1.
        if self.problem_type != REGRESSION:
            y = y.apply(lambda row: row + 1)
        y = y.astype(str) + ' '

        # Concatenate y and X to get the training data in VW format
        final_training_data = y + X_series
        final_training_data = final_training_data.tolist()

        extra_params = {
            'cache_file': 'train.cache',
            'holdout_off': True,
        }
        if verbosity <= 3:
            extra_params['quiet'] = True

        # Initialize the model
        if self.problem_type in PROBLEM_TYPES_CLASSIFICATION:
            # Ref: https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Predicting-probabilities#multi-class---oaa
            extra_params['oaa'] = self.num_classes
            extra_params['probabilities'] = True
        self.model = vowpalwabbit.Workspace(**params, **extra_params)

        time_start_fit = time.time()
        if time_limit is not None:
            time_limit_fit = time_limit - (time_start_fit - time_start) - 0.3  # Account for 0.3s of overhead
            if time_limit_fit <= 0:
                raise TimeLimitExceeded
        else:
            time_limit_fit = None

        # Train the model
        np.random.seed(seed)
        epoch = 0
        for epoch in range(1, passes + 1):
            # TODO: Add early stopping support via validation
            self._train_single_epoch(training_data=final_training_data)
            if time_limit_fit is not None and epoch < passes:
                time_fit_used = time.time() - time_start_fit
                time_fit_used_per_epoch = time_fit_used / epoch
                time_left = time_limit_fit - time_fit_used
                if time_left <= (time_fit_used_per_epoch * 2):
                    logger.log(30, f'\tEarly stopping due to lack of time. Fit {epoch}/{passes} passes...')
                    break

        self.params_trained['passes'] = epoch
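
    # Worked example of the time-budget check above, with hypothetical numbers: if
    # time_limit_fit is 10s and 3 of 10 passes have finished after 6s, the average pass
    # took 2s, 4s remain, and the threshold is 2 * 2s = 4s, so training stops after pass 3
    # and `self.params_trained['passes']` records 3 as the number of passes actually fit.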

    def _train_single_epoch(self, training_data):
        row_order = np.arange(0, len(training_data))
        row_order = np.random.permutation(row_order)
        for row_i in row_order:
            row = training_data[row_i]
            self.model.learn(row)

    def _validate_loss_function(self, loss_function):
        # Ref: https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Loss-functions
        if loss_function:
            if self.problem_type in PROBLEM_TYPES_CLASSIFICATION:
                assert loss_function in self.CLASSIFICATION_LOSS_FUNCTIONS, \
                    f'For {self.problem_type} problem, VW supports: {self.CLASSIFICATION_LOSS_FUNCTIONS}. ' \
                    f'Got loss_function: {loss_function}'
            elif self.problem_type in PROBLEM_TYPES_REGRESSION:
                assert loss_function in self.REGRESSION_LOSS_FUNCTIONS, \
                    f'For {self.problem_type} problem, VW supports: {self.REGRESSION_LOSS_FUNCTIONS}. ' \
                    f'Got loss_function: {loss_function}'

    def _get_default_loss_function(self) -> str:
        # Ref: https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Loss-functions
        if self.problem_type in PROBLEM_TYPES_CLASSIFICATION:
            return 'logistic'
        else:
            return 'squared'

    def save(self, path: str = None, verbose=True) -> str:
        """
        By default, AutoGluon saves the complete AbstractModel as a pickle file.
        This includes the internal self.model, which is the actual model.
        However, the VW model cannot be pickled.
        Hence, we dump the AbstractModel with self.model set to None
        and save self.model as a separate internal file using that model's own saving mechanism.

        :param path: path where the model is to be saved
        :param verbose: verbosity
        :return: path where the model is saved
        """
        self._load_model = self.model is not None
        __model = self.model
        self.model = None
        path = super().save(path=path, verbose=verbose)
        self.model = __model

        # Export model
        if self._load_model:
            file_path = path + self.model_internals_file_name
            self.model.save(file_path)
        self._load_model = None
        return path
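
    # For illustration (assumed layout, not stated in the source): after `save`, the model
    # directory contains the pickled AbstractModel written by `super().save` (with self.model
    # set to None) alongside the VW-native dump 'model-internals.pkl' written above;
    # `load` below reassembles the two.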

    @classmethod
    def load(cls, path: str, reset_paths=True, verbose=True):
        """
        Two files need to be loaded:
        the AbstractModel pickle dump and the internal model file.
        For VW, the loading arguments differ based on problem_type/hyperparameters.
        """
        try_import_vowpalwabbit()
        import vowpalwabbit

        # Load the AbstractModel. This is without the internal model.
        model = super().load(path, reset_paths=reset_paths, verbose=verbose)
        params = model._get_model_params()

        # Load the internal model file
        if model._load_model:
            file_path = path + cls.model_internals_file_name
            model_load_params = f" -i {file_path} --quiet"
            if model.problem_type in PROBLEM_TYPES_CLASSIFICATION:
                model_load_params += " --probabilities --loss_function=logistic"
            if params['sparse_weights']:
                model_load_params += " --sparse_weights"
            model.model = vowpalwabbit.Workspace(model_load_params)
        model._load_model = None
        return model

    def _predict_proba(self, X, **kwargs):
        # Preprocess the set of X features
        X = self.preprocess(X, **kwargs)
        y_pred_proba = np.array([self.model.predict(row) for row in X])
        return self._convert_proba_to_unified_form(y_pred_proba)

    def get_memory_size(self) -> int:
        # TODO: Can be improved further to make it more accurate
        # Returning 5 MB as the value
        return int(5e6)

    # The `_set_default_params` method defines the default hyperparameters of the model.
    # User-specified parameters will override these values on a key-by-key basis.
    def _set_default_params(self):
        default_params = {
            'passes': 10,  # TODO: Much better if 500+, revisit this if wanting to use VW to get strong results
            'bit_precision': 32,
            'ngram': 2,
            'skips': 1,
            'learning_rate': 1,
            'sparse_weights': True,
        }
        for param, val in default_params.items():
            self._set_default_param_value(param, val)

    # The `_get_default_auxiliary_params` method defines various model-agnostic parameters such as maximum memory usage and valid input column dtypes.
    # For most users who build custom models, they will only need to specify the valid/invalid dtypes to the model here.
    def _get_default_auxiliary_params(self) -> dict:
        default_auxiliary_params = super()._get_default_auxiliary_params()
        # Ignore the special feature types listed below: only features that are not of these
        # special types are passed to the model for training.
        extra_auxiliary_params = dict(
            valid_raw_types=[R_INT, R_FLOAT, R_CATEGORY, R_OBJECT],
            ignored_type_group_special=[S_IMAGE_PATH, S_TEXT_NGRAM, S_TEXT_AS_CATEGORY, S_TEXT_SPECIAL],
        )
        default_auxiliary_params.update(extra_auxiliary_params)
        return default_auxiliary_params

    @classmethod
    def _get_default_ag_args(cls) -> dict:
        default_ag_args = super()._get_default_ag_args()
        extra_ag_args = {
            'valid_stacker': False,
            'problem_types': [BINARY, MULTICLASS, REGRESSION],
        }
        default_ag_args.update(extra_ag_args)
        return default_ag_args

    def _more_tags(self):
        # `can_refit_full=True` because the best epoch is communicated at the end of `_fit`: `self.params_trained['passes'] = epoch`
        return {'can_refit_full': True}

    @classmethod
    def _class_tags(cls):
        return {'handles_text': True}
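

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module: it assumes the AutoGluon
    # custom-model interface, where a model class can be passed as a key in TabularPredictor's
    # `hyperparameters`. The dataset URL and label column below are the standard AutoGluon
    # example data and are used purely for illustration.
    from autogluon.tabular import TabularDataset, TabularPredictor

    train_data = TabularDataset('https://autogluon.s3.amazonaws.com/datasets/Inc/train.csv')
    predictor = TabularPredictor(label='class').fit(
        train_data,
        hyperparameters={VowpalWabbitModel: {'passes': 10, 'loss_function': 'logistic'}},
    )
    print(predictor.predict(train_data).head())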