Source code for autogluon.text.text_prediction.text_prediction

import logging
import copy
import warnings
import os
from packaging import version

import numpy as np
import pandas as pd
import mxnet
from mxnet.util import use_np
from autogluon_contrib_nlp.utils.registry import Registry
from autogluon_contrib_nlp.utils.misc import logging_config

from . import constants as _C
from .dataset import random_split_train_val, TabularDataset, infer_problem_type,\
    get_column_properties
from .models.basic_v1 import BertForTextPredictionBasic
from autogluon.core.task.base import BaseTask
from autogluon.core import space
from autogluon.core.utils import in_ipynb
from autogluon.core.utils.loaders import load_pd
from autogluon.core.utils.utils import get_cpu_count, get_gpu_count, default_holdout_frac
from autogluon.core.utils.miscs import verbosity2loglevel

# Public names exported by this module.
__all__ = ['TextPrediction', 'ag_text_prediction_params']

# Logger named after this module (this is NOT the root logger).
logger = logging.getLogger(__name__)

# Registry of named hyperparameter configurations; `TextPrediction.fit` looks
# configurations up by name through `ag_text_prediction_params.create(...)`.
ag_text_prediction_params = Registry('ag_text_prediction_params')

# NOTE(review): registration restored; `TextPrediction.fit` resolves this
# configuration via `ag_text_prediction_params.create('default')`. Confirm
# that `Registry.register()` registers under the function name.
@ag_text_prediction_params.register()
def default() -> dict:
    """Return the default hyperparameters.

    The configuration has a ``version`` key, a dictionary of candidate models
    (each with its own ``search_space``), the HPO settings and a seed.

    Returns
    -------
    ret : dict
        A freshly built configuration dictionary; callers may mutate it.
    """
    ret = {
        'version': 1,
        'models': {
            'BertForTextPredictionBasic': {
                'search_space': {
                    # NOTE(review): this key and 'optimization.lr' below were
                    # empty strings (and therefore duplicates); the names are
                    # restored from context -- confirm against the model config.
                    'model.backbone.name': 'google_electra_small',
                    'optimization.batch_size': 32,
                    'optimization.per_device_batch_size': 16,
                    'optimization.num_train_epochs': 4,
                    'optimization.lr': space.Real(1E-5, 1E-4, default=5E-5),
                    'optimization.layerwise_lr_decay': 0.8,
                },
            },
        },
        'hpo_params': {
            # Can be 'random', 'bayesopt', 'skopt', 'hyperband',
            # 'bayesopt_hyperband', 'local_sequential_auto'
            'search_strategy': 'local_sequential_auto',
            'search_options': None,        # Extra kwargs passed to searcher
            'scheduler_options': None,     # Extra kwargs passed to scheduler
            'time_limits': None,           # The total time limit
            'num_trials': 3,               # The number of trials
        },
        'seed': None,                      # The seed value
    }
    return ret

# NOTE(review): registration added so that this preset can be selected by name
# (`fit(hyperparameters='default_no_hpo')`); confirm `Registry.register()` usage.
@ag_text_prediction_params.register()
def default_no_hpo() -> dict:
    """Return the default hyperparameters with HPO disabled.

    Identical to :func:`default` except that only a single trial is run.

    Returns
    -------
    cfg : dict
        The configuration dictionary.
    """
    cfg = default()
    cfg['hpo_params']['num_trials'] = 1
    return cfg

# NOTE(review): registration added so the preset can be selected by name;
# confirm `Registry.register()` usage.
@ag_text_prediction_params.register()
def default_electra_small_no_hpo() -> dict:
    """Return the no-HPO configuration that uses ELECTRA Small as the backbone.

    Returns
    -------
    cfg : dict
        The configuration dictionary.
    """
    cfg = default_no_hpo()
    search_space = cfg['models']['BertForTextPredictionBasic']['search_space']
    # NOTE(review): the backbone key was an empty string; 'model.backbone.name'
    # is restored from context -- confirm against the model config.
    search_space['model.backbone.name'] = 'google_electra_small'
    search_space['optimization.per_device_batch_size'] = 16
    return cfg

