# Source code for autogluon.text.text_prediction.models.basic_v1

import numpy as np
import os
import math
import logging
import pandas as pd
import warnings
import time
import json
import functools
import tqdm
import mxnet as mx
from mxnet.util import use_np
from mxnet.lr_scheduler import PolyScheduler, CosineScheduler
from mxnet.gluon.data import DataLoader
from autogluon_contrib_nlp.models import get_backbone
from autogluon_contrib_nlp.lr_scheduler import InverseSquareRootScheduler
from autogluon_contrib_nlp.utils.config import CfgNode
from autogluon_contrib_nlp.utils.misc import logging_config, grouper,\
    count_parameters, repeat, get_mxnet_available_ctx
from autogluon_contrib_nlp.utils.parameter import move_to_ctx, clip_grad_global_norm
from autogluon.core import args, space
from autogluon.core.task.base import compile_scheduler_options
from autogluon.core.task.base.base_task import schedulers
from autogluon.core.metrics import get_metric, Scorer
from autogluon.core.utils.multiprocessing_utils import force_forkserver

from .. import constants as _C
from ..column_property import get_column_property_metadata, get_column_properties_from_metadata
from ..preprocessing import TabularBasicBERTPreprocessor
from ..modules.basic_prediction import BERTForTabularBasicV1
from ..dataset import TabularDataset
from ... import version


@use_np
def get_optimizer(cfg, updates_per_epoch):
    """Assemble the optimizer name, its keyword arguments and the update budget.

    Parameters
    ----------
    cfg
        The optimization configuration. The fields read here are
        `num_train_epochs`, `warmup_portion`, `lr_scheduler`, `lr`, `begin_lr`,
        `final_lr`, `wd`, `optimizer` and `optimizer_params`.
    updates_per_epoch
        Number of parameter updates performed in one epoch.

    Returns
    -------
    optimizer
        The value of `cfg.optimizer`.
    optimizer_params
        A dict holding `learning_rate`, `wd` and `lr_scheduler`, extended with
        the (key, value) pairs stored in `cfg.optimizer_params`.
    max_update
        Total number of updates, i.e. int(updates_per_epoch * num_train_epochs).
    """
    total_updates = int(updates_per_epoch * cfg.num_train_epochs)
    num_warmup = int(updates_per_epoch * cfg.num_train_epochs * cfg.warmup_portion)
    scheduler_name = cfg.lr_scheduler
    if scheduler_name == 'constant':
        # A constant learning rate needs no scheduler object.
        schedule = None
    elif scheduler_name == 'triangular':
        # Linear warmup followed by a linear (power 1) polynomial decay.
        schedule = PolyScheduler(max_update=total_updates,
                                 base_lr=cfg.lr,
                                 warmup_begin_lr=cfg.begin_lr,
                                 pwr=1,
                                 final_lr=cfg.final_lr,
                                 warmup_steps=num_warmup,
                                 warmup_mode='linear')
    elif scheduler_name == 'inv_sqrt':
        schedule = InverseSquareRootScheduler(warmup_steps=num_warmup,
                                              base_lr=cfg.lr,
                                              warmup_init_lr=cfg.begin_lr)
    elif scheduler_name == 'cosine':
        schedule = CosineScheduler(max_update=total_updates,
                                   base_lr=cfg.lr,
                                   final_lr=cfg.final_lr,
                                   warmup_steps=num_warmup,
                                   warmup_begin_lr=cfg.begin_lr)
    else:
        raise ValueError('Unsupported lr_scheduler="{}"'
                         .format(cfg.lr_scheduler))
    optimizer_params = {'learning_rate': cfg.lr,
                        'wd': cfg.wd,
                        'lr_scheduler': schedule}
    optimizer = cfg.optimizer
    # User supplied (key, value) pairs take precedence over the defaults above.
    extra_params = {}
    for name, setting in cfg.optimizer_params:
        extra_params[name] = setting
    optimizer_params.update(extra_params)
    return optimizer, optimizer_params, total_updates


@use_np
def apply_layerwise_decay(model, layerwise_decay, backbone_name, not_included=None):
    """Apply the layer-wise learning rate decay

    .. math::
        lr = lr * layerwise_decay^(max_depth - layer_depth)

    The decay is realised by setting the `lr_mult` attribute of the parameters.

    Parameters
    ----------
    model
        The backbone network. It must expose `encoder.all_encoder_layers`
        (when 'electra' is in `backbone_name`) or `encoder.all_layers`,
        together with `collect_params()`.
    layerwise_decay: float
        The base of the layer-wise decay.
    backbone_name: str
        Name of the backbone; only used to pick the attribute that stores
        the encoder layers.
    not_included: list of str
        A list of parameter name fragments. Encoder-layer parameters whose
        name contains any of these fragments are not touched.
    """
    if not_included is None:
        not_included = []
    # consider the task specific fine-tuning layer as the last layer, following with pooler
    # In addition, the embedding parameters have the smaller learning rate based on this setting.
    if 'electra' in backbone_name:
        all_layers = model.encoder.all_encoder_layers
    else:
        all_layers = model.encoder.all_layers
    max_depth = len(all_layers) + 2
    for key, value in model.collect_params().items():
        # The checks are intentionally independent: when a name matches several
        # patterns, the last matching assignment wins.
        if 'scores' in key:
            value.lr_mult = layerwise_decay ** 0
        if 'pool' in key:
            value.lr_mult = layerwise_decay ** 1
        if 'embed' in key:
            value.lr_mult = layerwise_decay ** max_depth

    for (layer_depth, layer) in enumerate(all_layers):
        layer_params = layer.collect_params()
        for key, value in layer_params.items():
            # Skip excluded parameters. The check must act on the parameter
            # loop itself; a `continue` nested in a loop over `not_included`
            # would only advance that inner loop and exclude nothing.
            if any(pn in key for pn in not_included):
                continue
            value.lr_mult = layerwise_decay**(max_depth - (layer_depth + 1))


