Source code for autogluon.timeseries.models.chronos.model

import logging
import os
from typing import Any, Dict, Literal, Optional, Union

import numpy as np
import pandas as pd

from autogluon.common.loaders import load_pkl
from autogluon.timeseries.dataset.ts_dataframe import TimeSeriesDataFrame
from autogluon.timeseries.models.abstract import AbstractTimeSeriesModel
from autogluon.timeseries.utils.forecast import get_forecast_horizon_index_ts_dataframe
from autogluon.timeseries.utils.warning_filters import warning_filter

logger = logging.getLogger(__name__)


# allowed HuggingFace model paths with custom parameter definitions
MODEL_CONFIGS = {
    "amazon/chronos-t5-tiny": {
        "num_gpus": 0,  # minimum number of required GPUs
        "default_torch_dtype": "auto",
        "default_batch_size": 16,
    },
    "amazon/chronos-t5-mini": {
        "num_gpus": 0,
        "default_torch_dtype": "auto",
        "default_batch_size": 16,
    },
    "amazon/chronos-t5-small": {
        "num_gpus": 1,
        "default_torch_dtype": "bfloat16",
        "default_batch_size": 16,
    },
    "amazon/chronos-t5-base": {
        "num_gpus": 1,
        "default_torch_dtype": "bfloat16",
        "default_batch_size": 16,
    },
    "amazon/chronos-t5-large": {
        "num_gpus": 1,
        "default_torch_dtype": "bfloat16",
        "default_batch_size": 8,
    },
}


MODEL_ALIASES = {
    "tiny": "amazon/chronos-t5-tiny",
    "mini": "amazon/chronos-t5-mini",
    "small": "amazon/chronos-t5-small",
    "base": "amazon/chronos-t5-base",
    "large": "amazon/chronos-t5-large",
}