# NOTE(review): registration added so the preset can be selected by name;
# confirm `Registry.register()` usage.
@ag_text_prediction_params.register()
def default_electra_base_no_hpo() -> dict:
    """Return the no-HPO configuration that uses ELECTRA Base as the backbone.

    Returns
    -------
    cfg : dict
        The configuration dictionary.
    """
    cfg = default_no_hpo()
    search_space = cfg['models']['BertForTextPredictionBasic']['search_space']
    # NOTE(review): the backbone key was an empty string; 'model.backbone.name'
    # is restored from context -- confirm against the model config.
    search_space['model.backbone.name'] = 'google_electra_base'
    search_space['optimization.per_device_batch_size'] = 8
    return cfg

# NOTE(review): registration added so the preset can be selected by name;
# confirm `Registry.register()` usage.
@ag_text_prediction_params.register()
def default_electra_large_no_hpo() -> dict:
    """Return the no-HPO configuration that uses ELECTRA Large as the backbone.

    Returns
    -------
    cfg : dict
        The configuration dictionary.
    """
    cfg = default_no_hpo()
    search_space = cfg['models']['BertForTextPredictionBasic']['search_space']
    # NOTE(review): the backbone key was an empty string; 'model.backbone.name'
    # is restored from context -- confirm against the model config.
    search_space['model.backbone.name'] = 'google_electra_large'
    search_space['optimization.per_device_batch_size'] = 4
    return cfg

def merge_params(base_params, partial_params=None):
    """Merge a partial change into the base configuration.

    Dictionaries are merged recursively: keys present in both are merged,
    keys only in ``partial_params`` are added, and keys only in
    ``base_params`` are kept. A non-dict ``partial_params`` simply replaces
    the base value, and ``None`` on either side yields the other side.

    Parameters
    ----------
    base_params
        The base parameters
    partial_params
        The partial parameters

    Returns
    -------
    final_params
        The final parameters. When both inputs are dictionaries this is a new
        object built from a deep copy of ``base_params``; otherwise one of the
        inputs is returned as-is.
    """
    if partial_params is None:
        return base_params
    if base_params is None:
        return partial_params
    if not isinstance(partial_params, dict):
        # A leaf value on the partial side overrides whatever the base holds.
        return partial_params
    assert isinstance(base_params, dict)
    # Copy so that the caller's base configuration is never modified.
    final_params = copy.deepcopy(base_params)
    for key, value in partial_params.items():
        if key in base_params:
            final_params[key] = merge_params(base_params[key], value)
        else:
            final_params[key] = value
    return final_params