def base_optimization_config():
    """Create the default configuration node of the optimization phase.

    Returns
    -------
    cfg
        A `CfgNode` populated with the default optimization hyper-parameters.
    """
    defaults = (
        ('lr_scheduler', 'triangular'),
        ('optimizer', 'adamw'),
        ('optimizer_params', [('beta1', 0.9),
                              ('beta2', 0.999),
                              ('epsilon', 1e-6),
                              ('correct_bias', False)]),
        ('begin_lr', 0.0),
        ('batch_size', 32),
        ('model_average', 5),
        # Per-device batch-size
        ('per_device_batch_size', 16),
        # By default, we double the batch size for validation
        ('val_batch_size_mult', 2),
        ('lr', 1E-4),
        ('final_lr', 0.0),
        ('num_train_epochs', 3),
        ('warmup_portion', 0.1),
        # The layer-wise decay
        ('layerwise_lr_decay', 0.8),
        # Weight decay
        ('wd', 0.01),
        # Maximum gradient norm
        ('max_grad_norm', 1.0),
        # The validation frequency = validation frequency * num_updates_in_an_epoch
        ('valid_frequency', 0.1),
        # Logging frequency = log frequency * num_updates_in_an_epoch
        ('log_frequency', 0.1),
    )
    cfg = CfgNode()
    for field, default in defaults:
        setattr(cfg, field, default)
    return cfg


def base_model_config():
    """Create the default configuration node of the model.

    Returns
    -------
    cfg
        A `CfgNode` with the `preprocess`, `backbone` and `network` sections.
    """
    # Text preprocessing options
    preprocess_cfg = CfgNode()
    preprocess_cfg.merge_text = True
    preprocess_cfg.max_length = 128
    # The pretrained backbone
    backbone_cfg = CfgNode()
    backbone_cfg.name = 'google_electra_base'

    cfg = CfgNode()
    cfg.preprocess = preprocess_cfg
    cfg.backbone = backbone_cfg
    # The network section comes from the network class itself
    cfg.network = BERTForTabularBasicV1.get_cfg()
    return cfg


def base_learning_config():
    """Create the default configuration node of the learning procedure.

    Returns
    -------
    cfg
        A `CfgNode` with the early stopping, validation split and metric options.
    """
    defaults = (
        # Stop if we cannot find a better checkpoint
        ('early_stopping_patience', 10),
        # The ratio of dataset to split for validation
        ('valid_ratio', 0.15),
        # Automatically define the stopping metric
        ('stop_metric', 'auto'),
        # Automatically determine the metrics used in logging
        ('log_metrics', 'auto'),
    )
    cfg = CfgNode()
    for field, default in defaults:
        setattr(cfg, field, default)
    return cfg


def base_misc_config():
    """Create the default configuration node of the miscellaneous options.

    Returns
    -------
    cfg
        A `CfgNode` holding the random seed and the experiment directory.
    """
    cfg = CfgNode()
    for field, default in (('seed', 123), ('exp_dir', './autonlp')):
        setattr(cfg, field, default)
    return cfg


def base_cfg():
    """Create the complete, frozen default configuration.

    Returns
    -------
    cfg
        A frozen `CfgNode` with the `version` field and the `optimization`,
        `learning`, `model` and `misc` sections.
    """
    section_factories = (
        ('optimization', base_optimization_config),
        ('learning', base_learning_config),
        ('model', base_model_config),
        ('misc', base_misc_config),
    )
    root = CfgNode()
    root.version = 1
    for section, factory in section_factories:
        setattr(root, section, factory())
    root.freeze()
    return root


@use_np
def _classification_regression_predict(net, dataloader, problem_type,
                                       has_label=True, extract_embedding=False):
    """Run the network over a dataloader and gather the outputs as one numpy array.

    Parameters
    ----------
    net
        The network. Its parameters determine the list of devices to use.
    dataloader
        The dataloader
    problem_type
        Type of the problem. For `_C.CLASSIFICATION` the network output is
        passed through a softmax over the last axis.
    has_label
        Whether each sample is a (feature, label) pair. The label is dropped.
    extract_embedding
        Whether to return the second output of the network (the embedding)
        instead of the prediction.

    Returns
    -------
    predictions
        The per-batch outputs concatenated along axis 0.
    """
    devices = net.collect_params().list_ctx()
    collected = []
    # Feed one batch to each device per step
    for batch_group in grouper(dataloader, len(devices)):
        group_outputs = []
        for batch, device in zip(batch_group, devices):
            # The last group may be padded with None
            if batch is None:
                continue
            if has_label:
                features, _ = batch
            else:
                features = batch
            features = move_to_ctx(features, device)
            if extract_embedding:
                _, output = net(features)
            else:
                output = net(features)
                if problem_type == _C.CLASSIFICATION:
                    output = mx.npx.softmax(output, axis=-1)
            group_outputs.append(output)
        # Convert to numpy only after every device in the group has been launched
        collected.extend(output.asnumpy() for output in group_outputs)
    return np.concatenate(collected, axis=0)