[docs] class ChronosModel(AbstractTimeSeriesModel): """Chronos pretrained time series forecasting models, based on the original `ChronosModel <https://github.com/amazon-science/chronos-forecasting>`_ implementation. Chronos is family of pretrained models, based on the T5 family, with number of parameters ranging between 8M and 710M. The full collection of Chronos models is available on `Hugging Face <https://huggingface.co/collections/amazon/chronos-models-65f1791d630a8d57cb718444>`_. For Chronos small, base, and large variants a GPU is required to perform inference efficiently. Chronos takes a minimalistic approach to pretraining time series models, by discretizing time series data directly into bins which are treated as tokens, effectively performing regression by classification. This results in a simple and flexible framework for using any language model in the context of time series forecasting. See [Ansari2024]_ for more information. References ---------- .. [Ansari2024] Ansari, Abdul Fatir, Stella, Lorenzo et al. "Chronos: Learning the Language of Time Series." http://arxiv.org/abs/2403.07815 Other Parameters ---------------- model_path: str, default = "amazon/chronos-t5-small" Model path used for the model, i.e., a HuggingFace transformers ``name_or_path``. Can be a compatible model name on HuggingFace Hub or a local path to a model directory. Original Chronos models (i.e., ``amazon/chronos-t5-{model_size}``) can be specified with aliases ``tiny``, ``mini`` , ``small``, ``base``, and ``large``. batch_size : int, default = 16 Size of batches used during inference num_samples : int, default = 20 Number of samples used during inference device : str, default = None Device to use for inference. If None, model will use the GPU if available. For larger model sizes `small`, `base`, and `large`; inference will fail if no GPU is available. context_length : int or None, default = None The context length to use in the model. Shorter context lengths will decrease model accuracy, but result in faster inference. If None, the model will infer context length from the data set length at inference time, but set it to a maximum of 512. optimization_strategy : {None, "onnx", "openvino"}, default = None Optimization strategy to use for inference on CPUs. If None, the model will use the default implementation. If `onnx`, the model will be converted to ONNX and the inference will be performed using ONNX. If ``openvino``, inference will be performed with the model compiled to OpenVINO. torch_dtype : torch.dtype or {"auto", "bfloat16", "float32", "float64"}, default = "auto" Torch data type for model weights, provided to ``from_pretrained`` method of Hugging Face AutoModels. If original Chronos models are specified and the model size is ``small``, ``base``, or ``large``, the ``torch_dtype`` will be set to ``bfloat16`` to enable inference on GPUs. data_loader_num_workers : int, default = 0 Number of worker processes to be used in the data loader. See documentation on ``torch.utils.data.DataLoader`` for more information. """ # default number of samples for prediction default_num_samples: int = 20 default_model_path = "amazon/chronos-t5-small" maximum_context_length = 512 def __init__( self, freq: Optional[str] = None, prediction_length: int = 1, path: Optional[str] = None, name: Optional[str] = None, eval_metric: str = None, hyperparameters: Dict[str, Any] = None, **kwargs, # noqa ): hyperparameters = hyperparameters if hyperparameters is not None else {} model_path_input = hyperparameters.get("model_path", self.default_model_path) self.model_path = MODEL_ALIASES.get(model_path_input, model_path_input) # TODO: automatically determine batch size based on GPU / memory availability self.batch_size = hyperparameters.get("batch_size", self.default_batch_size) self.num_samples = hyperparameters.get("num_samples", self.default_num_samples) self.device = hyperparameters.get("device") # if the model requires a GPU, set the torch dtype to bfloat16 self.torch_dtype = hyperparameters.get("torch_dtype", self.default_torch_dtype) self.data_loader_num_workers = hyperparameters.get("data_loader_num_workers", 0) self.optimization_strategy: Optional[Literal["onnx", "openvino"]] = hyperparameters.get( "optimization_strategy", None ) self.context_length = hyperparameters.get("context_length") if self.context_length is not None and self.context_length > self.maximum_context_length: logger.warning( f"\tContext length {self.context_length} exceeds maximum context length {self.maximum_context_length}." f"Context length will be set to {self.maximum_context_length}." ) self.context_length = self.maximum_context_length # we truncate the name to avoid long path errors on Windows model_path_safe = str(model_path_input).replace("/", "__").replace(os.path.sep, "__")[-50:] name = (name if name is not None else "Chronos") + f"[{model_path_safe}]" super().__init__( path=path, freq=freq, prediction_length=prediction_length, name=name, eval_metric=eval_metric, hyperparameters=hyperparameters, **kwargs, ) self.model_pipeline: Optional[Any] = None # of type OptimizedChronosPipeline self.time_limit: Optional[float] = None def save(self, path: str = None, verbose: bool = True) -> str: pipeline = self.model_pipeline self.model_pipeline = None path = super().save(path=path, verbose=verbose) self.model_pipeline = pipeline return str(path) @classmethod def load(cls, path: str, reset_paths: bool = True, verbose: bool = True) -> "ChronosModel": model = load_pkl.load(path=os.path.join(path, cls.model_file_name), verbose=verbose) if reset_paths: model.set_contexts(path) return model def _is_gpu_available(self) -> bool: import torch.cuda return torch.cuda.is_available() @property def ag_default_config(self) -> Dict[str, Any]: """The default configuration of the model used by AutoGluon if the model is one of those defined in MODEL_CONFIGS. For now, these are ``amazon/chronos-t5-*`` family of models. """ return MODEL_CONFIGS.get(self.model_path, {}) @property def min_num_gpus(self) -> int: """Minimum number of GPUs required for the model. For models not defined in AutoGluon, this value defaults to 0. """ return self.ag_default_config.get("num_gpus", 0) @property def default_batch_size(self) -> int: """Default batch size used for the model. For models not defined in AutoGluon, this value defaults to 8. """ return self.ag_default_config.get("default_batch_size", 8) @property def default_torch_dtype(self) -> Any: """Default torch data type used for the model. For models not defined in AutoGluon, this value defaults to "auto". """ return self.ag_default_config.get("default_torch_dtype", "auto") def get_minimum_resources(self, is_gpu_available: bool = False) -> Dict[str, Union[int, float]]: minimum_resources = {"num_cpus": 1} # if GPU is available, we train with 1 GPU per trial if is_gpu_available: minimum_resources["num_gpus"] = self.min_num_gpus return minimum_resources def load_model_pipeline(self, context_length: Optional[int] = None): from .pipeline import OptimizedChronosPipeline gpu_available = self._is_gpu_available() if not gpu_available and self.min_num_gpus > 0: raise RuntimeError( f"{self.name} requires a GPU to run, but no GPU was detected. " "Please make sure that you are using a computer with a CUDA-compatible GPU and " "`import torch; torch.cuda.is_available()` returns `True`." ) device = self.device or ("cuda" if gpu_available else "auto") pipeline = OptimizedChronosPipeline.from_pretrained( self.model_path, device_map=device, optimization_strategy=self.optimization_strategy, torch_dtype=self.torch_dtype, context_length=context_length or self.context_length, ) self.model_pipeline = pipeline def persist(self) -> "ChronosModel": self.load_model_pipeline(context_length=self.context_length or self.maximum_context_length) return self def _fit( self, train_data: TimeSeriesDataFrame, val_data: Optional[TimeSeriesDataFrame] = None, time_limit: int = None, **kwargs, ) -> None: self._check_fit_params() self.time_limit = time_limit def _get_inference_data_loader( self, data: TimeSeriesDataFrame, context_length: int, num_workers: int = 0, time_limit: Optional[float] = None, ): from .utils import ChronosInferenceDataLoader, ChronosInferenceDataset, timeout_callback chronos_dataset = ChronosInferenceDataset( target_df=data, target_column=self.target, context_length=context_length, ) return ChronosInferenceDataLoader( chronos_dataset, batch_size=self.batch_size, shuffle=False, num_workers=num_workers, on_batch=timeout_callback(seconds=time_limit), ) def _predict( self, data: TimeSeriesDataFrame, known_covariates: Optional[TimeSeriesDataFrame] = None, **kwargs, ) -> TimeSeriesDataFrame: # We defer initialization of the model pipeline. i.e., the model is only loaded to device memory # during inference. We also infer the maximum length of the time series in the inference data set # and use that to determine the context length of the model. If the context length is specified # during initialization, this is always used. If not, the context length is set to the longest # item length. The context length is always capped by self.maximum_context_length. context_length = self.context_length or min( data.num_timesteps_per_item().max(), self.maximum_context_length, ) with warning_filter(all_warnings=True): import torch if self.model_pipeline is None: # load model pipeline to device memory self.load_model_pipeline(context_length=context_length) inference_data_loader = self._get_inference_data_loader( data=data, num_workers=self.data_loader_num_workers, context_length=context_length, time_limit=kwargs.get("time_limit"), ) self.model_pipeline.model.eval() with torch.inference_mode(): prediction_samples = [ self.model_pipeline.predict( batch, prediction_length=self.prediction_length, num_samples=self.num_samples, limit_prediction_length=False, ) .detach() .cpu() .numpy() for batch in inference_data_loader ] samples = np.concatenate(prediction_samples, axis=0).swapaxes(1, 2).reshape(-1, self.num_samples) mean = samples.mean(axis=-1, keepdims=True) quantiles = np.quantile(samples, self.quantile_levels, axis=-1).T df = pd.DataFrame( np.concatenate([mean, quantiles], axis=1), columns=["mean"] + [str(q) for q in self.quantile_levels], index=get_forecast_horizon_index_ts_dataframe(data, self.prediction_length, freq=self.freq), ) return TimeSeriesDataFrame(df) def _more_tags(self) -> Dict: return {"allow_nan": True} def score_and_cache_oof( self, val_data: TimeSeriesDataFrame, store_val_score: bool = False, store_predict_time: bool = False, **predict_kwargs, ) -> None: # All computation happens during inference, so we provide the time_limit at prediction time # TODO: Once custom predict_kwargs is allowed, make sure that `time_limit` is not among the keys super().score_and_cache_oof( val_data, store_val_score, store_predict_time, time_limit=self.time_limit, **predict_kwargs )