Source code for autogluon.timeseries.models.local.statsforecast

import logging
from typing import List, Type, Union

import pandas as pd

from autogluon.core.utils.exceptions import TimeLimitExceeded
from autogluon.timeseries.dataset.ts_dataframe import ITEMID, TIMESTAMP, TimeSeriesDataFrame
from autogluon.timeseries.models.local.abstract_local_model import AbstractLocalModel
from autogluon.timeseries.utils.hashing import hash_ts_dataframe_items
from autogluon.timeseries.utils.warning_filters import statsmodels_warning_filter

logger = logging.getLogger(__name__)


class AbstractStatsForecastModel(AbstractLocalModel):
    """Wrapper for StatsForecast models.

    Cached predictions are stored inside the model to speed up validation & ensemble training downstream.

    Subclasses select the underlying StatsForecast model by implementing ``get_model_type``.

    Attributes
    ----------
    allowed_local_model_args : List[str]
        List of allowed arguments that can be passed to the underlying model.
        Arguments not in this list will be filtered out and not passed to the underlying model.
    """

    allowed_local_model_args: List[str] = []
    # Class-level default for n_jobs. NOTE(review): presumably consumed by AbstractLocalModel when the
    # user does not provide n_jobs - confirm against the parent class.
    DEFAULT_N_JOBS: Union[float, int] = -1

    def get_model_type(self) -> Type:
        """Return the StatsForecast model class wrapped by this model.

        Must be overridden by subclasses; the returned class is instantiated with
        ``self._local_model_args`` as keyword arguments.
        """
        raise NotImplementedError

    def _fit(self, train_data, time_limit=None, verbosity=2, **kwargs) -> "AbstractStatsForecastModel":
        """Prepare hyperparameters that will be passed to the underlying model.

        As for all local models, actual fitting + predictions are delegated to the ``predict`` method.

        Parameters
        ----------
        train_data
            Training data, forwarded unchanged to the parent ``_fit``.
        time_limit : float, optional
            Remaining time budget in seconds. Below 10 seconds fitting is refused, below 30 seconds
            a warning is logged because the model cannot be stopped early.
        verbosity : int, default = 2
            Forwarded unchanged to the parent ``_fit``.

        Returns
        -------
        AbstractStatsForecastModel
            This model instance.

        Raises
        ------
        TimeLimitExceeded
            If ``time_limit`` is provided and is smaller than 10 seconds.
        """
        # TODO: Find a way to ensure that SF models respect time_limit, e.g. https://docs.python.org/3/library/concurrent.futures.html
        # Fitting usually takes >= 15 seconds
        if time_limit is not None:
            if time_limit < 10:
                raise TimeLimitExceeded
            elif time_limit < 30:
                logger.warning(
                    f"Warning: {self.__class__.__name__} does not support early stopping "
                    f"and may exceed the remaining time_limit of {time_limit:.1f}s"
                )
        super()._fit(train_data=train_data, time_limit=time_limit, verbosity=verbosity, **kwargs)
        # seasonal_period is called season_length in StatsForecast
        self._local_model_args["season_length"] = self._local_model_args.pop("seasonal_period")
        return self

    def _to_statsforecast_dataframe(self, data: TimeSeriesDataFrame) -> pd.DataFrame:
        """Convert ``data`` to the column layout used by StatsForecast.

        Only the target column is kept. The index is moved into regular columns and the item id,
        timestamp and target columns are renamed to ``unique_id``, ``ds`` and ``y`` respectively.
        """
        target = data[[self.target]]
        return target.reset_index().rename({ITEMID: "unique_id", TIMESTAMP: "ds", self.target: "y"}, axis=1)

    def _fit_and_cache_predictions(self, data: TimeSeriesDataFrame, **kwargs):
        """Make predictions for time series in data that are not cached yet.

        Each time series is identified by its hash. Forecasts are generated only for series whose
        hash is missing from ``self._cached_predictions``; the new forecasts are stored in the cache
        under that hash and the model is saved afterwards.
        """
        # TODO: Improve prediction caching logic -> save predictions to a separate file, like in Tabular?
        # Imported lazily so that statsforecast is only required when predictions are actually generated.
        from statsforecast import StatsForecast
        from statsforecast.models import SeasonalNaive

        data_hash = hash_ts_dataframe_items(data)
        items_to_fit = [item_id for item_id, ts_hash in data_hash.items() if ts_hash not in self._cached_predictions]
        if not items_to_fit:
            # Every time series already has cached predictions, nothing to do.
            return

        logger.debug(f"{self.name} received {len(items_to_fit)} new items to predict, generating predictions")
        data_to_fit = pd.DataFrame(data).query("item_id in @items_to_fit")

        model_type = self.get_model_type()
        model = model_type(**self._local_model_args)

        sf = StatsForecast(
            models=[model],
            fallback_model=SeasonalNaive(season_length=self._local_model_args["season_length"]),
            sort_df=False,
            freq=self.freq,
            n_jobs=self.n_jobs,
        )

        # StatsForecast generates probabilistic forecasts in lo/hi confidence region boundaries
        # We chose the columns that correspond to the desired quantile_levels
        model_name = str(model)
        new_column_names = {"unique_id": ITEMID, "ds": TIMESTAMP, model_name: "mean"}
        levels = []
        for q in self.quantile_levels:
            # Quantile q maps to the boundary of the central interval with coverage |q - 0.5| * 200 percent,
            # e.g. q=0.1 -> "lo-80.0" and q=0.9 -> "hi-80.0".
            level = round(abs(q - 0.5) * 200, 1)
            suffix = "lo" if q < 0.5 else "hi"
            levels.append(level)
            new_column_names[f"{model_name}-{suffix}-{level}"] = str(q)
        # Symmetric quantiles share the same level, so request each level only once.
        levels = sorted(set(levels))
        chosen_columns = list(new_column_names.values())

        with statsmodels_warning_filter():
            raw_predictions = sf.forecast(
                df=self._to_statsforecast_dataframe(data_to_fit),
                h=self.prediction_length,
                level=levels,
            ).reset_index()
        predictions = raw_predictions.rename(new_column_names, axis=1)[chosen_columns].set_index(TIMESTAMP)
        item_ids = predictions.pop(ITEMID)

        # Store the forecast of each item under the hash of its time series.
        for item_id, preds in predictions.groupby(item_ids, sort=False):
            self._cached_predictions[data_hash.loc[item_id]] = preds
        # Make sure cached predictions can be reused by other models
        self.save()

    def hyperparameter_tune(self, **kwargs):
        """Reject hyperparameter tuning, which is not supported for StatsForecast models.

        Raises
        ------
        NotImplementedError
            Always. Previously nothing was raised when ``n_jobs == 1`` and the method silently
            returned ``None`` without performing any tuning.
        """
        # FIXME: multiprocessing.pool.ApplyResult.get() hangs inside StatsForecast.forecast if HPO enabled - needs investigation
        raise NotImplementedError(f"{self.__class__.__name__} does not support hyperparameter tuning.")