def calculate_metric(scorer, ground_truth, predictions, problem_type):
    """Evaluate a scorer and multiply the result by the sign of the scorer.

    Parameters
    ----------
    scorer
        The scorer. It must be callable and expose `name` and `_sign`.
    ground_truth
        The ground-truth labels
    predictions
        The predictions. For classification with the 'roc_auc' scorer, only
        column 1 (the probability of the positive class) is given to the scorer.
    problem_type
        Type of the problem

    Returns
    -------
    score
        `scorer._sign` times the output of the scorer.
    """
    needs_positive_proba = problem_type == _C.CLASSIFICATION and scorer.name == 'roc_auc'
    if needs_positive_proba:
        # For ROC_AUC, we need to feed in the probability of positive class to the scorer.
        scorer_input = predictions[:, 1]
    else:
        scorer_input = predictions
    return scorer._sign * scorer(ground_truth, scorer_input)


@use_np
def train_function(args, reporter, train_df_path, tuning_df_path,
                   time_limits, time_start, base_config, problem_types,
                   column_properties, label_columns, label_shapes,
                   log_metrics, stopping_metric, console_log,
                   ignore_warning=False):
    """Train one model with the hyper-parameters of a single trial and report to the scheduler.

    The function loads the cached data, builds the preprocessor and the network,
    runs the training loop with periodic validation and saves the parameters of
    the best checkpoint to `<exp_dir>/best_model.params`.

    Parameters
    ----------
    args
        The trial arguments. `args['search_space']` holds the configuration
        overrides and `args.task_id` is used to name the experiment directory.
    reporter
        The callable used to report validation results. `reporter.terminate()`
        is called when the time budget is already exhausted.
    train_df_path
        Path of the parquet file that stores the training data.
    tuning_df_path
        Path of the parquet file that stores the tuning (validation) data.
    time_limits
        The time budget in seconds, measured from `time_start`. None means no limit.
    time_start
        The timestamp at which the time budget started.
    base_config
        The base configuration that the search space values are merged into.
    problem_types
        The problem types. Only `problem_types[0]` is used.
    column_properties
        The column properties, indexed by column name.
    label_columns
        The label columns. Only `label_columns[0]` is used.
    label_shapes
        The label shapes. Only `label_shapes[0]` is used.
    log_metrics
        Name, or list of names, of the metrics that are logged and reported.
    stopping_metric
        Name of the metric used for model selection and early stopping.
    console_log
        Forwarded to `logging_config` as the `console` flag.
    ignore_warning
        Whether to silence all warnings.
    """
    if time_limits is not None:
        start_train_tick = time.time()
        time_left = time_limits - (start_train_tick - time_start)
        if time_left <= 0:
            reporter.terminate()
            return
    import os
    # Get the log metric scorers
    if isinstance(log_metrics, str):
        log_metrics = [log_metrics]
    # Load the training and tuning data from the parquet file
    train_data = pd.read_parquet(train_df_path)
    tuning_data = pd.read_parquet(tuning_df_path)
    log_metric_scorers = [get_metric(ele) for ele in log_metrics]
    stopping_metric_scorer = get_metric(stopping_metric)
    greater_is_better = stopping_metric_scorer.greater_is_better
    os.environ['MKL_NUM_THREADS'] = '1'
    os.environ['OMP_NUM_THREADS'] = '1'
    os.environ['MKL_DYNAMIC'] = 'FALSE'
    if ignore_warning:
        import warnings
        warnings.filterwarnings("ignore")
    search_space = args['search_space']
    cfg = base_config.clone()
    # Flatten the search space into [key0, value0, key1, value1, ...]
    specified_values = []
    for key in search_space:
        specified_values.append(key)
        specified_values.append(search_space[key])
    cfg.merge_from_list(specified_values)
    exp_dir = cfg.misc.exp_dir
    if reporter is not None:
        # When the reporter is not None,
        # we create the saved directory based on the task_id + time
        task_id = args.task_id
        exp_dir = os.path.join(exp_dir, 'task{}'.format(task_id))
        os.makedirs(exp_dir, exist_ok=True)
        cfg.defrost()
        cfg.misc.exp_dir = exp_dir
        cfg.freeze()
    logger = logging.getLogger()
    logging_config(folder=exp_dir, name='training', logger=logger, console=console_log)
    logger.info(cfg)
    # Load backbone model
    backbone_model_cls, backbone_cfg, tokenizer, backbone_params_path, _ \
        = get_backbone(cfg.model.backbone.name)
    # Store the configuration of this trial next to its checkpoints
    with open(os.path.join(exp_dir, 'cfg.yml'), 'w') as f:
        f.write(str(cfg))
    text_backbone = backbone_model_cls.from_cfg(backbone_cfg)
    # Build Preprocessor + Preprocess the training dataset + Inference problem type
    # TODO Move preprocessor + Dataloader to outer loop to better cache the dataloader
    preprocessor = TabularBasicBERTPreprocessor(tokenizer=tokenizer,
                                                column_properties=column_properties,
                                                label_columns=label_columns,
                                                max_length=cfg.model.preprocess.max_length,
                                                merge_text=cfg.model.preprocess.merge_text)
    logger.info('Process training set...')
    processed_train = preprocessor.process_train(train_data)
    logger.info('Done!')
    logger.info('Process dev set...')
    processed_dev = preprocessor.process_test(tuning_data)
    logger.info('Done!')
    label = label_columns[0]
    # Get the ground-truth dev labels
    gt_dev_labels = np.array(tuning_data[label].apply(column_properties[label].transform))
    ctx_l = get_mxnet_available_ctx()
    base_batch_size = cfg.optimization.per_device_batch_size
    # Number of forward/backward passes that are accumulated into one update
    num_accumulated = int(np.ceil(cfg.optimization.batch_size / base_batch_size))
    inference_base_batch_size = base_batch_size * cfg.optimization.val_batch_size_mult
    train_dataloader = DataLoader(processed_train,
                                  batch_size=base_batch_size,
                                  shuffle=True,
                                  batchify_fn=preprocessor.batchify(is_test=False))
    dev_dataloader = DataLoader(processed_dev,
                                batch_size=inference_base_batch_size,
                                shuffle=False,
                                batchify_fn=preprocessor.batchify(is_test=True))
    net = BERTForTabularBasicV1(text_backbone=text_backbone,
                                feature_field_info=preprocessor.feature_field_info(),
                                label_shape=label_shapes[0],
                                cfg=cfg.model.network)
    net.initialize_with_pretrained_backbone(backbone_params_path, ctx=ctx_l)
    net.hybridize()
    num_total_params, num_total_fixed_params = count_parameters(net.collect_params())
    logger.info('#Total Params/Fixed Params={}/{}'.format(num_total_params,
                                                           num_total_fixed_params))
    # Initialize the optimizer
    updates_per_epoch = int(len(train_dataloader) / (num_accumulated * len(ctx_l)))
    optimizer, optimizer_params, max_update \
        = get_optimizer(cfg.optimization,
                        updates_per_epoch=updates_per_epoch)
    valid_interval = math.ceil(cfg.optimization.valid_frequency * updates_per_epoch)
    train_log_interval = math.ceil(cfg.optimization.log_frequency * updates_per_epoch)
    trainer = mx.gluon.Trainer(net.collect_params(),
                               optimizer, optimizer_params,
                               update_on_kvstore=False)
    if 0 < cfg.optimization.layerwise_lr_decay < 1:
        apply_layerwise_decay(net.text_backbone,
                              cfg.optimization.layerwise_lr_decay,
                              backbone_name=cfg.model.backbone.name)
    # Do not apply weight decay to all the LayerNorm and bias
    for _, v in net.collect_params('.*beta|.*gamma|.*bias').items():
        v.wd_mult = 0.0
    params = [p for p in net.collect_params().values() if p.grad_req != 'null']

    # Set grad_req if gradient accumulation is required
    if num_accumulated > 1:
        logger.info('Using gradient accumulation.'
                    ' Global batch size = {}'.format(cfg.optimization.batch_size))
        for p in params:
            p.grad_req = 'add'
        net.collect_params().zero_grad()
    train_loop_dataloader = grouper(repeat(train_dataloader), len(ctx_l))
    log_loss_l = [mx.np.array(0.0, dtype=np.float32, ctx=ctx) for ctx in ctx_l]
    log_num_samples_l = [0 for _ in ctx_l]
    logging_start_tick = time.time()
    best_performance_score = None
    mx.npx.waitall()
    no_better_rounds = 0
    report_idx = 0
    start_tick = time.time()
    if time_limits is not None:
        # From here on, time_limits is the budget that is left for the training loop
        time_limits -= start_tick - time_start
        if time_limits <= 0:
            reporter.terminate()
            return
    best_report_items = None
    for update_idx in tqdm.tqdm(range(max_update), disable=None):
        num_samples_per_update_l = [0 for _ in ctx_l]
        for accum_idx in range(num_accumulated):
            sample_l = next(train_loop_dataloader)
            loss_l = []
            num_samples_l = [0 for _ in ctx_l]
            for i, (sample, ctx) in enumerate(zip(sample_l, ctx_l)):
                feature_batch, label_batch = sample
                feature_batch = move_to_ctx(feature_batch, ctx)
                label_batch = move_to_ctx(label_batch, ctx)
                with mx.autograd.record():
                    pred = net(feature_batch)
                    if problem_types[0] == _C.CLASSIFICATION:
                        logits = mx.npx.log_softmax(pred, axis=-1)
                        loss = - mx.npx.pick(logits, label_batch[0])
                    elif problem_types[0] == _C.REGRESSION:
                        loss = mx.np.square(pred - label_batch[0])
                    loss_l.append(loss.mean() / len(ctx_l))
                    num_samples_l[i] = loss.shape[0]
                    num_samples_per_update_l[i] += loss.shape[0]
            for loss in loss_l:
                loss.backward()
            for i in range(len(ctx_l)):
                # loss_l[i] is mean / len(ctx_l); rescale it to the sum of the sample losses
                log_loss_l[i] += loss_l[i] * len(ctx_l) * num_samples_l[i]
                # Count only the samples of this accumulation step. Adding the running
                # per-update total here would count earlier steps again.
                log_num_samples_l[i] += num_samples_l[i]
        # Begin to update
        trainer.allreduce_grads()
        num_samples_per_update = sum(num_samples_per_update_l)
        # The gradients are summed over the accumulation steps, thus the clipping
        # threshold is scaled by the same factor.
        total_norm, ratio, is_finite = \
            clip_grad_global_norm(params, cfg.optimization.max_grad_norm * num_accumulated)
        total_norm = total_norm / num_accumulated
        trainer.update(num_samples_per_update)

        # Clear after update
        if num_accumulated > 1:
            net.collect_params().zero_grad()
        if (update_idx + 1) % train_log_interval == 0:
            log_loss = sum([ele.as_in_ctx(ctx_l[0]) for ele in log_loss_l]).asnumpy()
            log_num_samples = sum(log_num_samples_l)
            logger.info(
                '[Iter {}/{}, Epoch {}] train loss={:0.4e}, gnorm={:0.4e}, lr={:0.4e}, #samples processed={},'
                ' #sample per second={:.2f}'
                    .format(update_idx + 1, max_update,
                            int(update_idx / updates_per_epoch),
                            log_loss / log_num_samples, total_norm, trainer.learning_rate,
                            log_num_samples,
                            log_num_samples / (time.time() - logging_start_tick)))
            # Reset the statistics of the logging window
            logging_start_tick = time.time()
            log_loss_l = [mx.np.array(0.0, dtype=np.float32, ctx=ctx) for ctx in ctx_l]
            log_num_samples_l = [0 for _ in ctx_l]
        if (update_idx + 1) % valid_interval == 0 or (update_idx + 1) == max_update:
            valid_start_tick = time.time()
            dev_predictions = \
                _classification_regression_predict(net, dataloader=dev_dataloader,
                                                   problem_type=problem_types[0],
                                                   has_label=False)
            log_scores = [calculate_metric(scorer, gt_dev_labels, dev_predictions, problem_types[0])
                          for scorer in log_metric_scorers]
            dev_score = calculate_metric(stopping_metric_scorer, gt_dev_labels, dev_predictions,
                                         problem_types[0])
            valid_time_spent = time.time() - valid_start_tick

            # Ties count as an improvement, so the latest equally good checkpoint is kept
            if best_performance_score is None or \
                    (greater_is_better and dev_score >= best_performance_score) or \
                    (not greater_is_better and dev_score <= best_performance_score):
                find_better = True
                no_better_rounds = 0
                best_performance_score = dev_score
                net.save_parameters(os.path.join(exp_dir, 'best_model.params'))
            else:
                find_better = False
                no_better_rounds += 1
            mx.npx.waitall()
            loss_string = ', '.join(['{}={:0.4e}'.format(metric.name, score)
                                     for score, metric in zip(log_scores, log_metric_scorers)])
            logger.info('[Iter {}/{}, Epoch {}] valid {}, time spent={:.3f}s,'
                         ' total_time={:.2f}min'.format(
                update_idx + 1, max_update, int(update_idx / updates_per_epoch),
                loss_string, valid_time_spent, (time.time() - start_tick) / 60))
            report_items = [('iteration', update_idx + 1),
                            ('report_idx', report_idx + 1),
                            ('epoch', int(update_idx / updates_per_epoch))] +\
                           [(metric.name, score)
                            for score, metric in zip(log_scores, log_metric_scorers)] + \
                           [('find_better', find_better),
                            ('time_spent', int(time.time() - start_tick))]
            total_time_spent = time.time() - start_tick

            # The reward is negated when the sign of the stopping scorer is negative
            if stopping_metric_scorer._sign < 0:
                report_items.append(('reward_attr', -dev_score))
            else:
                report_items.append(('reward_attr', dev_score))
            report_items.append(('eval_metric', stopping_metric_scorer.name))
            report_items.append(('exp_dir', exp_dir))
            if find_better:
                best_report_items = report_items
            reporter(**dict(report_items))
            report_idx += 1
            if no_better_rounds >= cfg.learning.early_stopping_patience:
                logger.info('Early stopping patience reached!')
                break
            if time_limits is not None and total_time_spent > time_limits:
                break

    if best_report_items is None:
        # No validation round was executed (e.g. max_update is 0), so there is
        # no best result to report; dict(None) would raise a TypeError.
        logger.warning('No validation round was executed. Skip reporting the best result.')
        return
    # Report the best result once more as the final record of this trial
    best_report_items_dict = dict(best_report_items)
    best_report_items_dict['report_idx'] = report_idx + 1
    reporter(**best_report_items_dict)

