Source code for evalml.automl.automl_algorithm.iterative_algorithm

import inspect
from operator import itemgetter

import numpy as np
from skopt.space import Categorical, Integer, Real

from .automl_algorithm import AutoMLAlgorithm, AutoMLAlgorithmException

from evalml.model_family import ModelFamily
from evalml.pipelines.utils import _make_stacked_ensemble_pipeline

_ESTIMATOR_FAMILY_ORDER = [
    ModelFamily.LINEAR_MODEL,
    ModelFamily.DECISION_TREE,
    ModelFamily.EXTRA_TREES,
    ModelFamily.RANDOM_FOREST,
    ModelFamily.XGBOOST,
    ModelFamily.LIGHTGBM,
    ModelFamily.CATBOOST,
    ModelFamily.ARIMA,
]


[docs]class IterativeAlgorithm(AutoMLAlgorithm): """An automl algorithm which first fits a base round of pipelines with default parameters, then does a round of parameter tuning on each pipeline in order of performance."""
[docs] def __init__( self, allowed_pipelines=None, max_iterations=None, tuner_class=None, random_seed=0, pipelines_per_batch=5, n_jobs=-1, # TODO remove number_features=None, # TODO remove ensembling=False, text_in_ensembling=False, pipeline_params=None, custom_hyperparameters=None, _estimator_family_order=None, ): """An automl algorithm which first fits a base round of pipelines with default parameters, then does a round of parameter tuning on each pipeline in order of performance. Arguments: allowed_pipelines (list(class)): A list of PipelineBase instances indicating the pipelines allowed in the search. The default of None indicates all pipelines for this problem type are allowed. max_iterations (int): The maximum number of iterations to be evaluated. tuner_class (class): A subclass of Tuner, to be used to find parameters for each pipeline. The default of None indicates the SKOptTuner will be used. random_seed (int): Seed for the random number generator. Defaults to 0. pipelines_per_batch (int): The number of pipelines to be evaluated in each batch, after the first batch. n_jobs (int or None): Non-negative integer describing level of parallelism used for pipelines. number_features (int): The number of columns in the input features. ensembling (boolean): If True, runs ensembling in a separate batch after every allowed pipeline class has been iterated over. Defaults to False. text_in_ensembling (boolean): If True and ensembling is True, then n_jobs will be set to 1 to avoid downstream sklearn stacking issues related to nltk. pipeline_params (dict or None): Pipeline-level parameters that should be passed to the proposed pipelines. custom_hyperparameters (dict or None): Custom hyperparameter ranges specified for pipelines to iterate over. _estimator_family_order (list(ModelFamily) or None): specify the sort order for the first batch. Defaults to _ESTIMATOR_FAMILY_ORDER. """ self._estimator_family_order = ( _estimator_family_order or _ESTIMATOR_FAMILY_ORDER ) indices = [] pipelines_to_sort = [] pipelines_end = [] for pipeline in allowed_pipelines or []: if pipeline.model_family in self._estimator_family_order: indices.append( self._estimator_family_order.index(pipeline.model_family) ) pipelines_to_sort.append(pipeline) else: pipelines_end.append(pipeline) pipelines_start = [ pipeline for _, pipeline in ( sorted(zip(indices, pipelines_to_sort), key=lambda pair: pair[0]) or [] ) ] allowed_pipelines = pipelines_start + pipelines_end super().__init__( allowed_pipelines=allowed_pipelines, custom_hyperparameters=custom_hyperparameters, max_iterations=max_iterations, tuner_class=tuner_class, random_seed=random_seed, ) self.pipelines_per_batch = pipelines_per_batch self.n_jobs = n_jobs self.number_features = number_features self._first_batch_results = [] self._best_pipeline_info = {} self.ensembling = ensembling and len(self.allowed_pipelines) > 1 self.text_in_ensembling = text_in_ensembling self._pipeline_params = pipeline_params or {} self._custom_hyperparameters = custom_hyperparameters or {} if custom_hyperparameters and not isinstance(custom_hyperparameters, dict): raise ValueError( f"If custom_hyperparameters provided, must be of type dict. Received {type(custom_hyperparameters)}" ) for param_name_val in self._pipeline_params.values(): for _, param_val in param_name_val.items(): if isinstance(param_val, (Integer, Real, Categorical)): raise ValueError( "Pipeline parameters should not contain skopt.Space variables, please pass them " "to custom_hyperparameters instead!" ) for hyperparam_name_val in self._custom_hyperparameters.values(): for _, hyperparam_val in hyperparam_name_val.items(): if not isinstance(hyperparam_val, (Integer, Real, Categorical)): raise ValueError( "Custom hyperparameters should only contain skopt.Space variables such as Categorical, Integer," " and Real!" )
[docs] def next_batch(self): """Get the next batch of pipelines to evaluate Returns: list(PipelineBase): a list of instances of PipelineBase subclasses, ready to be trained and evaluated. """ if self._batch_number == 1: if len(self._first_batch_results) == 0: raise AutoMLAlgorithmException( "No results were reported from the first batch" ) self._first_batch_results = sorted( self._first_batch_results, key=itemgetter(0) ) next_batch = [] if self._batch_number == 0: next_batch = [ pipeline.new( parameters=self._transform_parameters(pipeline, {}), random_seed=self.random_seed, ) for pipeline in self.allowed_pipelines ] # One after training all pipelines one round elif ( self.ensembling and self._batch_number != 1 and (self._batch_number) % (len(self._first_batch_results) + 1) == 0 ): input_pipelines = [] for pipeline_dict in self._best_pipeline_info.values(): pipeline = pipeline_dict["pipeline"] pipeline_params = pipeline_dict["parameters"] parameters = self._transform_parameters(pipeline, pipeline_params) input_pipelines.append( pipeline.new(parameters=parameters, random_seed=self.random_seed) ) n_jobs_ensemble = 1 if self.text_in_ensembling else self.n_jobs ensemble = _make_stacked_ensemble_pipeline( input_pipelines, input_pipelines[0].problem_type, random_seed=self.random_seed, n_jobs=n_jobs_ensemble, ) next_batch.append(ensemble) else: num_pipelines = ( (len(self._first_batch_results) + 1) if self.ensembling else len(self._first_batch_results) ) idx = (self._batch_number - 1) % num_pipelines pipeline = self._first_batch_results[idx][1] for i in range(self.pipelines_per_batch): proposed_parameters = self._tuners[pipeline.name].propose() parameters = self._transform_parameters(pipeline, proposed_parameters) next_batch.append( pipeline.new(parameters=parameters, random_seed=self.random_seed) ) self._pipeline_number += len(next_batch) self._batch_number += 1 return next_batch
[docs] def add_result(self, score_to_minimize, pipeline, trained_pipeline_results): """Register results from evaluating a pipeline Arguments: score_to_minimize (float): The score obtained by this pipeline on the primary objective, converted so that lower values indicate better pipelines. pipeline (PipelineBase): The trained pipeline object which was used to compute the score. trained_pipeline_results (dict): Results from training a pipeline. """ if pipeline.model_family != ModelFamily.ENSEMBLE: if self.batch_number == 1: try: super().add_result( score_to_minimize, pipeline, trained_pipeline_results ) except ValueError as e: if "is not within the bounds of the space" in str(e): raise ValueError( "Default parameters for components in pipeline {} not in the hyperparameter ranges: {}".format( pipeline.name, e ) ) else: raise (e) else: super().add_result( score_to_minimize, pipeline, trained_pipeline_results ) if self.batch_number == 1: self._first_batch_results.append((score_to_minimize, pipeline)) current_best_score = self._best_pipeline_info.get( pipeline.model_family, {} ).get("mean_cv_score", np.inf) if ( score_to_minimize is not None and score_to_minimize < current_best_score and pipeline.model_family != ModelFamily.ENSEMBLE ): self._best_pipeline_info.update( { pipeline.model_family: { "mean_cv_score": score_to_minimize, "pipeline": pipeline, "parameters": pipeline.parameters, "id": trained_pipeline_results["id"], } } )
def _transform_parameters(self, pipeline, proposed_parameters): """Given a pipeline parameters dict, make sure n_jobs and number_features are set.""" parameters = {} if "pipeline" in self._pipeline_params: parameters["pipeline"] = self._pipeline_params["pipeline"] for name, component_class in pipeline.linearized_component_graph: component_parameters = proposed_parameters.get(name, {}) init_params = inspect.signature(component_class.__init__).parameters # For first batch, pass the pipeline params to the components that need them if name in self._custom_hyperparameters and self._batch_number == 0: for param_name, value in self._custom_hyperparameters[name].items(): if isinstance(value, (Integer, Real)): # get a random value in the space component_parameters[param_name] = value.rvs( random_state=self.random_seed )[0] # Categorical else: component_parameters[param_name] = value.rvs( random_state=self.random_seed ) if name in self._pipeline_params and self._batch_number == 0: for param_name, value in self._pipeline_params[name].items(): component_parameters[param_name] = value # Inspects each component and adds the following parameters when needed if "n_jobs" in init_params: component_parameters["n_jobs"] = self.n_jobs if "number_features" in init_params: component_parameters["number_features"] = self.number_features if ( name in self._pipeline_params and name == "Drop Columns Transformer" and self._batch_number > 0 ): component_parameters["columns"] = self._pipeline_params[name]["columns"] if "pipeline" in self._pipeline_params: for param_name, value in self._pipeline_params["pipeline"].items(): if param_name in init_params: component_parameters[param_name] = value parameters[name] = component_parameters return parameters