def get_recommended_resource(nthreads_per_trial=None,
                             ngpus_per_trial=None) -> dict:
    """Get the recommended resources.

    Values that the user leaves unspecified are filled in from the detected
    CPU / GPU counts, and every value is capped at what the machine offers.

    Parameters
    ----------
    nthreads_per_trial
        The number of threads per trial provided by the user.
    ngpus_per_trial
        The number of GPUs per trial provided by the user.

    Returns
    -------
    resource
        The recommended resource, as a dictionary with the keys
        ``'num_cpus'`` and ``'num_gpus'``.

    Raises
    ------
    AssertionError
        If the resulting thread count is not positive or the GPU count is
        negative.
    """
    if nthreads_per_trial is None and ngpus_per_trial is None:
        nthreads_per_trial = get_cpu_count()
        ngpus_per_trial = get_gpu_count()
    elif nthreads_per_trial is not None and ngpus_per_trial is None:
        ngpus_per_trial = get_gpu_count()
    elif nthreads_per_trial is None and ngpus_per_trial is not None:
        if ngpus_per_trial != 0:
            # Split the CPUs evenly across the trials that can run in parallel.
            # Clamp to at least one job: asking for more GPUs than are
            # available would otherwise make this 0 and divide by zero below.
            num_parallel_jobs = max(get_gpu_count() // ngpus_per_trial, 1)
            nthreads_per_trial = max(get_cpu_count() // num_parallel_jobs, 1)
        else:
            nthreads_per_trial = min(get_cpu_count(), 4)
    # Never promise more than the machine actually has.
    nthreads_per_trial = min(nthreads_per_trial, get_cpu_count())
    ngpus_per_trial = min(ngpus_per_trial, get_gpu_count())
    assert nthreads_per_trial > 0 and ngpus_per_trial >= 0,\
        'Invalid number of threads and number of GPUs.'
    return {'num_cpus': nthreads_per_trial, 'num_gpus': ngpus_per_trial}

def infer_eval_stop_log_metrics(problem_type,
                                label_shape,
                                eval_metric=None,
                                stopping_metric=None):
    """Decide default evaluation, stopping, and logging metrics (based on type of prediction problem).

    Parameters
    ----------
    problem_type
        Type of the problem
    label_shape
        Shape of the label. For classification a value of 2 selects the
        larger set of logging metrics (presumably the binary case -- TODO confirm).
    eval_metric
        The eval metric provided by the user
    stopping_metric
        The stopping metric provided by the user

    Returns
    -------
    eval_metric
        The updated evaluation metric
    stopping_metric
        The updated stopping metric
    log_metrics
        The updated logging metric

    Raises
    ------
    NotImplementedError
        If ``problem_type`` is neither classification nor regression.
    """
    if eval_metric is not None and stopping_metric is None:
        # Stop on the evaluation metric; with several, use the first one.
        if isinstance(eval_metric, list):
            stopping_metric = eval_metric[0]
        else:
            stopping_metric = eval_metric
    if problem_type == _C.CLASSIFICATION:
        fallback_metric = 'acc'
        if label_shape == 2:
            log_metrics = ['f1', 'mcc', 'roc_auc', 'acc', 'log_loss']
        else:
            log_metrics = ['acc', 'log_loss']
    elif problem_type == _C.REGRESSION:
        fallback_metric = 'mse'
        log_metrics = ['mse', 'rmse', 'mae']
    else:
        raise NotImplementedError('The problem type is not supported yet!')
    if stopping_metric is None:
        stopping_metric = fallback_metric
    if eval_metric is None:
        eval_metric = fallback_metric
    # Make sure that whatever is used for stopping / evaluation is also logged.
    for other_log_metric in [stopping_metric, eval_metric]:
        if isinstance(other_log_metric, str):
            extra_metrics = [other_log_metric]
        elif isinstance(other_log_metric, list):
            extra_metrics = other_log_metric
        else:
            extra_metrics = []
        for ele in extra_metrics:
            if ele not in log_metrics:
                log_metrics.append(ele)
    return eval_metric, stopping_metric, log_metrics

@use_np
class TextPrediction(BaseTask):
    """AutoGluon Task for classification/regression with text data."""

    @classmethod
    def fit(cls, train_data, label,
            tuning_data=None,
            time_limits=None,
            output_directory='./ag_text',
            feature_columns=None,
            holdout_frac=None,
            eval_metric=None,
            stopping_metric=None,
            nthreads_per_trial=None,
            ngpus_per_trial=None,
            dist_ip_addrs=None,
            num_trials=None,
            search_strategy=None,
            search_options=None,
            scheduler_options=None,
            hyperparameters=None,
            plot_results=None,
            seed=None,
            visualizer=None,
            verbosity=2):
        """Fit models to make predictions based on text inputs.

        Parameters
        ----------
        train_data : :class:`autogluon.tabular.TabularDataset` or :class:`pd.DataFrame`
            Training dataset where rows = individual training examples, columns = features.
            Anything that is not a DataFrame is loaded via `load_pd.load`.
        label : str, int, or list
            Name of the label column. An integer is interpreted as the position of the
            column in `train_data`, and a list may be given to specify several label columns.
            Note that the default metrics are inferred from the first label column only.
        tuning_data : :class:`autogluon.tabular.TabularDataset` or :class:`pd.DataFrame`, default = None
            Another dataset containing validation data reserved for hyperparameter tuning
            (in same format as training data).
            If `tuning_data = None`, `fit()` will automatically hold out random examples
            from `train_data` for validation.
        time_limits : int or str, default = None
            Approximately how long `fit()` should run for (wallclock time in seconds if int).
            String values may instead be used to specify time in different units
            such as: '1min' or '1hour'.
            Longer `time_limits` will usually improve predictive accuracy.
            If not specified, `fit()` will run until all models to try by default have
            completed training.
        output_directory : str, default = './ag_text'
            Path to directory where models and intermediate outputs should be saved.
        feature_columns : List[str], default = None
            Which columns of table to consider as predictive features (other columns will
            be ignored, except for label-column).
            If None (by default), all columns of table are considered predictive features.
        holdout_frac : float, default = None
            Fraction of train_data to holdout as tuning data for optimizing hyperparameters
            (ignored unless `tuning_data = None`).
            If None, default value is selected based on the number of training examples.
        eval_metric : str, default = None
            The evaluation metric that will be used to evaluate the model's predictive
            performance.
            If None, an appropriate default metric will be selected
            (accuracy for classification, mean-squared-error for regression).
            Options for classification include: 'acc' (accuracy),
            'nll' (negative log-likelihood).
            Additional options for binary classification include: 'f1' (F1 score),
            'mcc' (Matthews coefficient), 'auc' (area under ROC curve).
            Options for regression include: 'mse' (mean squared error),
            'rmse' (root mean squared error), 'mae' (mean absolute error).
        stopping_metric, default = None
            Metric which iteratively-trained models use to early stop to avoid overfitting.
            Defaults to `eval_metric` value (if None).
            Options are identical to options for `eval_metric`.
        nthreads_per_trial, default = None
            The number of threads per individual model training run.
            By default, all available CPUs are used.
        ngpus_per_trial, default = None
            The number of GPUs to use per individual model training run.
            If unspecified, a default value is chosen based on total number of GPUs available.
        dist_ip_addrs, default = None
            List of IP addresses corresponding to remote workers, in order to leverage
            distributed computation. Currently this must be None.
        num_trials : int, default = None
            The number of trials in the HPO search
        search_strategy : str, default = None
            Which hyperparameter search algorithm to use. Options include:
            'random' (random search), 'bayesopt' (Gaussian process Bayesian optimization),
            'skopt' (SKopt Bayesian optimization), 'grid' (grid search),
            'hyperband' (Hyperband scheduling with random search),
            'bayesopt-hyperband' (Hyperband scheduling with GP-BO search),
            'local_sequential_auto' (sequential local search)
            If unspecified, the default is 'local_sequential_auto'.
        search_options : dict, default = None
            Options passed to searcher.
        scheduler_options : dict, default = None
            Additional kwargs passed to scheduler __init__. The dictionary is copied,
            so the caller's object is left untouched.
        hyperparameters : dict or str, default = None
            Determines the hyperparameters used by the models.
            Each hyperparameter may be either fixed value or search space of many values.
            A string selects a configuration registered in `ag_text_prediction_params`;
            a dict is merged on top of the 'default' configuration.
            For example of default hyperparameters, see:
            `autogluon.task.text_prediction.text_prediction.default()`
        plot_results : bool, default = None
            Whether or not to plot intermediate training results during `fit()`.
            If None, results are plotted only when running inside a notebook.
        seed : int, default = None
            Seed value for random state used inside `fit()`.
        visualizer : str, default = None
            How to visualize the neural network training progress during `fit()`.
            Options: ['mxboard', 'tensorboard', None].
        verbosity : int, default = 2
            Verbosity levels range from 0 to 4 and control how much information is printed
            during fit().
            Higher levels correspond to more detailed print statements
            (you can set verbosity = 0 to suppress warnings).
            If using logging, you can alternatively control amount of information printed
            via `logger.setLevel(L)`, where `L` ranges from 0 to 50
            (Note: higher values of `L` correspond to fewer print statements,
            opposite of verbosity levels)

        Returns
        -------
        model
            A `BertForTextPredictionBasic` object that can be used for making
            predictions on new data.

        Raises
        ------
        ImportError
            If the installed MXNet version is outside of [1.7.0, 2.0.0).
        AssertionError
            If no text column is found, or if the feature columns are invalid.
        ValueError
            If a model type is unsupported or `time_limits` cannot be parsed.
        RuntimeError
            If no GPU is available and CPU training has not been explicitly enabled
            through the `AUTOGLUON_TEXT_TRAIN_WITHOUT_GPU` environment variable.
        """
        assert dist_ip_addrs is None, 'Training on remote machine is currently not supported.'
        # Version check of MXNet
        mx_version = version.parse(mxnet.__version__)
        if mx_version < version.parse('1.7.0') or mx_version >= version.parse('2.0.0'):
            # NOTE(review): the link in this message was missing; the official
            # MXNet installation page is used -- confirm the intended target.
            raise ImportError('You will need to ensure that you have mxnet>=1.7.0, <2.0.0. '
                              'For more information about how to install mxnet, '
                              'you can refer to '
                              'https://mxnet.apache.org/get_started .')
        verbosity = min(max(verbosity, 0), 4)
        console_log = verbosity >= 2
        logging_config(folder=output_directory, name='ag_text_prediction',
                       logger=logger, level=verbosity2loglevel(verbosity),
                       console=console_log)

        # Parse the hyper-parameters
        if hyperparameters is None:
            hyperparameters = ag_text_prediction_params.create('default')
        elif isinstance(hyperparameters, str):
            hyperparameters = ag_text_prediction_params.create(hyperparameters)
        else:
            base_params = ag_text_prediction_params.create('default')
            hyperparameters = merge_params(base_params, hyperparameters)
        if seed is not None:
            hyperparameters['seed'] = seed
        np.random.seed(hyperparameters['seed'])

        if not isinstance(train_data, pd.DataFrame):
            train_data = load_pd.load(train_data)

        # Inference the label: integers are positions into the columns.
        if not isinstance(label, list):
            label = [label]
        label_columns = [train_data.columns[ele] if isinstance(ele, int) else ele
                         for ele in label]

        if feature_columns is None:
            all_columns = list(train_data.columns)
            feature_columns = [ele for ele in all_columns if ele not in label_columns]
        else:
            if isinstance(feature_columns, str):
                feature_columns = [feature_columns]
            for col in feature_columns:
                assert col not in label_columns, \
                    'Feature columns and label columns cannot overlap.'
                assert col in train_data.columns,\
                    'Feature columns must be in the pandas dataframe! Received col = "{}", ' \
                    'all columns = "{}"'.format(col, train_data.columns)
            all_columns = feature_columns + label_columns
            # Keep the column order of the original table.
            all_columns = [ele for ele in train_data.columns if ele in all_columns]

        if tuning_data is None:
            if holdout_frac is None:
                holdout_frac = default_holdout_frac(len(train_data), True)
            train_data, tuning_data = random_split_train_val(train_data,
                                                             valid_ratio=holdout_frac)
        elif not isinstance(tuning_data, pd.DataFrame):
            tuning_data = load_pd.load(tuning_data)
        train_data = train_data[all_columns]
        tuning_data = tuning_data[all_columns]

        # Column properties are inferred on train + tuning data together.
        column_properties = get_column_properties(
            pd.concat([train_data, tuning_data]),
            metadata=None,
            label_columns=label_columns,
            provided_column_properties=None,
            categorical_default_handle_missing_value=True)
        has_text_column = any(prop.type == _C.TEXT for prop in column_properties.values())
        if not has_text_column:
            # Report the inferred properties (the message previously printed
            # the raw training data instead).
            raise AssertionError('No Text Column is found! This is currently not supported by '
                                 'the TextPrediction task. You may try to use '
                                 'autogluon.tabular.TabularPredictor.\n'
                                 'The inferred column properties of the training data is {}'
                                 .format(column_properties))
        train_data = TabularDataset(train_data,
                                    column_properties=column_properties,
                                    label_columns=label_columns)
        tuning_data = TabularDataset(tuning_data,
                                     column_properties=train_data.column_properties,
                                     label_columns=label_columns)
        # NOTE(review): the two calls that log the datasets themselves are
        # reconstructed from the surviving headers -- confirm.
        logger.info('Train Dataset:')
        logger.info(train_data)
        logger.info('Tuning Dataset:')
        logger.info(tuning_data)
        logger.debug('Hyperparameters:')
        logger.debug(hyperparameters)

        problem_types = []
        label_shapes = []
        for label_col_name in label_columns:
            problem_type, label_shape = infer_problem_type(column_properties=column_properties,
                                                           label_col_name=label_col_name)
            problem_types.append(problem_type)
            label_shapes.append(label_shape)
        logger.info('Label columns={}, Feature columns={}, Problem types={}, Label shapes={}'
                    .format(label_columns, feature_columns, problem_types, label_shapes))
        # Default metrics are derived from the first label column only.
        eval_metric, stopping_metric, log_metrics =\
            infer_eval_stop_log_metrics(problem_types[0],
                                        label_shapes[0],
                                        eval_metric=eval_metric,
                                        stopping_metric=stopping_metric)
        logger.info('Eval Metric={}, Stop Metric={}, Log Metrics={}'.format(eval_metric,
                                                                            stopping_metric,
                                                                            log_metrics))

        model_candidates = []
        for model_type, kwargs in hyperparameters['models'].items():
            search_space = kwargs['search_space']
            if model_type == 'BertForTextPredictionBasic':
                model = BertForTextPredictionBasic(column_properties=column_properties,
                                                   label_columns=label_columns,
                                                   feature_columns=feature_columns,
                                                   label_shapes=label_shapes,
                                                   problem_types=problem_types,
                                                   stopping_metric=stopping_metric,
                                                   log_metrics=log_metrics,
                                                   base_config=None,
                                                   search_space=search_space,
                                                   output_directory=output_directory,
                                                   logger=logger)
                model_candidates.append(model)
            else:
                raise ValueError('model_type = "{}" is not supported. You can try to use '
                                 'model_type = "BertForTextPredictionBasic"'.format(model_type))
        assert len(model_candidates) == 1, 'Only one model is supported currently'

        recommended_resource = get_recommended_resource(nthreads_per_trial=nthreads_per_trial,
                                                        ngpus_per_trial=ngpus_per_trial)

        # Explicit arguments win over the values stored in the hyperparameters.
        hpo_params = hyperparameters['hpo_params']
        if search_strategy is None:
            search_strategy = hpo_params['search_strategy']
        if time_limits is None:
            time_limits = hpo_params['time_limits']
        elif isinstance(time_limits, str):
            if time_limits.endswith('min'):
                time_limits = int(float(time_limits[:-3]) * 60)
            elif time_limits.endswith('hour'):
                time_limits = int(float(time_limits[:-4]) * 60 * 60)
            else:
                raise ValueError('The given time_limits="{}" cannot be parsed!'
                                 .format(time_limits))
        if num_trials is None:
            num_trials = hpo_params['num_trials']
        if scheduler_options is None:
            scheduler_options = hpo_params['scheduler_options']
        # Work on a copy so that neither the caller's dictionary nor the
        # hyperparameters are modified by the defaults filled in below.
        scheduler_options = dict(scheduler_options) if scheduler_options is not None else dict()
        scheduler_options['visualizer'] = visualizer
        if search_strategy.endswith('hyperband'):
            # Specific defaults for hyperband scheduling
            scheduler_options.setdefault('reduction_factor', 4)
            scheduler_options.setdefault('grace_period', 10)
            scheduler_options.setdefault('max_t', 50)

        if recommended_resource['num_gpus'] == 0:
            # Training on CPU is only allowed when explicitly requested.
            allow_cpu = int(os.environ.get('AUTOGLUON_TEXT_TRAIN_WITHOUT_GPU', 0))
            if allow_cpu:
                warnings.warn('No GPU is detected in the machine and we will recommend you to '
                              'use TextPrediction on a GPU-enabled instance. Currently, '
                              'training on CPU is slow.')
            else:
                raise RuntimeError('No GPU is detected in the machine and we will '
                                   'not proceed to run TexPrediction because they will train '
                                   'too slowly with only CPU. You may try to set `ngpus_per_trial` '
                                   'to a number larger than 0 when calling `.fit()`. '
                                   'Also, you can set the environment variable '
                                   '"AUTOGLUON_TEXT_TRAIN_WITHOUT_GPU=1" to force the model to '
                                   'use CPU for training.')
        model = model_candidates[0]
        if plot_results is None:
            plot_results = True if in_ipynb() else False
        model.train(train_data=train_data,
                    tuning_data=tuning_data,
                    resource=recommended_resource,
                    time_limits=time_limits,
                    search_strategy=search_strategy,
                    search_options=search_options,
                    scheduler_options=scheduler_options,
                    num_trials=num_trials,
                    plot_results=plot_results,
                    console_log=verbosity >= 2,
                    ignore_warning=verbosity <= 2)
        return model

    @staticmethod
    def load(dir_path):
        """Load a model object previously produced by `fit()` from disk and return this object.

        It is highly recommended the model be loaded with the exact AutoGluon version
        it was previously fit with.

        Parameters
        ----------
        dir_path : str
            Path to directory where this model was previously saved
            (i.e. `output_directory` specified in previous call to `fit`).

        Returns
        -------
        model
            A `BertForTextPredictionBasic` object that can be used for making
            predictions on new data.
        """
        return BertForTextPredictionBasic.load(dir_path)