[docs]@use_np class BertForTextPredictionBasic: """A model object returned by `fit()` in TextPrediction tasks. Use for making predictions on new data and viewing information about models trained during `fit()`. """ def __init__(self, column_properties, label_columns, feature_columns, label_shapes, problem_types, stopping_metric, log_metrics, output_directory=None, logger=None, base_config=None, search_space=None): """Creates model object. Parameters ---------- column_properties The column properties. label_columns Label columns. feature_columns label_shapes problem_types stopping_metric log_metrics output_directory logger base_config The basic configuration that the search space will be based upon. search_space The hyperparameter search space. """ super(BertForTextPredictionBasic, self).__init__() if base_config is None: self._base_config = base_cfg() else: self._base_config = base_cfg().clone_merge(base_config) self._base_config.defrost() if output_directory is not None: self._base_config.misc.exp_dir = output_directory else: output_directory = self._base_config.misc.exp_dir self._base_config.misc.exp_dir = os.path.abspath(self._base_config.misc.exp_dir) self._base_config.freeze() if search_space is None: self._search_space = dict() else: assert isinstance(search_space, dict) self._search_space = search_space self._column_properties = column_properties self._stopping_metric = stopping_metric self._log_metrics = log_metrics self._logger = logger self._output_directory = output_directory self._label_columns = label_columns self._feature_columns = feature_columns self._label_shapes = label_shapes self._problem_types = problem_types # Need to be set in the fit call self._net = None self._embed_net = None self._preprocessor = None self._config = None self._results = None @property def label_columns(self): return self._label_columns @property def label_shapes(self): return self._label_shapes @property def problem_types(self): return self._problem_types @property def 
feature_columns(self): return self._feature_columns @property def search_space(self): return self._search_space @property def base_config(self): return self._base_config @property def results(self): return self._results @property def config(self): return self._config @property def net(self): return self._net @staticmethod def default_config(): """Get the default configuration Returns ------- cfg The configuration specified by the key """ return base_cfg() def train(self, train_data, tuning_data, resource, time_limits=None, search_strategy='random', search_options=None, scheduler_options=None, num_trials=None, plot_results=False, console_log=True, ignore_warning=True, verbosity=2): if search_strategy != 'local_sequential_auto': force_forkserver() start_tick = time.time() logging_config(folder=self._output_directory, name='main', console=console_log, logger=self._logger) assert len(self._label_columns) == 1 # TODO(sxjscience) Try to support S3 os.makedirs(self._output_directory, exist_ok=True) search_space_reg = args(search_space=space.Dict(**self.search_space)) # Scheduler and searcher for HPO if scheduler_options is None: scheduler_options = dict() scheduler_options = compile_scheduler_options( scheduler_options=scheduler_options, search_strategy=search_strategy, search_options=search_options, nthreads_per_trial=resource['num_cpus'], ngpus_per_trial=resource['num_gpus'], checkpoint=os.path.join(self._output_directory, 'checkpoint.ag'), num_trials=num_trials, time_out=time_limits, resume=False, visualizer=scheduler_options.get('visualizer'), time_attr='report_idx', reward_attr='reward_attr', dist_ip_addrs=scheduler_options.get('dist_ip_addrs')) # Create a temporary cache file and then ask the inner function to load the # temporary cache. 
train_df_path = os.path.join(self._output_directory, 'cache_train_dataframe.pq') tuning_df_path = os.path.join(self._output_directory, 'cache_tuning_dataframe.pq') train_data.table.to_parquet(train_df_path) tuning_data.table.to_parquet(tuning_df_path) train_fn = search_space_reg(functools.partial(train_function, train_df_path=train_df_path, time_limits=time_limits, time_start=start_tick, tuning_df_path=tuning_df_path, base_config=self.base_config, problem_types=self.problem_types, column_properties=self._column_properties, label_columns=self._label_columns, label_shapes=self._label_shapes, log_metrics=self._log_metrics, stopping_metric=self._stopping_metric, console_log=console_log, ignore_warning=ignore_warning)) scheduler_cls = schedulers[search_strategy.lower()] # Create scheduler, run HPO experiment scheduler = scheduler_cls(train_fn, **scheduler_options) scheduler.run() scheduler.join_jobs() if len(scheduler.config_history) == 0: raise RuntimeError('No training job has been completed! ' 'There are two possibilities: ' '1) The time_limits is too small, ' 'or 2) There are some internal errors in AutoGluon. ' 'For the first case, you can increase the time_limits or set it to ' 'None, e.g., setting "TextPrediction.fit(..., time_limits=None). 
To ' 'further investigate the root cause, you can also try to train with ' '"verbosity=3", i.e., TextPrediction.fit(..., verbosity=3).') best_config = scheduler.get_best_config() if verbosity >= 2: self._logger.info('Results=', scheduler.searcher._results) self._logger.info('Best_config={}'.format(best_config)) best_task_id = scheduler.get_best_task_id() best_model_saved_dir_path = os.path.join(self._output_directory, 'task{}'.format(best_task_id)) best_cfg_path = os.path.join(best_model_saved_dir_path, 'cfg.yml') cfg = self.base_config.clone_merge(best_cfg_path) self._results = dict() self._results.update(best_reward=scheduler.get_best_reward(), best_config=scheduler.get_best_config(), total_time=time.time() - start_tick, metadata=scheduler.metadata, training_history=scheduler.training_history, config_history=scheduler.config_history, reward_attr=scheduler._reward_attr, config=cfg) if plot_results: plot_training_curves = os.path.join(self._output_directory, 'plot_training_curves.png') scheduler.get_training_curves(filename=plot_training_curves, plot=plot_results, use_legend=True) # Consider to move this to a separate predictor self._config = cfg backbone_model_cls, backbone_cfg, tokenizer, backbone_params_path, _ \ = get_backbone(cfg.model.backbone.name) text_backbone = backbone_model_cls.from_cfg(backbone_cfg) preprocessor = TabularBasicBERTPreprocessor(tokenizer=tokenizer, column_properties=self._column_properties, label_columns=self._label_columns, max_length=cfg.model.preprocess.max_length, merge_text=cfg.model.preprocess.merge_text) self._preprocessor = preprocessor net = BERTForTabularBasicV1(text_backbone=text_backbone, feature_field_info=preprocessor.feature_field_info(), label_shape=self._label_shapes[0], cfg=cfg.model.network) net.hybridize() ctx_l = get_mxnet_available_ctx() net.load_parameters(os.path.join(best_model_saved_dir_path, 'best_model.params'), ctx=ctx_l) self._net = net mx.npx.waitall()
[docs] def evaluate(self, valid_data, metrics): """ Report the predictive performance evaluated for a given dataset. Parameters ---------- valid_data : str or :class:`TabularDataset` or `pandas.DataFrame` This Dataset must also contain the label-column with the same column-name as specified during `fit()`. If str is passed, `valid_data` will be loaded using the str value as the file path. metrics : List[str] A list of names of metrics to report. Returns ------- Dict mapping metric -> score calculated over the given dataset. """ if isinstance(metrics, str): metrics = [metrics] assert self.net is not None if not isinstance(valid_data, TabularDataset): valid_data = TabularDataset(valid_data, columns=self._feature_columns + self._label_columns, column_properties=self._column_properties) ground_truth = np.array(valid_data.table[self._label_columns[0]].apply( self._column_properties[self._label_columns[0]].transform)) if self._problem_types[0] == _C.CLASSIFICATION: predictions = self.predict_proba(valid_data) else: predictions = self.predict(valid_data) metric_scores = {metric: calculate_metric(get_metric(metric), ground_truth, predictions, self.problem_types[0]) for metric in metrics} return metric_scores
def _internal_predict(self, test_data, get_original_labels=True, get_probabilities=False):
    """Run inference on `test_data` and post-process the network outputs.

    Parameters
    ----------
    test_data
        A `TabularDataset`, or something that can be turned into one. Lists
        and dicts are first converted to a `pandas.DataFrame`; any other
        non-`TabularDataset` input is passed to the `TabularDataset`
        constructor as-is.
    get_original_labels : bool, default = True
        Only used for classification when `get_probabilities` is False.
        If True, the predicted class indices are mapped back through the
        label column's `inv_transform`.
    get_probabilities : bool, default = False
        Only used for classification. If True, the output of
        `_classification_regression_predict` is returned without further
        processing.

    Returns
    -------
    test_predictions
        For non-classification problems, the unmodified output of
        `_classification_regression_predict`. For classification, either
        that output (`get_probabilities=True`), its argmax over the last
        axis, or the argmax mapped back to the original labels.
    """
    assert self.net is not None
    assert self.config is not None
    if not isinstance(test_data, TabularDataset):
        raw_data = pd.DataFrame(test_data) if isinstance(test_data, (list, dict)) \
            else test_data
        test_data = TabularDataset(raw_data,
                                   columns=self._feature_columns,
                                   column_properties=self._column_properties)
    processed_test = self._preprocessor.process_test(test_data)
    optimization_cfg = self.config.optimization
    inference_batch_size = (optimization_cfg.per_device_batch_size
                            * optimization_cfg.val_batch_size_mult)
    test_dataloader = DataLoader(processed_test,
                                 batch_size=inference_batch_size,
                                 shuffle=False,
                                 batchify_fn=self._preprocessor.batchify(is_test=True))
    outputs = _classification_regression_predict(self._net,
                                                 dataloader=test_dataloader,
                                                 problem_type=self._problem_types[0],
                                                 has_label=False)
    # Non-classification outputs and raw class probabilities are returned untouched.
    if self._problem_types[0] != _C.CLASSIFICATION:
        return outputs
    if get_probabilities:
        return outputs
    class_indices = outputs.argmax(axis=-1)
    if not get_original_labels:
        return class_indices
    to_original = self._column_properties[self._label_columns[0]].inv_transform
    return np.array([to_original(index) for index in class_indices])


