Source code for autogluon.core.scheduler.fifo

import copy
import json
import logging
import multiprocessing as mp
import os
import pickle
import threading
import time
from collections import OrderedDict
from time import sleep

import numpy as np
from tqdm.auto import tqdm

from .reporter import DistStatusReporter, FakeReporter
from .resource import DistributedResource
from .scheduler import TaskScheduler
from ..decorator import _autogluon_method
from ..searcher import BaseSearcher
from ..searcher import searcher_factory
from ..searcher.bayesopt.tuning_algorithms.base_classes import DEFAULT_CONSTRAINT_METRIC
from ..task.task import Task
from ..utils import save, load, mkdir, try_import_mxboard
from ..utils.default_arguments import check_and_merge_defaults, \
    Float, Integer, String, Boolean, assert_no_invalid_options

__all__ = ['FIFOScheduler']

logger = logging.getLogger(__name__)


_ARGUMENT_KEYS = {
    'args', 'resource', 'searcher', 'search_options', 'checkpoint', 'resume',
    'num_trials', 'time_out', 'max_reward', 'reward_attr', 'time_attr', 'constraint_attr',
    'dist_ip_addrs', 'visualizer', 'training_history_callback',
    'training_history_callback_delta_secs', 'training_history_searcher_info',
    'delay_get_config', 'stop_jobs_after_time_out'}

_DEFAULT_OPTIONS = {
    'resource': {'num_cpus': 1, 'num_gpus': 0},
    'searcher': 'random',
    'resume': False,
    'reward_attr': 'accuracy',
    'time_attr': 'epoch',
    'constraint_attr': DEFAULT_CONSTRAINT_METRIC,
    'visualizer': 'none',
    'training_history_callback_delta_secs': 60,
    'training_history_searcher_info': False,
    'delay_get_config': True,
    'stop_jobs_after_time_out': True}

_CONSTRAINTS = {
    'checkpoint': String(),
    'resume': Boolean(),
    'num_trials': Integer(1, None),
    'time_out': Float(0.0, None),
    'max_reward': Float(),
    'reward_attr': String(),
    'time_attr': String(),
    'constraint_attr': String(),
    'visualizer': String(),
    'training_history_callback_delta_secs': Integer(1, None),
    'training_history_searcher_info': Boolean(),
    'delay_get_config': Boolean(),
    'stop_jobs_after_time_out': Boolean()}
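
# Note (added, not part of the original source): '_ARGUMENT_KEYS' lists the
# kwargs accepted by FIFOScheduler, '_DEFAULT_OPTIONS' supplies values imputed
# for missing kwargs, and '_CONSTRAINTS' is used by 'check_and_merge_defaults'
# in the constructor to validate types and ranges. For example,
# FIFOScheduler(train_fn, num_trials=10) effectively runs with
# searcher='random', reward_attr='accuracy' and time_attr='epoch'.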


class FIFOScheduler(TaskScheduler):
    r"""Simple scheduler that runs trials in submission order.

    Parameters
    ----------
    train_fn : callable
        A task launch function for training.
    args : object, optional
        Default arguments for launching train_fn.
    resource : dict
        Computation resources. For example, `{'num_cpus': 2, 'num_gpus': 1}`
    searcher : str or BaseSearcher
        Searcher (get_config decisions). If str, this is passed to
        searcher_factory along with search_options.
    search_options : dict
        If searcher is str, these arguments are passed to searcher_factory.
    checkpoint : str
        If a filename is given here, a checkpoint of scheduler (and searcher)
        state is written to file every time a job finishes.
        Note: May not be fully supported by all searchers.
    resume : bool
        If True, scheduler state is loaded from checkpoint, and the experiment
        starts from there.
        Note: May not be fully supported by all searchers.
    num_trials : int
        Maximum number of jobs run in the experiment. One of `num_trials`,
        `time_out` must be given.
    time_out : float
        If given, jobs are started only until this time_out (wall clock time).
        Moreover, we also stop jobs after time_out has passed, when they
        report a result. One of `num_trials`, `time_out` must be given.
    reward_attr : str
        Name of reward (i.e., metric to maximize) attribute in data obtained
        from the reporter.
    constraint_attr : str
        Name of constraint attribute in data obtained from the reporter, used
        for running constrained Bayesian optimization.
    time_attr : str
        Name of resource (or time) attribute in data obtained from the
        reporter. This attribute is optional for FIFO scheduling, but becomes
        mandatory in multi-fidelity scheduling (e.g., Hyperband).
        Note: The type of resource must be int.
    dist_ip_addrs : list of str
        IP addresses of remote machines.
    training_history_callback : callable
        Callback function called every time a result is added to
        training_history, if at least training_history_callback_delta_secs
        seconds have passed since the most recent call. See
        _add_training_result for the signature of this callback function.
        Use this callback to serialize self.training_history at regular
        intervals.
    training_history_callback_delta_secs : float
        See training_history_callback.
    training_history_searcher_info : bool
        If True, information about the current state of the searcher is added
        to every reported_result before it is added to training_history. This
        info includes in particular the current hyperparameters of the
        surrogate model of the searcher, as well as the dataset size.
    delay_get_config : bool
        If True, the call to searcher.get_config is delayed until a worker
        resource for evaluation is available. Otherwise, get_config is called
        just after a job has been started. For searchers which adapt to past
        data, True should be preferred. Otherwise, it does not matter.
    stop_jobs_after_time_out : bool
        Relevant only if `time_out` is used. If True, jobs which report a
        metric are stopped once `time_out` has passed. Otherwise, such jobs
        are allowed to continue until the end, or until stopped for other
        reasons. The latter can mean an experiment runs far longer than
        `time_out`.

    Examples
    --------
    >>> import numpy as np
    >>> import autogluon.core as ag
    >>> @ag.args(
    ...     lr=ag.space.Real(1e-3, 1e-2, log=True),
    ...     wd=ag.space.Real(1e-3, 1e-2))
    >>> def train_fn(args, reporter):
    ...     print('lr: {}, wd: {}'.format(args.lr, args.wd))
    ...     for e in range(10):
    ...         dummy_accuracy = 1 - np.power(1.8, -np.random.uniform(e, 2*e))
    ...         reporter(epoch=e+1, accuracy=dummy_accuracy, lr=args.lr,
    ...                  wd=args.wd)
    >>> scheduler = ag.scheduler.FIFOScheduler(train_fn,
    ...                                        resource={'num_cpus': 2, 'num_gpus': 0},
    ...                                        num_trials=20,
    ...                                        reward_attr='accuracy',
    ...                                        time_attr='epoch')
    >>> scheduler.run()
    >>> scheduler.join_jobs()
    >>> scheduler.get_training_curves(plot=True)
    """

    def __init__(self, train_fn, **kwargs):
        super().__init__(kwargs.get('dist_ip_addrs'))
        # Check values and impute default values
        assert_no_invalid_options(
            kwargs, _ARGUMENT_KEYS, name='FIFOScheduler')
        kwargs = check_and_merge_defaults(
            kwargs, set(), _DEFAULT_OPTIONS, _CONSTRAINTS,
            dict_name='scheduler_options')
        self.resource = kwargs['resource']
        searcher = kwargs['searcher']
        search_options = kwargs.get('search_options')
        if isinstance(searcher, str):
            if search_options is None:
                search_options = dict()
            _search_options = search_options.copy()
            _search_options['configspace'] = train_fn.cs
            _search_options['reward_attribute'] = kwargs['reward_attr']
            _search_options['constraint_attr'] = kwargs['constraint_attr']
            _search_options['resource_attribute'] = kwargs['time_attr']
            # Adjoin scheduler info to search_options, if not already done by
            # subclass
            if 'scheduler' not in _search_options:
                _search_options['scheduler'] = 'fifo'
            self.searcher: BaseSearcher = searcher_factory(
                searcher, **_search_options)
        else:
            assert isinstance(searcher, BaseSearcher)
            self.searcher: BaseSearcher = searcher
        assert isinstance(train_fn, _autogluon_method)
        self.train_fn = train_fn
        args = kwargs.get('args')
        self.args = args if args else train_fn.args
        num_trials = kwargs.get('num_trials')
        time_out = kwargs.get('time_out')
        if num_trials is None:
            assert time_out is not None, \
                "Need stopping criterion: Either num_trials or time_out"
        self.num_trials = num_trials
        self.time_out = time_out
        self.max_reward = kwargs.get('max_reward')
        # meta data
        self.metadata = {
            'search_space': train_fn.kwspaces,
            'search_strategy': searcher,
            'stop_criterion': {
                'time_limits': time_out,
                'max_reward': self.max_reward},
            'resources_per_trial': self.resource}
        checkpoint = kwargs.get('checkpoint')
        self._checkpoint = checkpoint
        self._reward_attr = kwargs['reward_attr']
        self._time_attr = kwargs['time_attr']
        self._constraint_attr = kwargs['constraint_attr']
        self.visualizer = kwargs['visualizer'].lower()
        if self.visualizer == 'tensorboard' or self.visualizer == 'mxboard':
            assert checkpoint is not None, "Need checkpoint to be set"
            try_import_mxboard()
            from mxboard import SummaryWriter
            self.mxboard = SummaryWriter(
                logdir=os.path.join(os.path.splitext(checkpoint)[0], 'logs'),
                flush_secs=3,
                verbose=False
            )
        self._fifo_lock = mp.Lock()
        # training_history maintains the complete history of the experiment,
        # in terms of all results obtained from the reporter. Keys are
        # str(task.task_id)
        self.training_history = OrderedDict()
        self.config_history = OrderedDict()
        # Needed for training_history callback mechanism, which is used to
        # serialize training_history every
        # training_history_callback_delta_secs seconds
        self._start_time = None
        self._training_history_callback_last_block = None
        self.training_history_callback = kwargs.get(
            'training_history_callback')
        self.training_history_callback_delta_secs = \
            kwargs['training_history_callback_delta_secs']
        self.training_history_searcher_info = \
            kwargs['training_history_searcher_info']
        self._delay_get_config = kwargs['delay_get_config']
        self._stop_jobs_after_time_out = False
        if time_out is not None:
            self._stop_jobs_after_time_out = \
                kwargs['stop_jobs_after_time_out']
            if self._stop_jobs_after_time_out:
                msg = \
                    "The meaning of 'time_out' has changed. Previously, jobs started before\n" + \
                    "'time_out' were allowed to continue until stopped by other means. Now,\n" + \
                    "we stop jobs once 'time_out' is passed (at the next metric reporting).\n" + \
                    "If you like to keep the old behaviour, use\n" + \
                    "   'stop_jobs_after_time_out=False'"
                logger.warning(msg)
        # Resume experiment from checkpoint?
        if kwargs['resume']:
            assert checkpoint is not None, \
                "Need checkpoint to be set if resume = True"
            if os.path.isfile(checkpoint):
                self.load_state_dict(load(checkpoint))
            else:
                msg = f'checkpoint path {checkpoint} is not available for resume.'
                logger.critical(msg)
                raise FileExistsError(msg)

    def run(self, **kwargs):
        """Run multiple trials"""
        # Make sure that this scheduler is configured at the searcher
        self.searcher.configure_scheduler(self)
        start_time = time.time()
        self._start_time = start_time
        num_trials = kwargs.get('num_trials', self.num_trials)
        time_out = kwargs.get('time_out', self.time_out)
        # For training_history callback mechanism:
        self._training_history_callback_last_block = -1

        log_suffix = ''
        if time_out is not None:
            log_suffix = f' (time_out={round(time_out, 2)}s)'
        elif num_trials is not None:
            log_suffix = f' (num_trials={num_trials - self.num_finished_tasks})'
        logger.info(f'Starting Hyperparameter Tuning ...{log_suffix}')
        logger.log(15, f'Num of Finished Tasks is {self.num_finished_tasks}')
        if num_trials is not None:
            logger.log(
                15,
                f'Num of Pending Tasks is {num_trials - self.num_finished_tasks}')
            tbar = tqdm(range(self.num_finished_tasks, num_trials))
        else:
            # In this case, only stopping by time_out is used. We do not
            # display a progress bar then
            tbar = range(self.num_finished_tasks, 100000)
        for _ in tbar:
            # Quick check whether resources are available before we check the
            # time limit. This prevents booking the next job while we are
            # waiting for a resource, which would result in going over the
            # time limit
            resources = DistributedResource(**self.resource)
            while not FIFOScheduler.managers.check_availability(resources):
                sleep(0.1)
            if (time_out and time.time() - start_time >= time_out) or \
                    (self.max_reward and self.get_best_reward() >= self.max_reward):
                break
            self.schedule_next()
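
    # Usage sketch (added, not part of the original source): 'run' only
    # schedules jobs; 'join_jobs' must be called to block until they finish.
    # The values below are illustrative, and kwargs passed to 'run' override
    # the settings given to the constructor.
    #
    # >>> scheduler = FIFOScheduler(train_fn,
    # ...                           resource={'num_cpus': 2, 'num_gpus': 0},
    # ...                           num_trials=10, reward_attr='accuracy',
    # ...                           time_attr='epoch')
    # >>> scheduler.run(num_trials=5)   # overrides num_trials=10 for this run
    # >>> scheduler.join_jobs()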

    def save(self, checkpoint=None):
        """Save Checkpoint"""
        if checkpoint is None:
            checkpoint = self._checkpoint
        if checkpoint is not None:
            mkdir(os.path.dirname(checkpoint))
            save(self.state_dict(), checkpoint)
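
    # Minimal sketch (an assumption, not in the original source): 'save'
    # writes 'state_dict' to the given path, or falls back to the
    # 'checkpoint' path passed to the constructor. The path below is made up.
    #
    # >>> scheduler.save('my_experiment/checkpoint.ag')   # explicit path
    # >>> scheduler.save()                                # uses self._checkpoint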

    def _create_new_task(self, config, resources=None):
        if resources is None:
            resources = DistributedResource(**self.resource)
        return Task(
            self.train_fn, {'args': self.args, 'config': config},
            resources=resources)

    def schedule_next(self):
        """Schedule next searcher suggested task"""
        resources = DistributedResource(**self.resource)
        if self._delay_get_config:
            # Wait for available resource here, instead of in add_job. This
            # delays the get_config call until a resource is available
            FIFOScheduler.managers.request_resources(resources)

        # Allow for the promotion of a previously chosen config. Also,
        # extra_kwargs contains extra info passed to both add_job and to
        # get_config (if no config is promoted)
        config, extra_kwargs = self._promote_config()
        # Time stamp to be used in get_config, and maybe in add_job
        extra_kwargs['elapsed_time'] = self._elapsed_time()
        if config is None:
            # No config to promote: Query next config to evaluate from
            # searcher
            config = self.searcher.get_config(**extra_kwargs)
            extra_kwargs['new_config'] = True
        else:
            # This is not a new config, but a paused one which is now promoted
            extra_kwargs['new_config'] = False
        task = self._create_new_task(config, resources=resources)
        self.add_job(task, **extra_kwargs)

    def run_with_config(self, config):
        """Run with config for final fit.

        It launches a single training trial under fixed values of the
        hyperparameters. For example, after HPO has identified the best
        hyperparameter values based on a hold-out dataset, one can use this
        function to retrain a model with the same hyperparameters on all the
        available labeled data (including the hold-out set). It can also
        return other objects or states.
        """
        task = self._create_new_task(config)
        reporter = FakeReporter()
        task.args['reporter'] = reporter
        return self.run_job(task)
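
    # Usage sketch (illustrative, not from the original source): retrain once
    # with the best configuration found during the search.
    #
    # >>> best_config = scheduler.get_best_config()
    # >>> scheduler.run_with_config(best_config)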

    def _dict_from_task(self, task):
        if isinstance(task, Task):
            return {'TASK_ID': task.task_id, 'Config': task.args['config']}
        else:
            assert isinstance(task, dict)
            return {'TASK_ID': task['TASK_ID'], 'Config': task['Config']}

    def on_task_add(self, task, **kwargs):
        """
        Called when a new task is added. Registers the new task, informs the
        searcher (pending evaluation) and the train_fn.

        :param task:
        :param kwargs:
        """
        config = task.args['config']
        # Register pending evaluation
        self.searcher.register_pending(config)
        # Relay config_id to train_fn (for debug logging)
        debug_log = self.searcher.debug_log
        if debug_log is not None:
            config_id = debug_log.config_id(config)
            task.args['args']['config_id'] = config_id

    def _class_for_add_job(self):
        return FIFOScheduler

    def _add_checkpointing_to_job(self, job):
        def _save_checkpoint_callback(fut):
            self._cleaning_tasks()
            self.save()

        job.add_done_callback(_save_checkpoint_callback)

    def add_job(self, task, **kwargs):
        """Add a training task to the scheduler.

        Args:
            task (:class:`autogluon.scheduler.Task`): a new training task

        Relevant entries in kwargs:
        - bracket: HB bracket to be used. Has been sampled in _promote_config
        - new_config: If True, task starts a new config eval, otherwise it
          promotes a config (only if type == 'promotion')

        Only if new_config == False:
        - config_key: Internal key for config
        - resume_from: config promoted from this milestone
        - milestone: config promoted to this milestone (next from resume_from)
        """
        cls = self._class_for_add_job()
        if not self._delay_get_config:
            # Wait for resource to become available here, as this has not
            # happened in schedule_next before
            cls.managers.request_resources(task.resources)
        # Create reporter (if not passed in kwargs)
        reporter = kwargs.get(
            'reporter', DistStatusReporter(remote=task.resources.node))
        task.args['reporter'] = reporter
        # Register task, inform searcher and train_fn
        self.on_task_add(task, **kwargs)
        # Main process
        if self.searcher.debug_log is not None:
            logger.info("Starting job on node = {}".format(
                task.resources.node.remote_id))
        job = cls.jobs.start_distributed_job(task, cls.managers)
        # Reporter thread
        rp = threading.Thread(
            target=self._run_reporter,
            args=(task, job, reporter),
            daemon=False)
        rp.start()
        task_dict = self._dict_from_task(task)
        task_dict.update({'Task': task, 'Job': job, 'ReporterThread': rp})
        with self.managers.lock:
            self.scheduled_tasks.append(task_dict)
        # Checkpoint thread
        if self._checkpoint is not None:
            self._add_checkpointing_to_job(job)

    def _clean_task_internal(self, task_dict):
        task_dict['ReporterThread'].join()

    def _training_history_callback_current_block(self):
        return int(np.floor(
            self._elapsed_time() / self.training_history_callback_delta_secs))

    def _trigger_training_history_callback(self):
        """
        Decides whether 'training_history_callback' is to be called, and
        executes the callback if the condition is met. This callback is (in
        general) serializing 'training_history' (or required parts of it).
        It may also serialize the current scheduler state.

        To this end, we partition the time since the start of the experiment
        (see `_elapsed_time`) into blocks of
        'training_history_callback_delta_secs' seconds. The callback is
        triggered only if the most recent call happened in an earlier block.

        :return: Has callback been called?
        """
        do_call = False
        if self.training_history_callback is not None:
            assert self._training_history_callback_last_block is not None
            current_block = self._training_history_callback_current_block()
            do_call = \
                (current_block > self._training_history_callback_last_block)
            if do_call:
                # Note: no_fifo_lock = True avoids deadlock in 'state_dict',
                # since we acquired the _fifo_lock already
                self.training_history_callback(
                    self.training_history, self._start_time,
                    config_history=self.config_history,
                    state_dict=self.state_dict(no_fifo_lock=True))
                # Callback could take some time, so make sure to call
                # _training_history_callback_current_block here again
                self._training_history_callback_last_block = \
                    self._training_history_callback_current_block()
        return do_call

    def _add_training_result(self, task_id, reported_result, config=None):
        if self.visualizer == 'mxboard' or self.visualizer == 'tensorboard':
            if 'loss' in reported_result:
                self.mxboard.add_scalar(
                    tag='loss',
                    value=(
                        f'task {task_id} valid_loss',
                        reported_result['loss']
                    ),
                    global_step=reported_result[self._time_attr]
                )
            self.mxboard.add_scalar(
                tag=self._reward_attr,
                value=(
                    f'task {task_id} {self._reward_attr}',
                    reported_result[self._reward_attr]
                ),
                global_step=reported_result[self._time_attr]
            )
        # This locking block includes the (optional) call of
        # training_history_callback. We need to make sure that it is not
        # called by two reporter threads at the same time
        with self._fifo_lock:
            # Note: We store all of reported_result in
            # training_history[task_id], not just the reward value.
            task_key = str(task_id)
            new_entry = copy.copy(reported_result)
            if task_key in self.training_history:
                self.training_history[task_key].append(new_entry)
            else:
                self.training_history[task_key] = [new_entry]
            if config:
                self.config_history[task_key] = config
            # training_history_callback mechanism
            self._trigger_training_history_callback()

    def _append_extra_searcher_info(self, result):
        # Extra information from searcher (optional)
        if self.training_history_searcher_info:
            dataset_size = self.searcher.dataset_size()
            if dataset_size > 0:
                result['searcher_data_size'] = dataset_size
            for k, v in self.searcher.model_parameters().items():
                result['searcher_params_' + k] = v
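
    # Illustrative callback (an assumption, not part of the original source)
    # matching the signature used in '_trigger_training_history_callback'
    # above. It would be passed as 'training_history_callback' to the
    # constructor and is invoked at most once per
    # 'training_history_callback_delta_secs' block. The file name is made up.
    #
    # >>> import json
    # >>> def my_history_callback(training_history, start_time,
    # ...                         config_history=None, state_dict=None):
    # ...     # dump the history collected so far to disk
    # ...     with open('training_history.json', 'w') as f:
    # ...         json.dump(training_history, f)
    # >>> scheduler = FIFOScheduler(train_fn, num_trials=20,
    # ...                           training_history_callback=my_history_callback,
    # ...                           training_history_callback_delta_secs=120)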

    def on_task_report(self, task, result):
        """
        Called by the reporter thread once a new result is reported.

        :param task:
        :param result:
        :return: Should the reporter move on? Otherwise, it terminates
        """
        debug_log = self.searcher.debug_log
        config = task.args['config']
        config_id = debug_log.config_id(config) if debug_log else None
        if 'traceback' in result:
            # Evaluation has failed
            logger.critical(result['traceback'])
            self.searcher.evaluation_failed(config, **result)
            if debug_log is not None:
                msg = "config_id {}: Evaluation failed:\n{}".format(
                    config_id, result['traceback'])
                logger.info(msg)
            return False  # reporter terminates
        if result.get('done', False):
            return False  # reporter terminates
        # If we are past self.time_out, we want to stop the job
        if self._stop_jobs_after_time_out:
            elapsed_time = self._elapsed_time()
            if elapsed_time > self.time_out:
                if debug_log is not None:
                    msg = "config_id {}: Terminating because elapsed_time = {} > {} = self.time_out".format(
                        config_id, elapsed_time, self.time_out)
                    logger.info(msg)
                return False  # reporter terminates
        if len(result) == 0:
            # An empty dict should just be skipped
            if debug_log is not None:
                msg = "config_id {}: Skipping empty dict received from reporter".format(
                    config_id)
                logger.info(msg)
        else:
            self._append_extra_searcher_info(result)
            self._add_training_result(task.task_id, result, config=config)
        return True  # reporter moves on

    def _run_reporter(self, task, task_job, reporter):
        last_result = None
        while not task_job.done():
            reported_result = reporter.fetch()
            if self.on_task_report(task, reported_result):
                if len(reported_result) > 0:
                    last_result = reported_result
                reporter.move_on()
            else:
                reporter.terminate()
                break
        # Pass all of last_result to searcher
        if last_result is not None:
            self.searcher.update(config=task.args['config'], **last_result)

    def _promote_config(self):
        """
        Provides a hook in schedule_next, which allows the promotion of a
        config which has been selected and partially evaluated previously.

        :return: config, extra_args
        """
        config = None
        extra_args = dict()
        return config, extra_args

    def _elapsed_time(self):
        """
        :return: Time elapsed since the start of the experiment (see 'run')
        """
        assert self._start_time is not None, \
            "Experiment has not been started yet"
        return time.time() - self._start_time

    def get_best_config(self):
        """Get the best configuration from the finished jobs."""
        return self.searcher.get_best_config()

    def get_best_task_id(self):
        """Get the task id that results in the best configuration/best reward.

        If there are duplicated configurations, we return the id of the first
        one.
        """
        best_config = self.get_best_config()
        with self._fifo_lock:
            for task_id, config in self.config_history.items():
                if pickle.dumps(best_config) == pickle.dumps(config):
                    return task_id
        raise RuntimeError(
            'The best config {} is not found in config history = {}. '
            'This should never happen!'.format(
                best_config, self.config_history))
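
    # Query sketch (illustrative, not from the original source): inspect the
    # best result after 'join_jobs' has returned.
    #
    # >>> best_task_id = scheduler.get_best_task_id()
    # >>> best_config = scheduler.get_best_config()
    # >>> best_reward = scheduler.get_best_reward()
    # >>> print(f'task {best_task_id}: reward={best_reward}, config={best_config}')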

    def get_best_reward(self):
        """Get the best reward from the finished jobs."""
        return self.searcher.get_best_reward()

    def get_training_curves(self, filename=None, plot=False, use_legend=True):
        """Get Training Curves

        Parameters
        ----------
        filename : str
        plot : bool
        use_legend : bool

        Examples
        --------
        >>> scheduler.run()
        >>> scheduler.join_jobs()
        >>> scheduler.get_training_curves(plot=True)

        .. image:: https://github.com/zhanghang1989/AutoGluonWebdata/blob/master/doc/api/autogluon.1.png?raw=true
        """
        if filename is None and not plot:
            logger.warning(
                'Please either provide filename or allow plot in get_training_curves')
        import matplotlib.pyplot as plt

        eval_metric = self.__get_training_history_metric(
            'eval_metric', default='validation_performance')
        sign_mult = int(self.__get_training_history_metric(
            'greater_is_better', default=True)) * 2 - 1

        plt.ylabel(eval_metric)
        plt.xlabel(self._time_attr)
        plt.title("Performance vs Training-Time in each HPO Trial")
        with self._fifo_lock:
            for task_id, task_res in self.training_history.items():
                rewards = [x[self._reward_attr] * sign_mult for x in task_res]
                x = [x[self._time_attr] for x in task_res]
                plt.plot(x, rewards, label=f'task {task_id}')
        if use_legend:
            plt.legend(loc='best')
        if filename:
            logger.info(f'Saving Training Curve in {filename}')
            plt.savefig(filename)
        if plot:
            plt.show()
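
    # Usage sketch (an assumption, not from the original source): write the
    # curves to an image file instead of showing them interactively. The file
    # name is made up.
    #
    # >>> scheduler.get_training_curves(filename='curves.png', plot=False)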

    def __get_training_history_metric(self, metric, default=None):
        for _, task_res in self.training_history.items():
            if task_res and metric in task_res[0]:
                return task_res[0][metric]
        return default

    def state_dict(self, destination=None, no_fifo_lock=False):
        """
        Returns a dictionary containing the whole state of the scheduler.
        This is used for checkpointing.

        Note that the checkpoint only contains information which has been
        registered at the scheduler and the searcher. It does not contain
        information about currently running jobs, except what they reported
        before the checkpoint. Therefore, resuming an experiment from a
        checkpoint is slightly different from continuing the experiment past
        the checkpoint. The former behaves as if all currently running jobs
        are terminated at the checkpoint, and new jobs are scheduled from
        there, starting from scheduler and searcher state according to all
        information recorded until the checkpoint.

        Examples
        --------
        >>> ag.save(scheduler.state_dict(), 'checkpoint.ag')
        """
        destination = super().state_dict(destination)
        if not no_fifo_lock:
            self._fifo_lock.acquire()
        try:
            # The result of searcher.get_state can always be pickled
            destination['searcher'] = pickle.dumps(self.searcher.get_state())
            destination['training_history'] = json.dumps(
                self.training_history)
            destination['config_history'] = json.dumps(self.config_history)
        finally:
            if not no_fifo_lock:
                self._fifo_lock.release()
        if self.visualizer == 'mxboard' or self.visualizer == 'tensorboard':
            destination['visualizer'] = json.dumps(self.mxboard._scalar_dict)
        return destination

    def load_state_dict(self, state_dict):
        """
        Load from the saved state dict. This can be used to resume an
        experiment from a checkpoint (see 'state_dict' for caveats).

        This method must only be called as part of scheduler construction.
        Calling it in the middle of an experiment can lead to an undefined
        inner state of scheduler or searcher.

        Examples
        --------
        >>> scheduler.load_state_dict(ag.load('checkpoint.ag'))
        """
        super().load_state_dict(state_dict)
        with self._fifo_lock:
            self.searcher = self.searcher.clone_from_state(
                pickle.loads(state_dict['searcher']))
            self.training_history = json.loads(
                state_dict['training_history'])
            self.config_history = json.loads(state_dict['config_history'])
            if self.visualizer == 'mxboard' or \
                    self.visualizer == 'tensorboard':
                self.mxboard._scalar_dict = json.loads(
                    state_dict['visualizer'])
            logger.debug(f'Loading Searcher State {self.searcher}')
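
# Resume sketch (illustrative, not part of the original source): a new
# scheduler can pick up a saved experiment either via the 'resume' flag in the
# constructor or by calling 'load_state_dict' right after construction, as in
# the docstring example above. The checkpoint path is made up.
#
# >>> scheduler = FIFOScheduler(train_fn, num_trials=50,
# ...                           checkpoint='checkpoint.ag', resume=True)
# >>> scheduler.run()
# >>> scheduler.join_jobs()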