class AutoARIMAModel(AbstractStatsForecastModel):
    """Automatically tuned ARIMA model.

    Automatically selects the best (p,d,q,P,D,Q) model parameters using an information criterion

    Based on `statsforecast.models.AutoARIMA <https://nixtla.github.io/statsforecast/models.html#autoarima>`_.

    Other Parameters
    ----------------
    d : int, optional
        Order of first differencing. If None, will be determined automatically using a statistical test.
    D : int, optional
        Order of seasonal differencing. If None, will be determined automatically using a statistical test.
    max_p : int, default = 5
        Maximum number of autoregressive terms.
    max_q : int, default = 5
        Maximum order of moving average.
    max_P : int, default = 2
        Maximum number of seasonal autoregressive terms.
    max_Q : int, default = 2
        Maximum order of seasonal moving average.
    max_d : int, default = 2
        Maximum order of first differencing.
    max_D : int, default = 1
        Maximum order of seasonal differencing.
    start_p : int, default = 2
        Starting value of p in stepwise procedure.
    start_q : int, default = 2
        Starting value of q in stepwise procedure.
    start_P : int, default = 1
        Starting value of P in stepwise procedure.
    start_Q : int, default = 1
        Starting value of Q in stepwise procedure.
    stationary : bool, default = False
        Restrict search to stationary models.
    seasonal : bool, default = True
        Whether to consider seasonal models.
    approximation : bool, default = True
        Approximate optimization for faster convergence.
    allowdrift : bool, default = False
        If True, drift term is allowed.
    allowmean : bool, default = True
        If True, non-zero mean is allowed.
    seasonal_period : int or None, default = None
        Number of time steps in a complete seasonal cycle for seasonal models. For example, 7 for daily data with a
        weekly cycle or 12 for monthly data with an annual cycle.
        When set to None, seasonal_period will be inferred from the frequency of the training data. Can also be
        specified manually by providing an integer > 1.
        If seasonal_period (inferred or provided) is equal to 1, seasonality will be disabled.
    n_jobs : int or float, default = -1
        Number of CPU cores used to fit the models in parallel.
        When set to a float between 0.0 and 1.0, that fraction of available CPU cores is used.
        When set to a positive integer, that many cores are used.
        When set to -1, all CPU cores are used.
    """

    # Every name here must match a documented parameter exactly; a misspelled entry
    # means the corresponding user-provided hyperparameter is not recognized.
    allowed_local_model_args = [
        "d",
        "D",
        "max_p",
        "max_q",
        "max_P",
        "max_Q",
        "max_d",
        "max_D",
        "start_p",
        "start_q",
        "start_P",
        "start_Q",
        "stationary",
        "seasonal",
        "approximation",
        "allowdrift",
        "allowmean",
        "seasonal_period",
    ]

    def _update_local_model_args(self, local_model_args: dict, data: TimeSeriesDataFrame) -> dict:
        """Fill in defaults for ``approximation`` and ``allowmean`` when they are not set.

        ``local_model_args`` is modified in place and returned. ``data`` is not used here.
        """
        local_model_args.setdefault("approximation", True)
        local_model_args.setdefault("allowmean", True)
        return local_model_args

    def get_model_type(self) -> Type:
        """Return the ``statsforecast.models.AutoARIMA`` class."""
        # Imported lazily so that statsforecast is only required when the model is used.
        from statsforecast.models import AutoARIMA as AutoARIMA_

        return AutoARIMA_