@property
def class_labels(self):
    """The original names of the class labels.

    For example, the tabular data may contain classes equal to
    "entailment", "contradiction", "neutral". Internally, these will be
    converted to 0, 1, 2, ... This property returns the original names of
    these raw labels.

    Returns
    -------
    ret
        The `categories` of the first label column's property, or None
        (after emitting a warning) when the problem is not classification.
    """
    if self._problem_types[0] == _C.CLASSIFICATION:
        return self._column_properties[self._label_columns[0]].categories
    warnings.warn('Accessing class names for a non-classification problem. Return None.')
    return None
def predict_proba(self, test_data):
    """Predict class probabilities instead of class labels.

    Only valid for classification: the method asserts that the first
    problem type is classification before running inference.

    Parameters
    ----------
    test_data
        The data to get predictions for. It is forwarded unchanged to
        `_internal_predict`.

    Returns
    -------
    probabilities : array
        The predicted class probabilities for each sample. According to the
        original documentation the shape is (#Samples, num_class), where the
        i-th entry is the probability of belonging to the i-th class. The
        class names are available through the `class_labels` property.
    """
    assert self.problem_types[0] == _C.CLASSIFICATION
    probabilities = self._internal_predict(test_data,
                                           get_probabilities=True,
                                           get_original_labels=False)
    return probabilities
