import logging
import math
import os
import re
import warnings
from typing import Any, Callable, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from scipy.stats import norm
from sklearn.base import BaseEstimator
import autogluon.core as ag
from autogluon.tabular import TabularPredictor
from autogluon.timeseries.dataset.ts_dataframe import ITEMID, TIMESTAMP, TimeSeriesDataFrame
from autogluon.timeseries.models.abstract import AbstractTimeSeriesModel
from autogluon.timeseries.utils.seasonality import get_seasonality
from autogluon.timeseries.utils.warning_filters import statsmodels_warning_filter
logger = logging.getLogger(__name__)
class TabularEstimator(BaseEstimator):
"""Scikit-learn compatible interface for TabularPredictor."""
_label_column_name = "y"
def __init__(self, predictor_init_kwargs: Optional[dict] = None, predictor_fit_kwargs: Optional[dict] = None):
self.predictor_init_kwargs = predictor_init_kwargs if predictor_init_kwargs is not None else {}
self.predictor_fit_kwargs = predictor_fit_kwargs if predictor_fit_kwargs is not None else {}
def get_params(self, deep: bool = True) -> dict:
return {
"predictor_init_kwargs": self.predictor_init_kwargs,
"predictor_fit_kwargs": self.predictor_fit_kwargs,
}
def fit(self, X: pd.DataFrame, y: pd.Series) -> "TabularEstimator":
assert isinstance(X, pd.DataFrame) and isinstance(y, pd.Series)
df = pd.concat([X, y.rename(self._label_column_name).to_frame()], axis=1)
self.predictor = TabularPredictor(label=self._label_column_name, **self.predictor_init_kwargs)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
self.predictor.fit(df, **self.predictor_fit_kwargs)
return self
def predict(self, X: pd.DataFrame) -> np.ndarray:
assert isinstance(X, pd.DataFrame)
return self.predictor.predict(X).values
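

# Usage sketch for TabularEstimator (illustrative only; `X_train`, `y_train` and `X_test` are placeholder
# names for a feature matrix, an aligned target series, and held-out features):
#
#   estimator = TabularEstimator(
#       predictor_init_kwargs={"verbosity": 0},
#       predictor_fit_kwargs={"hyperparameters": {"GBM": {}}},
#   )
#   estimator.fit(X_train, y_train)     # trains a TabularPredictor under the hood
#   y_pred = estimator.predict(X_test)  # -> np.ndarray of point predictions
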
class RecursiveTabularModel(AbstractTimeSeriesModel):
"""Predict future time series values one by one using TabularPredictor from AutoGluon-Tabular.
Based on the `mlforecast <https://github.com/Nixtla/mlforecast>`_ library.
Other Parameters
----------------
lags : List[int], default = None
Lags of the target that will be used as features for predictions. If None, will be determined automatically
based on the frequency of the data.
date_features : List[Union[str, Callable]], default = None
Features computed from the dates. Can be pandas date attributes or functions that will take the dates as input.
If None, will be determined automatically based on the frequency of the data.
differences : List[int], default = None
Differences to take of the target before computing the features. These are restored at the forecasting step.
If None, will be set to ``[seasonal_period]``, where seasonal_period is determined based on the data frequency.
standardize : bool, default = True
        If True, time series values will be standardized by subtracting the mean & dividing by the standard deviation.
    tabular_hyperparameters : Dict[str, Dict[str, Any]], optional
        Hyperparameters dictionary passed to ``TabularPredictor.fit``. Maps the names of the tabular models that
        should be fit to their hyperparameters.
Defaults to ``{"GBM": {}}``.
tabular_fit_kwargs : Dict[str, Any], optional
Additional keyword arguments passed to ``TabularPredictor.fit``. Defaults to an empty dict.
max_num_samples : int, default = 1_000_000
        Training and validation datasets will contain at most this many rows, taken from the end of each
        series.
subsampling_strategy : {"items", "timesteps", None}, default = "items"
Strategy used to limit memory consumption of the model if the dataset is too large. Use "items" if the dataset
contains many time series, "timesteps" if the dataset contains a few very long time series, or None to disable
subsampling. Only applies to datasets with > 20_000_000 rows.
"""
# TODO: Use sample_weight to align metrics with Tabular
# TODO: Add lag_transforms
TIMESERIES_METRIC_TO_TABULAR_METRIC = {
"MASE": "mean_absolute_error",
"MAPE": "mean_absolute_percentage_error",
"sMAPE": "mean_absolute_percentage_error",
"mean_wQuantileLoss": "mean_absolute_error",
"MSE": "mean_squared_error",
"RMSE": "root_mean_squared_error",
}
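    # sMAPE and mean_wQuantileLoss have no exact counterparts among TabularPredictor's regression metrics,
    # so the closest available proxies (MAPE and MAE, respectively) are used.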
def __init__(
self,
freq: Optional[str] = None,
prediction_length: int = 1,
path: Optional[str] = None,
name: Optional[str] = None,
        eval_metric: Optional[str] = None,
        hyperparameters: Optional[Dict[str, Any]] = None,
**kwargs, # noqa
):
name = name or re.sub(r"Model$", "", self.__class__.__name__) # TODO: look name up from presets
super().__init__(
path=path,
freq=freq,
prediction_length=prediction_length,
name=name,
eval_metric=eval_metric,
hyperparameters=hyperparameters,
**kwargs,
)
from mlforecast import MLForecast
from .utils import StandardScaler
self.mlf: Optional[MLForecast] = None
self.scaler: Optional[StandardScaler] = None
self.required_ts_length: int = 1
self.residuals_std: float = 0.0
@staticmethod
def _get_date_features(freq: str) -> List[Callable]:
# TODO: Use categorical variables for date features
from gluonts.time_feature import time_features_from_frequency_str
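        # For hourly data, for example, this yields callables such as hour_of_day, day_of_week, day_of_month
        # and day_of_year (the exact feature set depends on the installed gluonts version).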
return time_features_from_frequency_str(freq)
def _get_mlforecast_init_args(self, train_data: TimeSeriesDataFrame, model_params: dict) -> dict:
from gluonts.time_feature import get_lags_for_frequency
from mlforecast.target_transforms import Differences
from .utils import StandardScaler
lags = model_params.get("lags")
if lags is None:
lags = get_lags_for_frequency(self.freq)
date_features = model_params.get("date_features")
if date_features is None:
date_features = self._get_date_features(self.freq)
differences = model_params.get("differences")
if differences is None:
differences = [get_seasonality(self.freq)]
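            # e.g., for hourly data this defaults to a single seasonal difference, differences = [24]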
ts_lengths = train_data.num_timesteps_per_item()
required_ts_length = sum(differences) + 1
all_train_ts_are_long_enough = ts_lengths.min() >= required_ts_length
some_ts_available_for_validation = ts_lengths.max() >= required_ts_length + self.prediction_length
if not (all_train_ts_are_long_enough and some_ts_available_for_validation):
logger.warning(
f"\tTime series in the dataset are too short for chosen differences {differences}. "
f"Setting differences to [1]."
)
differences = [1]
target_transforms = []
if len(differences) > 0:
target_transforms.append(Differences(differences))
self.required_ts_length = sum(differences) + 1
if model_params.get("standardize", True):
self.scaler = StandardScaler()
target_transforms.append(self.scaler)
return {
"lags": lags,
"date_features": date_features,
"target_transforms": target_transforms,
}
def _to_mlforecast_df(
self,
data: TimeSeriesDataFrame,
static_features: pd.DataFrame,
include_target: bool = True,
) -> pd.DataFrame:
"""Convert TimeSeriesDataFrame to a format expected by MLForecast methods `predict` and `preprocess`.
Each row contains unique_id, ds, y, and (optionally) known covariates & static features.
"""
# past_covariates & lags for known_covariates are not supported
selected_columns = self.metadata.known_covariates_real.copy()
column_name_mapping = {ITEMID: "unique_id", TIMESTAMP: "ds"}
if include_target:
selected_columns += [self.target]
column_name_mapping[self.target] = "y"
df = pd.DataFrame(data)[selected_columns].reset_index()
if static_features is not None:
df = pd.merge(df, static_features, how="left", on=ITEMID, suffixes=(None, "_static_feat"))
# FIXME: If unique_id column is not sorted, MLForecast will assign incorrect IDs to forecasts
return df.rename(columns=column_name_mapping).sort_values(by="unique_id", kind="stable")
def _get_features_dataframe(
self,
data: TimeSeriesDataFrame,
last_k_values: Optional[int] = None,
) -> Tuple[pd.DataFrame, pd.Series]:
"""Construct feature matrix containing lags, covariates, and target time series values.
        Rows where the regression target is NaN are dropped, but rows with missing feature values are kept.
Parameters
----------
data : TimeSeriesDataFrame
Time series data that needs to be converted.
        last_k_values : int, optional
            If given, only the last `last_k_values` rows will be kept for each time series.
"""
item_ids_to_exclude = data.item_ids[data.num_timesteps_per_item() < self.required_ts_length]
if len(item_ids_to_exclude) > 0:
data = data.drop(item_ids_to_exclude, level=0)
df = self._to_mlforecast_df(data, data.static_features)
        # FIXME: keep_last_n produces a bug if the time series are too short -> manually select the tail of each series
features = self.mlf.preprocess(
df,
dropna=False,
static_features=None, # we handle static features in `_to_mlforecast_df`, without relying on MLForecast
)
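        # Drop the feature frame cached by mlforecast on the TimeSeries object to free memory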
del self.mlf.ts.features_
if last_k_values is not None:
features = features.groupby("unique_id", sort=False).tail(last_k_values)
features.dropna(subset=self.mlf.ts.target_col, inplace=True)
features = features.reset_index(drop=True)
return features[self.mlf.ts.features_order_], features[self.mlf.ts.target_col]
@staticmethod
def _subsample_data_to_avoid_oom(
data: TimeSeriesDataFrame,
strategy: Optional[str] = "items",
max_num_rows: int = 20_000_000,
) -> TimeSeriesDataFrame:
"""Subsample time series from the dataset to avoid out of memory errors inside MLForecast.preprocess."""
# TODO: Find a better way to ensure that the model does not run out of memory. E.g., by estimating the expected
# memory usage & comparing it to currently available RAM
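        # Worked example: with 40M rows across 10k items and strategy="items", we keep
        # ceil(10_000 * 20_000_000 / 40_000_000) = 5_000 randomly chosen series (~50% of items).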
if len(data) > max_num_rows:
if strategy == "items":
item_ids = data.item_ids
num_items_to_keep = math.ceil(len(item_ids) * max_num_rows / len(data))
items_to_keep = np.random.choice(item_ids, num_items_to_keep, replace=False)
logger.debug(
f"\tRandomly selected {num_items_to_keep} ({num_items_to_keep / len(item_ids):.1%}) time series "
"to limit peak memory usage"
)
data = data.query("item_id in @items_to_keep")
elif strategy == "timesteps":
num_timesteps_to_remove = math.floor((len(data) - max_num_rows) / data.num_items)
logger.debug(
f"\tRemoving {num_timesteps_to_remove} from the start of each time series to limit peak memory usage"
)
data = data.slice_by_timestep(num_timesteps_to_remove, None)
return data
def _fit(
self,
train_data: TimeSeriesDataFrame,
val_data: Optional[TimeSeriesDataFrame] = None,
time_limit: Optional[int] = None,
verbosity: int = 2,
**kwargs,
) -> None:
self._check_fit_params()
from mlforecast import MLForecast
# TabularEstimator is passed to MLForecast later to include tuning_data
model_params = self._get_model_params().copy()
subsampling_strategy = model_params.pop("subsampling_strategy", "items")
train_data = self._subsample_data_to_avoid_oom(train_data, strategy=subsampling_strategy)
mlforecast_init_args = self._get_mlforecast_init_args(train_data, model_params)
self.mlf = MLForecast(models={}, freq=self.freq, **mlforecast_init_args)
# Do not use external val_data as tuning_data to avoid overfitting
train_subset, val_subset = train_data.train_test_split(self.prediction_length)
max_num_samples = model_params.get("max_num_samples", 1_000_000)
max_rows_per_item = math.ceil(max_num_samples / train_data.num_items)
X_train, y_train = self._get_features_dataframe(train_subset, last_k_values=max_rows_per_item)
X_val, y_val = self._get_features_dataframe(
val_subset, last_k_values=min(self.prediction_length, max_rows_per_item)
)
estimator = TabularEstimator(
predictor_init_kwargs={
"path": self.path + os.sep + "point_predictor",
"problem_type": ag.constants.REGRESSION,
"eval_metric": self.TIMESERIES_METRIC_TO_TABULAR_METRIC[self.eval_metric],
"verbosity": verbosity - 2,
},
predictor_fit_kwargs={
"tuning_data": pd.concat([X_val, y_val], axis=1),
"time_limit": time_limit,
"hyperparameters": model_params.get("tabular_hyperparameters", {"GBM": {}}),
**model_params.get("tabular_fit_kwargs", {}),
},
)
self.mlf.models = {"mean": estimator}
with statsmodels_warning_filter():
self.mlf.fit_models(X_train, y_train)
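        # Std of the in-sample residuals, measured in the transformed space (after differencing & scaling);
        # used in `predict` to construct Gaussian quantile forecasts around the point forecast.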
self.residuals_std = (self.mlf.models_["mean"].predict(X_train) - y_train).std()
def _predict_with_mlforecast(
self,
data: TimeSeriesDataFrame,
        known_covariates: Optional[TimeSeriesDataFrame] = None,
) -> pd.DataFrame:
"""Generate a point forecast with MLForecast.
Returns
-------
predictions : pd.DataFrame
Predictions with a single column "mean" containing the point forecast.
"""
new_data = self._to_mlforecast_df(data, data.static_features)
if known_covariates is not None:
dynamic_dfs = [self._to_mlforecast_df(known_covariates, data.static_features, include_target=False)]
else:
dynamic_dfs = None
with statsmodels_warning_filter():
raw_predictions = self.mlf.predict(
horizon=self.prediction_length,
new_data=new_data,
dynamic_dfs=dynamic_dfs,
)
predictions = raw_predictions.rename(columns={"unique_id": ITEMID, "ds": TIMESTAMP})
return predictions.set_index([ITEMID, TIMESTAMP])
def _get_scale_per_item(self, item_ids: pd.Index) -> pd.Series:
"""Extract the 'std' values from the scaler object, if available."""
if self.scaler is not None:
return self.scaler.stats_["_std"].copy().reindex(item_ids)
else:
return pd.Series(1.0, index=item_ids)
def predict(
self,
data: TimeSeriesDataFrame,
        known_covariates: Optional[TimeSeriesDataFrame] = None,
**kwargs,
) -> TimeSeriesDataFrame:
if (data.num_timesteps_per_item() < self.required_ts_length).any():
# Raise a RuntimeError to avoid a numba segfault that kills the Python process
raise RuntimeError(f"{self.name} requires that all time series have length >= {self.required_ts_length}")
predictions = self._predict_with_mlforecast(data=data, known_covariates=known_covariates)
scale_per_item = self._get_scale_per_item(predictions.index.unique(level=ITEMID))
        num_items = len(predictions) // self.prediction_length
        sqrt_h = np.sqrt(np.arange(1, self.prediction_length + 1))
        # For each item, the n-th forecast step (n = 0, ..., prediction_length - 1) is scaled by sqrt(1 + n)
normal_scale_per_timestep = pd.Series(np.tile(sqrt_h, num_items), index=predictions.index)
std_per_timestep = self.residuals_std * scale_per_item * normal_scale_per_timestep
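        # Gaussian quantiles: the q-th quantile equals mean + norm.ppf(q) * residuals_std * item_scale * sqrt(step),
        # i.e., the forecast std is assumed to grow with the horizon like a random walk
        # (for example, norm.ppf(0.9) ~= 1.2816 for the 90% quantile).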
for q in self.quantile_levels:
predictions[str(q)] = predictions["mean"] + norm.ppf(q) * std_per_timestep
return TimeSeriesDataFrame(predictions.reindex(data.item_ids, level=ITEMID))
def _more_tags(self) -> dict:
return {"can_refit_full": True}