class AutoETSModel(AbstractStatsForecastModel):
    """Automatically tuned exponential smoothing with trend and seasonality.

    Automatically selects the best ETS (Error, Trend, Seasonality) model using an information criterion

    Based on `statsforecast.models.AutoETS <https://nixtla.github.io/statsforecast/models.html#autoets>`_.

    Other Parameters
    ----------------
    model : str, default = "ZZZ"
        Model string describing the configuration of the E (error), T (trend) and S (seasonal) model components.
        Each component can be one of "M" (multiplicative), "A" (additive), "N" (omitted). For example when model="ANN"
        (additive error, no trend, and no seasonality), ETS will explore only a simple exponential smoothing.
    seasonal_period : int or None, default = None
        Number of time steps in a complete seasonal cycle for seasonal models. For example, 7 for daily data with a
        weekly cycle or 12 for monthly data with an annual cycle.
        When set to None, seasonal_period will be inferred from the frequency of the training data. Can also be
        specified manually by providing an integer > 1.
        If seasonal_period (inferred or provided) is equal to 1, seasonality will be disabled.
    n_jobs : int or float, default = -1
        Number of CPU cores used to fit the models in parallel.
        When set to a float between 0.0 and 1.0, that fraction of available CPU cores is used.
        When set to a positive integer, that many cores are used.
        When set to -1, all CPU cores are used.
    """

    allowed_local_model_args = [
        "model",
        "seasonal_period",
    ]

    def get_model_type(self) -> Type:
        """Return the ``statsforecast.models.AutoETS`` class."""
        # Imported lazily so that statsforecast is only required when the model is used.
        from statsforecast.models import AutoETS as AutoETS_

        return AutoETS_
class DynamicOptimizedThetaModel(AbstractStatsForecastModel):
    """Optimized Theta forecasting model from Fiorucci et al. (2016).

    Based on `statsforecast.models.DynamicOptimizedTheta <https://nixtla.github.io/statsforecast/models.html#dynamic-optimized-theta-method>`_.

    References
    ----------
    Fiorucci, Jose et al.
    "Models for optimising the theta method and their relationship to state space models."
    International journal of forecasting 32.4 (2016): 1151-1161.

    Other Parameters
    ----------------
    decomposition_type : {"multiplicative", "additive"}, default = "multiplicative"
        Seasonal decomposition type.
    seasonal_period : int or None, default = None
        Number of time steps in a complete seasonal cycle for seasonal models. For example, 7 for daily data with a
        weekly cycle or 12 for monthly data with an annual cycle.
        When set to None, seasonal_period will be inferred from the frequency of the training data. Can also be
        specified manually by providing an integer > 1.
        If seasonal_period (inferred or provided) is equal to 1, seasonality will be disabled.
    n_jobs : int or float, default = -1
        Number of CPU cores used to fit the models in parallel.
        When set to a float between 0.0 and 1.0, that fraction of available CPU cores is used.
        When set to a positive integer, that many cores are used.
        When set to -1, all CPU cores are used.
    """

    allowed_local_model_args = [
        "decomposition_type",
        "seasonal_period",
    ]

    def get_model_type(self) -> Type:
        """Return the ``statsforecast.models.DynamicOptimizedTheta`` class."""
        # Imported lazily so that statsforecast is only required when the model is used.
        from statsforecast.models import DynamicOptimizedTheta

        return DynamicOptimizedTheta