def predict(self, test_data, get_original_labels=True):
    """Make predictions on new data.

    Parameters
    ----------
    test_data
        The data to get predictions for. It is forwarded unchanged to
        `_internal_predict`.
    get_original_labels : bool, default = True
        Whether or not predictions should be formatted in terms of the
        original labels. For example, the labels might be "entailment" or
        "not_entailment" and predictions could either be of this form
        (if `True`) or integer-indices corresponding to these classes
        (if `False`).

    Returns
    -------
    predictions : array
        The predictions for each sample, as returned by `_internal_predict`
        with `get_probabilities=False` (documented upstream as having shape
        (#Samples,)).
    """
    predictions = self._internal_predict(test_data,
                                         get_probabilities=False,
                                         get_original_labels=get_original_labels)
    return predictions
def save(self, dir_path):
    """Save this model to disk.

    Four files are written into `dir_path` (created if missing):
    `net.params` (network parameters), `cfg.yml` (dumped configuration),
    `column_metadata.json` (metadata of the column properties) and
    `assets.json` (label/feature columns, label shapes, problem types and
    the package version).

    Parameters
    ----------
    dir_path : str
        Directory where the model should be saved.
    """
    def path_of(file_name):
        return os.path.join(dir_path, file_name)

    os.makedirs(dir_path, exist_ok=True)
    self.net.save_parameters(path_of('net.params'))
    with open(path_of('cfg.yml'), 'w') as cfg_file:
        cfg_file.write(self.config.dump())
    with open(path_of('column_metadata.json'), 'w') as metadata_file:
        column_metadata = get_column_property_metadata(self._column_properties)
        json.dump(column_metadata, metadata_file, ensure_ascii=True)
    # Extra information about the parsed dataset, needed to rebuild the model.
    with open(path_of('assets.json'), 'w') as assets_file:
        assets = {
            'label_columns': self._label_columns,
            'label_shapes': self._label_shapes,
            'problem_types': self._problem_types,
            'feature_columns': self._feature_columns,
            'version': version.__version__,
        }
        json.dump(assets, assets_file, ensure_ascii=True)
