# Source code for autogluon.text.text_prediction.text_prediction
import logging
import copy
import warnings
import os
from packaging import version
import numpy as np
import pandas as pd
import mxnet
from mxnet.util import use_np
from autogluon_contrib_nlp.utils.registry import Registry
from autogluon_contrib_nlp.utils.misc import logging_config
from . import constants as _C
from .dataset import random_split_train_val, TabularDataset, infer_problem_type,\
get_column_properties
from .models.basic_v1 import BertForTextPredictionBasic
from autogluon.core.task.base import BaseTask
from autogluon.core import space
from autogluon.core.utils import in_ipynb
from autogluon.core.utils.loaders import load_pd
from autogluon.core.utils.utils import get_cpu_count, get_gpu_count, default_holdout_frac
from autogluon.core.utils.miscs import verbosity2loglevel
__all__ = ['TextPrediction', 'ag_text_prediction_params']

# Module-level logger (named after this module, NOT the root logger).
logger = logging.getLogger(__name__)

# Registry of preset hyperparameter configurations; presets are selected by
# name (e.g. 'default', 'default_no_hpo') in TextPrediction.fit().
ag_text_prediction_params = Registry('ag_text_prediction_params')
@ag_text_prediction_params.register()
def default() -> dict:
    """The default hyperparameters.

    It will have a version key and a list of candidate models.
    Each model has its own search space inside.
    """
    bert_search_space = {
        'model.backbone.name': 'google_electra_small',
        'optimization.batch_size': 32,
        'optimization.per_device_batch_size': 16,
        'optimization.num_train_epochs': 4,
        'optimization.lr': space.Real(1E-5, 1E-4, default=5E-5),
        'optimization.layerwise_lr_decay': 0.8,
    }
    hpo_params = {
        # Can be 'random', 'bayesopt', 'skopt', 'hyperband',
        # 'bayesopt_hyperband', 'local_sequential_auto'
        'search_strategy': 'local_sequential_auto',
        'search_options': None,     # Extra kwargs passed to searcher
        'scheduler_options': None,  # Extra kwargs passed to scheduler
        'time_limits': None,        # The total time limit
        'num_trials': 3,            # The number of trials
    }
    return {
        'version': 1,
        'models': {
            'BertForTextPredictionBasic': {
                'search_space': bert_search_space,
            },
        },
        'hpo_params': hpo_params,
        'seed': None,  # The seed value
    }
@ag_text_prediction_params.register()
def default_no_hpo() -> dict:
    """The default hyperparameters without HPO (a single trial)."""
    params = default()
    params['hpo_params']['num_trials'] = 1
    return params
@ag_text_prediction_params.register()
def default_electra_small_no_hpo() -> dict:
    """The default search space that uses ELECTRA Small as the backbone."""
    cfg = default_no_hpo()
    search_space = cfg['models']['BertForTextPredictionBasic']['search_space']
    search_space['model.backbone.name'] = 'google_electra_small'
    search_space['optimization.per_device_batch_size'] = 16
    return cfg
@ag_text_prediction_params.register()
def default_electra_base_no_hpo() -> dict:
    """The default search space that uses ELECTRA Base as the backbone."""
    cfg = default_no_hpo()
    search_space = cfg['models']['BertForTextPredictionBasic']['search_space']
    search_space['model.backbone.name'] = 'google_electra_base'
    search_space['optimization.per_device_batch_size'] = 8
    return cfg
@ag_text_prediction_params.register()
def default_electra_large_no_hpo() -> dict:
    """The default search space that uses ELECTRA Large as the backbone."""
    # NOTE: the original docstring said "ELECTRA Base", but the preset
    # configures google_electra_large — fixed here.
    cfg = default_no_hpo()
    search_space = cfg['models']['BertForTextPredictionBasic']['search_space']
    search_space['model.backbone.name'] = 'google_electra_large'
    # Large backbone needs a smaller per-device batch size to fit in memory.
    search_space['optimization.per_device_batch_size'] = 4
    return cfg