def cuda(self):
    """Reset the context of all network parameters to `mx.gpu()`."""
    net_params = self._net.collect_params()
    net_params.reset_ctx(mx.gpu())


def cpu(self):
    """Reset the context of all network parameters to `mx.cpu()`."""
    net_params = self._net.collect_params()
    net_params.reset_ctx(mx.cpu())
@classmethod
def load(cls, dir_path: str):
    """Load a model previously written by `save()` and return it.

    It is highly recommended the predictor be loaded with the exact
    AutoGluon version it was fit with.

    The configuration is read from `cfg.yml` and merged onto the default
    configuration, dataset information comes from `assets.json` and
    `column_metadata.json`, and the network parameters are read from
    `net.params` onto the contexts returned by
    `get_mxnet_available_ctx()`.

    Parameters
    ----------
    dir_path
        Path to directory where this model was previously saved.

    Returns
    -------
    model
        An instance of `cls` with its network, preprocessor and
        configuration restored, ready for making predictions on new data.
    """
    cfg_path = os.path.join(dir_path, 'cfg.yml')
    loaded_config = cls.default_config().clone_merge(cfg_path)
    with open(os.path.join(dir_path, 'assets.json'), 'r') as assets_file:
        assets = json.load(assets_file)
    label_columns, feature_columns, label_shapes, problem_types = (
        assets[key] for key in ('label_columns', 'feature_columns',
                                'label_shapes', 'problem_types'))
    column_properties = get_column_properties_from_metadata(
        os.path.join(dir_path, 'column_metadata.json'))
    backbone_model_cls, backbone_cfg, tokenizer, _, _ \
        = get_backbone(loaded_config.model.backbone.name)
    preprocess_cfg = loaded_config.model.preprocess
    # The preprocessor must exist first: the network needs its feature field info.
    preprocessor = TabularBasicBERTPreprocessor(
        tokenizer=tokenizer,
        column_properties=column_properties,
        label_columns=label_columns,
        max_length=preprocess_cfg.max_length,
        merge_text=preprocess_cfg.merge_text)
    text_backbone = backbone_model_cls.from_cfg(backbone_cfg)
    net = BERTForTabularBasicV1(text_backbone=text_backbone,
                                feature_field_info=preprocessor.feature_field_info(),
                                label_shape=label_shapes[0],
                                cfg=loaded_config.model.network)
    net.hybridize()
    net.load_parameters(os.path.join(dir_path, 'net.params'),
                        ctx=get_mxnet_available_ctx())
    model = cls(column_properties=column_properties,
                label_columns=label_columns,
                feature_columns=feature_columns,
                label_shapes=label_shapes,
                problem_types=problem_types,
                stopping_metric=None,
                log_metrics=None,
                base_config=loaded_config)
    model._net = net
    model._preprocessor = preprocessor
    model._config = loaded_config
    return model
def extract_embedding(self, data):
    """Extract embeddings for `data` from the trained network.

    An embedding network that reuses the text backbone and parameters of
    `self.net` is built on the first call and cached in `self._embed_net`.

    Parameters
    ----------
    data
        A `TabularDataset`, or something that can be turned into one. Lists
        and dicts are first converted to a `pandas.DataFrame`; any other
        non-`TabularDataset` input is passed to the `TabularDataset`
        constructor as-is.

    Returns
    -------
    embeddings
        The output of `_classification_regression_predict` run with
        `extract_embedding=True`. The original documentation gives the
        shape as (#samples, embedding_dim).
    """
    if not isinstance(data, TabularDataset):
        raw_data = pd.DataFrame(data) if isinstance(data, (list, dict)) else data
        data = TabularDataset(raw_data,
                              columns=self._feature_columns,
                              column_properties=self._column_properties)
    processed_data = self._preprocessor.process_test(data)
    optimization_cfg = self.config.optimization
    inference_batch_size = (optimization_cfg.per_device_batch_size
                            * optimization_cfg.val_batch_size_mult)
    dataloader = DataLoader(processed_data,
                            batch_size=inference_batch_size,
                            shuffle=False,
                            batchify_fn=self._preprocessor.batchify(is_test=True))
    if self._embed_net is None:
        # Hybridize before caching so the cache is only filled on success.
        shared_net = BERTForTabularBasicV1(
            text_backbone=self.net.text_backbone,
            feature_field_info=self._preprocessor.feature_field_info(),
            label_shape=self.label_shapes[0],
            cfg=self.config.model.network,
            get_embedding=True,
            params=self.net.collect_params(),
            prefix='embed_net_')
        shared_net.hybridize()
        self._embed_net = shared_net
    return _classification_regression_predict(self._embed_net,
                                              dataloader=dataloader,
                                              problem_type=self._problem_types[0],
                                              has_label=False,
                                              extract_embedding=True)