def merge_params(base_params, partial_params=None):
    """Recursively merge a partial change into the base configuration.

    Nested dicts are merged key by key; any non-dict partial value
    replaces the corresponding base value. The base configuration is
    never mutated — a deep copy is returned.

    Parameters
    ----------
    base_params
        The base parameters
    partial_params
        The partial parameters

    Returns
    -------
    final_params
        The final parameters
    """
    # Guard clauses: a missing side means the other side wins unchanged.
    if partial_params is None:
        return base_params
    if base_params is None:
        return partial_params
    if not isinstance(partial_params, dict):
        # A scalar/list override simply replaces the base value.
        return partial_params
    assert isinstance(base_params, dict)
    merged = copy.deepcopy(base_params)
    for key, value in partial_params.items():
        if key in base_params:
            merged[key] = merge_params(base_params[key], value)
        else:
            merged[key] = value
    return merged
def get_recommended_resource(nthreads_per_trial=None,
                             ngpus_per_trial=None) -> dict:
    """Get the recommended resources.

    Internally, we will try to use GPU whenever it's possible. That means, we will use
    a single GPU for finetuning.

    Parameters
    ----------
    nthreads_per_trial
        The number of threads per trial provided by the user.
    ngpus_per_trial
        The number of GPUs per trial provided by the user.

    Returns
    -------
    resource
        The recommended resource.
    """
    if nthreads_per_trial is None:
        if ngpus_per_trial is None:
            # Neither specified: take every CPU and GPU available.
            nthreads_per_trial = get_cpu_count()
            ngpus_per_trial = get_gpu_count()
        elif ngpus_per_trial == 0:
            # CPU-only training requested: cap threads at 4.
            nthreads_per_trial = min(get_cpu_count(), 4)
        else:
            # Split CPUs evenly across the trials that can run in parallel
            # given the requested GPUs per trial.
            num_parallel_jobs = get_gpu_count() // ngpus_per_trial
            nthreads_per_trial = max(get_cpu_count() // num_parallel_jobs, 1)
    elif ngpus_per_trial is None:
        ngpus_per_trial = get_gpu_count()
    # Clamp both values to what the machine actually has.
    nthreads_per_trial = min(nthreads_per_trial, get_cpu_count())
    ngpus_per_trial = min(ngpus_per_trial, get_gpu_count())
    assert nthreads_per_trial > 0 and ngpus_per_trial >= 0,\
        'Invalid number of threads and number of GPUs.'
    return {'num_cpus': nthreads_per_trial, 'num_gpus': ngpus_per_trial}
def infer_eval_stop_log_metrics(problem_type,
                                label_shape,
                                eval_metric=None,
                                stopping_metric=None):
    """Decide default evaluation, stopping, and logging metrics (based on type of prediction problem).

    Parameters
    ----------
    problem_type
        Type of the problem
    label_shape
        Shape of the label
    eval_metric
        The eval metric provided by the user
    stopping_metric
        The stopping metric provided by the user

    Returns
    -------
    eval_metric
        The updated evaluation metric
    stopping_metric
        The updated stopping metric
    log_metrics
        The updated logging metric
    """
    # Derive the stopping metric from the eval metric when only the latter is
    # given; for a list of eval metrics the first one is used for stopping.
    if stopping_metric is None and eval_metric is not None:
        stopping_metric = eval_metric[0] if isinstance(eval_metric, list) else eval_metric
    if problem_type == _C.CLASSIFICATION:
        default_metric = 'acc'
        # Binary classification gets the richer set of logged metrics.
        if label_shape == 2:
            log_metrics = ['f1', 'mcc', 'roc_auc', 'acc', 'log_loss']
        else:
            log_metrics = ['acc', 'log_loss']
    elif problem_type == _C.REGRESSION:
        default_metric = 'mse'
        log_metrics = ['mse', 'rmse', 'mae']
    else:
        raise NotImplementedError('The problem type is not supported yet!')
    if stopping_metric is None:
        stopping_metric = default_metric
    if eval_metric is None:
        eval_metric = default_metric
    # Ensure every stopping/eval metric (string or list of strings) is logged.
    for metric in (stopping_metric, eval_metric):
        if isinstance(metric, str):
            candidates = [metric]
        elif isinstance(metric, list):
            candidates = metric
        else:
            candidates = []
        for ele in candidates:
            if ele not in log_metrics:
                log_metrics.append(ele)
    return eval_metric, stopping_metric, log_metrics
@use_np
class TextPrediction(BaseTask):
    """AutoGluon Task for classification/regression with text data."""

    @classmethod
    def fit(cls, train_data,
            label,
            tuning_data=None,
            time_limits=None,
            output_directory='./ag_text',
            feature_columns=None,
            holdout_frac=None,
            eval_metric=None,
            stopping_metric=None,
            nthreads_per_trial=None,
            ngpus_per_trial=None,
            dist_ip_addrs=None,
            num_trials=None,
            search_strategy=None,
            search_options=None,
            scheduler_options=None,
            hyperparameters=None,
            plot_results=None,
            seed=None,
            visualizer=None,
            verbosity=2):
        """Fit models to make predictions based on text inputs.

        Parameters
        ----------
        train_data : :class:`autogluon.tabular.TabularDataset` or :class:`pd.DataFrame`
            Training dataset where rows = individual training examples, columns = features.
        label : str, int, or list
            Name of the label column. It can be a column name (str), the integer
            index of the label column in `train_data`, or a list of such values
            for multiple label columns.
        tuning_data : :class:`autogluon.tabular.TabularDataset` or :class:`pd.DataFrame`, default = None
            Another dataset containing validation data reserved for hyperparameter tuning (in same format as training data).
            If `tuning_data = None`, `fit()` will automatically hold out random examples from `train_data` for validation.
        time_limits : int or str, default = None
            Approximately how long `fit()` should run for (wallclock time in seconds if int).
            String values may instead be used to specify time in different units such as: '1min' or '1hour'.
            Longer `time_limits` will usually improve predictive accuracy.
            If not specified, `fit()` will run until all models to try by default have completed training.
        output_directory : str, default = './ag_text'
            Path to directory where models and intermediate outputs should be saved.
        feature_columns : List[str], default = None
            Which columns of table to consider as predictive features (other columns will be ignored, except for label-column).
            If None (by default), all columns of table are considered predictive features.
        holdout_frac : float, default = None
            Fraction of train_data to holdout as tuning data for optimizing hyperparameters (ignored unless `tuning_data = None`).
            If None, default value is selected based on the number of training examples.
        eval_metric : str, default = None
            The evaluation metric that will be used to evaluate the model's predictive performance.
            If None, an appropriate default metric will be selected (accuracy for classification, mean-squared-error for regression).
            Options for classification include: 'acc' (accuracy), 'nll' (negative log-likelihood).
            Additional options for binary classification include: 'f1' (F1 score), 'mcc' (Matthews coefficient), 'auc' (area under ROC curve).
            Options for regression include: 'mse' (mean squared error), 'rmse' (root mean squared error), 'mae' (mean absolute error).
        stopping_metric, default = None
            Metric which iteratively-trained models use to early stop to avoid overfitting.
            Defaults to `eval_metric` value (if None).
            Options are identical to options for `eval_metric`.
        nthreads_per_trial, default = None
            The number of threads per individual model training run. By default, all available CPUs are used.
        ngpus_per_trial, default = None
            The number of GPUs to use per individual model training run. If unspecified, a default value is chosen based on total number of GPUs available.
        dist_ip_addrs, default = None
            List of IP addresses corresponding to remote workers, in order to leverage distributed computation.
        num_trials : , default = None
            The number of trials in the HPO search
        search_strategy : str, default = None
            Which hyperparameter search algorithm to use. Options include:
            'random' (random search), 'bayesopt' (Gaussian process Bayesian optimization),
            'skopt' (SKopt Bayesian optimization), 'grid' (grid search),
            'hyperband' (Hyperband scheduling with random search), 'bayesopt-hyperband'
            (Hyperband scheduling with GP-BO search), 'local_sequential_auto' (sequential local search)
            If unspecified, the default is 'local_sequential_auto'.
        search_options : dict, default = None
            Options passed to searcher.
        scheduler_options : dict, default = None
            Additional kwargs passed to scheduler __init__.
        hyperparameters : dict, default = None
            Determines the hyperparameters used by the models. Each hyperparameter may be either fixed value or search space of many values.
            For example of default hyperparameters, see: `autogluon.task.text_prediction.text_prediction.default()`
        plot_results : bool, default = None
            Whether or not to plot intermediate training results during `fit()`.
        seed : int, default = None
            Seed value for random state used inside `fit()`.
        visualizer : str, default = None
            How to visualize the neural network training progress during `fit()`. Options: ['mxboard', 'tensorboard', None].
        verbosity : int, default = 2
            Verbosity levels range from 0 to 4 and control how much information is printed
            during fit().
            Higher levels correspond to more detailed print statements
            (you can set verbosity = 0 to suppress warnings).
            If using logging, you can alternatively control amount of information printed
            via `logger.setLevel(L)`,
            where `L` ranges from 0 to 50 (Note: higher values of `L` correspond to fewer print
            statements, opposite of verbosity levels)

        Returns
        -------
        model
            A `BertForTextPredictionBasic` object that can be used for making predictions on new data.
        """
        assert dist_ip_addrs is None, 'Training on remote machine is currently not supported.'
        # Version check of MXNet: only the numpy-compatible 1.7.x–1.x line is supported.
        if version.parse(mxnet.__version__) < version.parse('1.7.0') \
                or version.parse(mxnet.__version__) >= version.parse('2.0.0'):
            raise ImportError('You will need to ensure that you have mxnet>=1.7.0, <2.0.0. '
                              'For more information about how to install mxnet, you can refer to '
                              'https://sxjscience.github.io/KDD2020/ .')
        # Clamp verbosity into the supported [0, 4] range.
        if verbosity < 0:
            verbosity = 0
        elif verbosity > 4:
            verbosity = 4
        console_log = verbosity >= 2
        logging_config(folder=output_directory, name='ag_text_prediction',
                       logger=logger, level=verbosity2loglevel(verbosity),
                       console=console_log)
        # Parse the hyper-parameters: None -> registered default preset;
        # str -> registered preset by name; dict -> partial override of the default.
        if hyperparameters is None:
            hyperparameters = ag_text_prediction_params.create('default')
        elif isinstance(hyperparameters, str):
            hyperparameters = ag_text_prediction_params.create(hyperparameters)
        else:
            base_params = ag_text_prediction_params.create('default')
            hyperparameters = merge_params(base_params, hyperparameters)
        if seed is not None:
            hyperparameters['seed'] = seed
        np.random.seed(hyperparameters['seed'])
        if not isinstance(train_data, pd.DataFrame):
            train_data = load_pd.load(train_data)
        # Resolve the label column(s); integer entries are column indices.
        if not isinstance(label, list):
            label = [label]
        label_columns = []
        for ele in label:
            if isinstance(ele, int):
                label_columns.append(train_data.columns[ele])
            else:
                label_columns.append(ele)
        if feature_columns is None:
            # Use every non-label column as a feature.
            all_columns = list(train_data.columns)
            feature_columns = [ele for ele in all_columns if ele not in label_columns]
        else:
            if isinstance(feature_columns, str):
                feature_columns = [feature_columns]
            for col in feature_columns:
                assert col not in label_columns, 'Feature columns and label columns cannot overlap.'
                assert col in train_data.columns,\
                    'Feature columns must be in the pandas dataframe! Received col = "{}", ' \
                    'all columns = "{}"'.format(col, train_data.columns)
            all_columns = feature_columns + label_columns
            # Keep the original column order of the dataframe.
            all_columns = [ele for ele in train_data.columns if ele in all_columns]
        if tuning_data is None:
            if holdout_frac is None:
                holdout_frac = default_holdout_frac(len(train_data), True)
            train_data, tuning_data = random_split_train_val(train_data,
                                                             valid_ratio=holdout_frac)
        else:
            if not isinstance(tuning_data, pd.DataFrame):
                tuning_data = load_pd.load(tuning_data)
        train_data = train_data[all_columns]
        tuning_data = tuning_data[all_columns]
        # Infer per-column properties from the union of train + tuning data so
        # that both splits share a consistent featurization.
        column_properties = get_column_properties(
            pd.concat([train_data, tuning_data]),
            metadata=None,
            label_columns=label_columns,
            provided_column_properties=None,
            categorical_default_handle_missing_value=True)
        # The task requires at least one text column.
        has_text_column = False
        for k, v in column_properties.items():
            if v.type == _C.TEXT:
                has_text_column = True
                break
        if not has_text_column:
            # FIX: the message previously formatted `train_data` even though it
            # announces the inferred column properties.
            raise AssertionError('No Text Column is found! This is currently not supported by '
                                 'the TextPrediction task. You may try to use '
                                 'autogluon.tabular.TabularPredictor.\n'
                                 'The inferred column properties of the training data is {}'
                                 .format(column_properties))
        train_data = TabularDataset(train_data,
                                    column_properties=column_properties,
                                    label_columns=label_columns)
        tuning_data = TabularDataset(tuning_data,
                                     column_properties=train_data.column_properties,
                                     label_columns=label_columns)
        logger.info('Train Dataset:')
        logger.info(train_data)
        logger.info('Tuning Dataset:')
        logger.info(tuning_data)
        logger.debug('Hyperparameters:')
        logger.debug(hyperparameters)
        problem_types = []
        label_shapes = []
        for label_col_name in label_columns:
            problem_type, label_shape = infer_problem_type(column_properties=column_properties,
                                                           label_col_name=label_col_name)
            problem_types.append(problem_type)
            label_shapes.append(label_shape)
        # FIX: use the configured module `logger` (was `logging.info`, which
        # logs via the root logger and bypasses logging_config above).
        logger.info('Label columns={}, Feature columns={}, Problem types={}, Label shapes={}'
                    .format(label_columns, feature_columns,
                            problem_types, label_shapes))
        eval_metric, stopping_metric, log_metrics =\
            infer_eval_stop_log_metrics(problem_types[0],
                                        label_shapes[0],
                                        eval_metric=eval_metric,
                                        stopping_metric=stopping_metric)
        logger.info('Eval Metric={}, Stop Metric={}, Log Metrics={}'.format(eval_metric,
                                                                            stopping_metric,
                                                                            log_metrics))
        model_candidates = []
        for model_type, kwargs in hyperparameters['models'].items():
            search_space = kwargs['search_space']
            if model_type == 'BertForTextPredictionBasic':
                model = BertForTextPredictionBasic(column_properties=column_properties,
                                                   label_columns=label_columns,
                                                   feature_columns=feature_columns,
                                                   label_shapes=label_shapes,
                                                   problem_types=problem_types,
                                                   stopping_metric=stopping_metric,
                                                   log_metrics=log_metrics,
                                                   base_config=None,
                                                   search_space=search_space,
                                                   output_directory=output_directory,
                                                   logger=logger)
                model_candidates.append(model)
            else:
                raise ValueError('model_type = "{}" is not supported. You can try to use '
                                 'model_type = "BertForTextPredictionBasic"'.format(model_type))
        assert len(model_candidates) == 1, 'Only one model is supported currently'
        recommended_resource = get_recommended_resource(nthreads_per_trial=nthreads_per_trial,
                                                        ngpus_per_trial=ngpus_per_trial)
        # Fill in HPO settings from the hyperparameter presets when the user
        # did not specify them explicitly.
        if search_strategy is None:
            search_strategy = hyperparameters['hpo_params']['search_strategy']
        if time_limits is None:
            time_limits = hyperparameters['hpo_params']['time_limits']
        else:
            # Accept '10min' / '2hour' style strings and convert to seconds.
            if isinstance(time_limits, str):
                if time_limits.endswith('min'):
                    time_limits = int(float(time_limits[:-3]) * 60)
                elif time_limits.endswith('hour'):
                    time_limits = int(float(time_limits[:-4]) * 60 * 60)
                else:
                    raise ValueError('The given time_limits="{}" cannot be parsed!'
                                     .format(time_limits))
        if num_trials is None:
            num_trials = hyperparameters['hpo_params']['num_trials']
        if scheduler_options is None:
            scheduler_options = hyperparameters['hpo_params']['scheduler_options']
        if scheduler_options is None:
            scheduler_options = dict()
        scheduler_options['visualizer'] = visualizer
        if search_strategy.endswith('hyperband'):
            # Specific defaults for hyperband scheduling
            scheduler_options['reduction_factor'] = scheduler_options.get(
                'reduction_factor', 4)
            scheduler_options['grace_period'] = scheduler_options.get(
                'grace_period', 10)
            scheduler_options['max_t'] = scheduler_options.get(
                'max_t', 50)
        if recommended_resource['num_gpus'] == 0:
            # CPU-only training is allowed only when explicitly requested via
            # the AUTOGLUON_TEXT_TRAIN_WITHOUT_GPU environment variable;
            # otherwise we fail fast because CPU training is very slow.
            if 'AUTOGLUON_TEXT_TRAIN_WITHOUT_GPU' in os.environ:
                use_warning = int(os.environ['AUTOGLUON_TEXT_TRAIN_WITHOUT_GPU'])
            else:
                use_warning = False
            if use_warning:
                warnings.warn('No GPU is detected in the machine and we will recommend you to '
                              'use TextPrediction on a GPU-enabled instance. Currently, '
                              'training on CPU is slow.')
            else:
                # FIX: corrected "TexPrediction" typo in the error message.
                raise RuntimeError('No GPU is detected in the machine and we will '
                                   'not proceed to run TextPrediction because they will train '
                                   'too slowly with only CPU. You may try to set `ngpus_per_trial` '
                                   'to a number larger than 0 when calling `.fit()`. '
                                   'Also, you can set the environment variable '
                                   '"AUTOGLUON_TEXT_TRAIN_WITHOUT_GPU=1" to force the model to '
                                   'use CPU for training.')
        model = model_candidates[0]
        if plot_results is None:
            # Plot intermediate results by default only inside notebooks.
            plot_results = in_ipynb()
        model.train(train_data=train_data,
                    tuning_data=tuning_data,
                    resource=recommended_resource,
                    time_limits=time_limits,
                    search_strategy=search_strategy,
                    search_options=search_options,
                    scheduler_options=scheduler_options,
                    num_trials=num_trials,
                    plot_results=plot_results,
                    console_log=verbosity >= 2,
                    ignore_warning=verbosity <= 2)
        return model

    @staticmethod
    def load(dir_path):
        """Load a model object previously produced by `fit()` from disk and return this object.

        It is highly recommended the model be loaded with the exact AutoGluon version it was previously fit with.

        Parameters
        ----------
        dir_path : str
            Path to directory where this model was previously saved (i.e. `output_directory` specified in previous call to `fit`).

        Returns
        -------
        model
            A `BertForTextPredictionBasic` object that can be used for making predictions on new data.
        """
        return BertForTextPredictionBasic.load(dir_path)