Source code for evalml.automl.automl_algorithm.default_algorithm

"""An automl algorithm that consists of two modes: fast and long, where fast is a subset of long."""
import logging

import numpy as np

from evalml.automl.automl_algorithm.automl_algorithm import AutoMLAlgorithm
from evalml.model_family import ModelFamily
from evalml.pipelines.components import (
    EmailFeaturizer,
    OneHotEncoder,
    RFClassifierSelectFromModel,
    RFRegressorSelectFromModel,
    URLFeaturizer,
)
from evalml.pipelines.components.transformers.column_selectors import (
    SelectByType,
    SelectColumns,
)
from evalml.pipelines.components.transformers.encoders.ordinal_encoder import (
    OrdinalEncoder,
)
from evalml.pipelines.components.utils import get_estimators, handle_component_class
from evalml.pipelines.utils import (
    _get_sampler,
    _make_pipeline_from_multiple_graphs,
    make_pipeline,
)
from evalml.problem_types import is_multiseries, is_regression, is_time_series
from evalml.utils import infer_feature_types
from evalml.utils.logger import get_logger


class DefaultAlgorithm(AutoMLAlgorithm):
    """An automl algorithm that consists of two modes: fast and long, where fast is a subset of long.

    1. Naive pipelines:
        a. run baseline with default preprocessing pipeline
        b. run naive linear model with default preprocessing pipeline
        c. run basic RF pipeline with default preprocessing pipeline
    2. Naive pipelines with feature selection
        a. subsequent pipelines will use the selected features with a SelectedColumns transformer

    At this point we have a single pipeline candidate for preprocessing and feature selection

    3. Pipelines with preprocessing components:
        a. scan rest of estimators (our current batch 1).
    4. First ensembling run

    Fast mode ends here. Begin long mode.

    5. Run top 3 estimators:
        a. Generate 50 random parameter sets. Run all 150 in one batch
    6. Second ensembling run
    7. Repeat these indefinitely until stopping criterion is met:
        a. For each of the previous top 3 estimators, sample 10 parameters from the tuner. Run all 30 in one batch
        b. Run ensembling

    Args:
        X (pd.DataFrame): Training data.
        y (pd.Series): Target data.
        problem_type (ProblemType): Problem type associated with training data.
        sampler_name (BaseSampler): Sampler to use for preprocessing.
        allowed_model_families (list(str, ModelFamily)): The model families to search. The default of None searches over all
            model families. Run evalml.pipelines.components.utils.allowed_model_families("binary") to see options. Change `binary`
            to `multiclass` or `regression` depending on the problem type. For default algorithm, this only applies to
            estimators in the non-naive batches.
        excluded_model_families (list(str, ModelFamily)): A list of model families to exclude from the estimators used when
            building pipelines. For default algorithm, this only excludes estimators in the non-naive batches.
        tuner_class (class): A subclass of Tuner, to be used to find parameters for each pipeline. The default of None indicates
            the SKOptTuner will be used.
        random_seed (int): Seed for the random number generator. Defaults to 0.
        search_parameters (dict or None): Pipeline-level parameters and custom hyperparameter ranges specified for pipelines to
            iterate over. Hyperparameter ranges must be passed in as skopt.space objects. Defaults to None.
        n_jobs (int or None): Non-negative integer describing level of parallelism used for pipelines. Defaults to 1.
        text_in_ensembling (boolean): If True and ensembling is True, then n_jobs will be set to 1 to avoid downstream sklearn
            stacking issues related to nltk. Defaults to False.
        top_n (int): top n number of pipelines to use for long mode. Defaults to 3.
        ensembling (boolean): If True, ensembling batches are interleaved with the search batches. Not available for
            time series problems. Defaults to False.
        num_long_explore_pipelines (int): number of pipelines to explore for each top n pipeline at the start of long mode.
            Defaults to 50.
        num_long_pipelines_per_batch (int): number of pipelines per batch for each top n pipeline through long mode.
            Defaults to 10.
        allow_long_running_models (bool): Whether or not to allow longer-running models for large multiclass problems. If False
            and no pipelines, component graphs, or model families are provided, AutoMLSearch will not use Elastic Net or XGBoost
            when there are more than 75 multiclass targets and will not use CatBoost when there are more than 150 multiclass
            targets. Defaults to False.
        features (list)[FeatureBase]: List of features to run DFS on in AutoML pipelines. Defaults to None. Features will only
            be computed if the columns used by the feature exist in the input and if the feature has not been computed yet.
        run_feature_selection (bool): If True, will run a separate feature selection pipeline and only use selected features in
            subsequent batches. If False, will use all of the features for every pipeline. Only used for default algorithm.
            Defaults to True.
        verbose (boolean): Whether or not to display logging information regarding pipeline building. Defaults to False.
        exclude_featurizers (list[str]): A list of featurizer components to exclude from the pipelines built by
            DefaultAlgorithm. Valid options are "DatetimeFeaturizer", "EmailFeaturizer", "URLFeaturizer",
            "NaturalLanguageFeaturizer", "TimeSeriesFeaturizer"

    Raises:
        ValueError: If both `allowed_model_families` and `excluded_model_families` are set, if ensembling is requested
            for a time series problem, or if `search_parameters` is provided but is not a dict.
    """

    def __init__(
        self,
        X,
        y,
        problem_type,
        sampler_name,
        allowed_model_families=None,
        excluded_model_families=None,
        tuner_class=None,
        random_seed=0,
        search_parameters=None,
        n_jobs=1,
        text_in_ensembling=False,
        top_n=3,
        ensembling=False,
        num_long_explore_pipelines=50,
        num_long_pipelines_per_batch=10,
        allow_long_running_models=False,
        features=None,
        run_feature_selection=True,
        verbose=False,
        exclude_featurizers=None,
    ):
        super().__init__(
            allowed_pipelines=[],
            allowed_model_families=allowed_model_families,
            excluded_model_families=excluded_model_families,
            allowed_component_graphs=None,
            search_parameters=search_parameters,
            # Forward the caller's tuner instead of a hard-coded None, which
            # made the documented `tuner_class` argument a no-op.
            tuner_class=tuner_class,
            random_seed=random_seed,
        )

        self.X = infer_feature_types(X)
        self.y = infer_feature_types(y)
        self.problem_type = problem_type
        self.sampler_name = sampler_name

        self.n_jobs = n_jobs
        # Best result seen so far, keyed by model family (filled in add_result).
        self._best_pipeline_info = {}
        self.text_in_ensembling = text_in_ensembling
        self.search_parameters = search_parameters or {}
        # Pipelines chosen by _create_long_exploration and reused by later batches.
        self._top_n_pipelines = None
        self.num_long_explore_pipelines = num_long_explore_pipelines
        self.num_long_pipelines_per_batch = num_long_pipelines_per_batch
        self.top_n = top_n
        self.verbose = verbose
        # Original categorical/URL/email columns kept by feature selection.
        self._selected_cat_cols = []
        # Set to True once a split (numeric + categorical) pipeline has been built.
        self._split = False
        self.allow_long_running_models = allow_long_running_models
        # Lazily-built column subsets used by _make_split_pipeline.
        self._X_with_cat_cols = None
        self._X_without_cat_cols = None
        self.features = features
        self.run_feature_selection = run_feature_selection
        self.ensembling = ensembling
        self.exclude_featurizers = exclude_featurizers or []

        if allowed_model_families is not None and excluded_model_families is not None:
            raise ValueError(
                "Both `allowed_model_families` and `excluded_model_families` cannot be set.",
            )

        self.allowed_model_families = allowed_model_families
        self.excluded_model_families = excluded_model_families

        # TODO remove on resolution of 3186
        if is_time_series(self.problem_type) and self.ensembling:
            raise ValueError(
                "Ensembling is not available for time series problems in DefaultAlgorithm.",
            )

        if verbose:
            self.logger = get_logger(f"{__name__}.verbose")
        else:
            self.logger = logging.getLogger(__name__)

        if search_parameters and not isinstance(search_parameters, dict):
            raise ValueError(
                f"If search_parameters provided, must be of type dict. Received {type(search_parameters)}",
            )

        # Neither helper is defined in this class; presumably inherited from
        # AutoMLAlgorithm.
        self._set_additional_pipeline_params()
        self._separate_hyperparameters_from_parameters()

    @property
    def default_max_batches(self):
        """Returns the number of max batches AutoMLSearch should run by default."""
        if self.ensembling:
            return 3
        elif is_multiseries(self.problem_type):
            return 1
        else:
            return 2

    def num_pipelines_per_batch(self, batch_number):
        """Return the number of pipelines in the nth batch.

        Args:
            batch_number (int): which batch to calculate the number of pipelines for.

        Returns:
            int: number of pipelines in the given batch.
        """
        if batch_number == 0:
            return len(self._naive_estimators())
        elif batch_number == 1:
            return len(self._non_naive_estimators())
        if self.ensembling:
            # With ensembling, every even batch from 2 onwards is a single ensemble pipeline.
            if batch_number % 2 == 0:
                return 1
            elif batch_number == 3:
                return self.num_long_explore_pipelines * self.top_n
        else:
            if batch_number == 2:
                return self.num_long_explore_pipelines * self.top_n
        return self.num_long_pipelines_per_batch * self.top_n

    def _naive_estimators(self):
        """Return the estimator classes used for the naive (first) batch."""
        if is_regression(self.problem_type):
            naive_estimators = [
                "Random Forest Regressor",
            ]
        else:
            naive_estimators = [
                "Random Forest Classifier",
            ]
        estimators = [
            handle_component_class(estimator) for estimator in naive_estimators
        ]
        return estimators

    def _non_naive_estimators(self):
        """Return the allowed estimators for the problem type, minus the naive ones."""
        # Resolve the naive estimators once rather than once per candidate.
        naive_estimators = self._naive_estimators()
        return [
            est
            for est in get_estimators(
                self.problem_type,
                model_families=self.allowed_model_families,
                excluded_model_families=self.excluded_model_families,
            )
            if est not in naive_estimators
        ]

    def _init_pipelines_with_starter_params(self, pipelines):
        """Create a tuner for each pipeline and return copies set to the tuner's starting parameters."""
        next_batch = []
        for pipeline in pipelines:
            self._create_tuner(pipeline)
            starting_parameters = self._tuners[pipeline.name].get_starting_parameters(
                self._hyperparameters,
                self.random_seed,
            )
            parameters = self._transform_parameters(pipeline, starting_parameters)
            next_batch.append(
                pipeline.new(parameters=parameters, random_seed=self.random_seed),
            )
        return next_batch

    def _create_naive_pipelines(self, use_features=False):
        """Build the naive batch, optionally appending an RF feature selector to each pipeline.

        Args:
            use_features (bool): If True, an RF "select from model" component matching
                the problem type is passed as `extra_components_after`. Defaults to False.

        Returns:
            list: Pipelines initialized with their tuners' starting parameters.
        """
        if use_features:
            feature_selector = [
                (
                    RFRegressorSelectFromModel
                    if is_regression(self.problem_type)
                    else RFClassifierSelectFromModel
                ),
            ]
        else:
            feature_selector = []

        estimators = self._naive_estimators()
        pipelines = [
            make_pipeline(
                X=self.X,
                y=self.y,
                estimator=estimator,
                problem_type=self.problem_type,
                sampler_name=self.sampler_name,
                extra_components_after=feature_selector,
                parameters=self._pipeline_parameters,
                known_in_advance=self._pipeline_parameters.get("pipeline", {}).get(
                    "known_in_advance",
                    None,
                ),
                features=self.features,
                exclude_featurizers=self.exclude_featurizers,
            )
            for estimator in estimators
        ]

        pipelines = self._add_without_pipelines(pipelines, estimators, feature_selector)
        pipelines = self._init_pipelines_with_starter_params(pipelines)
        return pipelines

    def _add_without_pipelines(self, pipelines, estimators, feature_selector=None):
        """Append decomposer-free variants when the last pipeline contains an STL Decomposer.

        Args:
            pipelines (list): Pipelines already built for `estimators`.
            estimators (list): Estimator classes to build the extra pipelines for.
            feature_selector (list or None): Components passed as `extra_components_after`.
                None is treated as an empty list. Defaults to None.

        Returns:
            list: `pipelines` unchanged, or `pipelines` followed by one extra
            pipeline per estimator built with `include_decomposer=False`.
        """
        # A None sentinel replaces a shared mutable `[]` default.
        feature_selector = [] if feature_selector is None else feature_selector
        if (
            len(pipelines)
            and "STL Decomposer" in pipelines[-1].component_graph.compute_order
        ):
            without_pipelines = [
                make_pipeline(
                    X=self.X,
                    y=self.y,
                    estimator=estimator,
                    problem_type=self.problem_type,
                    sampler_name=self.sampler_name,
                    extra_components_after=feature_selector,
                    parameters=self._pipeline_parameters,
                    known_in_advance=self._pipeline_parameters.get("pipeline", {}).get(
                        "known_in_advance",
                        None,
                    ),
                    features=self.features,
                    exclude_featurizers=self.exclude_featurizers,
                    include_decomposer=False,
                )
                for estimator in estimators
            ]
            pipelines = pipelines + without_pipelines
        return pipelines

    def _find_component_names(self, original_name, pipeline):
        """Return the graph component names whose base name equals `original_name`.

        A component named "<prefix> - <name>" is compared on "<name>" (the second
        " - "-separated field); a name without the separator is compared whole.
        """
        names = []
        for component in pipeline.component_graph.compute_order:
            split = component.split(" - ")
            split = split[1] if len(split) > 1 else split[0]
            if original_name == split:
                names.append(component)
        return names

    def _create_split_select_parameters(self):
        """Return the column-selector parameters for a split (numeric + categorical) pipeline."""
        parameters = {
            "Categorical Pipeline - Select Columns Transformer": {
                "columns": self._selected_cat_cols,
            },
            "Numeric Pipeline - Select Columns By Type Transformer": {
                "column_types": ["category", "EmailAddress", "URL"],
                "exclude": True,
            },
            "Numeric Pipeline - Select Columns Transformer": {
                "columns": self._selected_cols,
            },
        }
        return parameters

    def _create_select_parameters(self):
        """Return the column-selector parameters matching the current selection state.

        The split-pipeline parameters take precedence whenever `self._split` is set.
        """
        parameters = {}
        if self._selected_cols:
            parameters = {
                "Select Columns Transformer": {"columns": self._selected_cols},
            }
        elif self._selected_cat_cols:
            parameters = {
                "Select Columns Transformer": {"columns": self._selected_cat_cols},
            }

        if self._split:
            parameters = self._create_split_select_parameters()
        return parameters

    def _find_component_names_from_parameters(self, old_names, pipelines):
        """Map each matching graph component name to the value stored under its base name.

        Args:
            old_names (dict): Values keyed by base component name.
            pipelines (list): Pipelines whose component graphs are searched.

        Returns:
            dict: Graph component name -> value from `old_names`; the first match wins.
        """
        new_names = {}
        for component_name in old_names:
            for pipeline in pipelines:
                new_name = self._find_component_names(component_name, pipeline)
                if new_name:
                    for name in new_name:
                        if name not in new_names:
                            new_names[name] = old_names[component_name]
        return new_names

    def _rename_pipeline_search_parameters(self, pipelines):
        """Add search-parameter entries under the component names found in `pipelines`."""
        names_to_value_pipeline_params = self._find_component_names_from_parameters(
            self.search_parameters,
            pipelines,
        )
        self.search_parameters.update(names_to_value_pipeline_params)
        self._separate_hyperparameters_from_parameters()

    def _create_fast_final(self):
        """Build one pipeline, with starting parameters, per filtered non-naive estimator."""
        estimators = self._non_naive_estimators()
        estimators = self._filter_estimators(
            estimators,
            self.problem_type,
            self.allow_long_running_models,
            None,
            self.y.nunique(),
            self.logger,
        )
        pipelines = self._make_pipelines_helper(estimators)

        if self._split:
            self._rename_pipeline_search_parameters(pipelines)

        next_batch = self._create_n_pipelines(
            pipelines,
            1,
            create_starting_parameters=True,
        )
        return next_batch

    def _create_n_pipelines(self, pipelines, n, create_starting_parameters=False):
        """Create `n` parameterized copies of every pipeline in `pipelines`.

        Args:
            pipelines (list): Pipelines to instantiate.
            n (int): Number of copies to create per pipeline.
            create_starting_parameters (bool): If True, use each tuner's starting
                parameters; otherwise ask the tuner to propose. Defaults to False.

        Returns:
            list: The new pipelines, `n * len(pipelines)` in total.
        """
        next_batch = []
        for _ in range(n):
            for pipeline in pipelines:
                if pipeline.name not in self._tuners:
                    self._create_tuner(pipeline)

                parameters = (
                    self._tuners[pipeline.name].get_starting_parameters(
                        self._hyperparameters,
                        self.random_seed,
                    )
                    if create_starting_parameters
                    else self._tuners[pipeline.name].propose()
                )
                parameters = self._transform_parameters(pipeline, parameters)
                if self.run_feature_selection:
                    # Restrict the pipeline to the columns kept by feature selection.
                    select_parameters = self._create_select_parameters()
                    parameters.update(select_parameters)
                next_batch.append(
                    pipeline.new(parameters=parameters, random_seed=self.random_seed),
                )
        return next_batch

    def _create_long_exploration(self, n):
        """Build the exploration batch from the `n` best-scoring model families so far."""
        estimators = [
            (pipeline_dict["pipeline"].estimator, pipeline_dict["mean_cv_score"])
            for pipeline_dict in self._best_pipeline_info.values()
        ]
        # Scores are "to minimize", so ascending order puts the best first.
        estimators.sort(key=lambda x: x[1])
        estimators = estimators[:n]
        estimators = [estimator[0].__class__ for estimator in estimators]
        pipelines = self._make_pipelines_helper(estimators)
        self._top_n_pipelines = pipelines
        return self._create_n_pipelines(pipelines, self.num_long_explore_pipelines)

    def _make_pipelines_helper(self, estimators):
        """Build one pipeline per estimator, choosing the time series or split construction."""
        if is_time_series(self.problem_type):
            pipelines = [
                make_pipeline(
                    X=self.X,
                    y=self.y,
                    estimator=estimator,
                    problem_type=self.problem_type,
                    sampler_name=self.sampler_name,
                    parameters=self._pipeline_parameters,
                    known_in_advance=self.search_parameters.get("pipeline", {}).get(
                        "known_in_advance",
                        None,
                    ),
                    features=self.features,
                    exclude_featurizers=self.exclude_featurizers,
                )
                for estimator in estimators
            ]
            pipelines = self._add_without_pipelines(pipelines, estimators)
        else:
            pipelines = [
                self._make_split_pipeline(estimator) for estimator in estimators
            ]
        return pipelines

    def next_batch(self):
        """Get the next batch of pipelines to evaluate.

        Returns:
            list(PipelineBase): a list of instances of PipelineBase subclasses, ready to be trained and evaluated.
        """
        batch = self._batch_number
        if self.ensembling:
            if batch == 0:
                next_batch = self._create_naive_pipelines(
                    use_features=self.run_feature_selection,
                )
            elif batch == 1:
                next_batch = self._create_fast_final()
            elif batch == 2:
                next_batch = self._create_ensemble(
                    self._pipeline_parameters.get("Label Encoder", {}),
                )
            elif batch == 3:
                next_batch = self._create_long_exploration(n=self.top_n)
            elif batch % 2 == 0:
                next_batch = self._create_ensemble(
                    self._pipeline_parameters.get("Label Encoder", {}),
                )
            else:
                next_batch = self._create_n_pipelines(
                    self._top_n_pipelines,
                    self.num_long_pipelines_per_batch,
                )
        # this logic needs to be updated once time series also supports ensembling
        elif is_time_series(self.problem_type):
            # Skip the naive batch for multiseries time series
            if is_multiseries(self.problem_type):
                batch += 1
            if batch == 0:
                next_batch = self._create_naive_pipelines()
            elif batch == 1:
                next_batch = self._create_fast_final()
            elif batch == 2:
                next_batch = self._create_long_exploration(n=self.top_n)
            else:
                next_batch = self._create_n_pipelines(
                    self._top_n_pipelines,
                    self.num_long_pipelines_per_batch,
                )
        else:
            if batch == 0:
                next_batch = self._create_naive_pipelines(
                    use_features=self.run_feature_selection,
                )
            elif batch == 1:
                next_batch = self._create_fast_final()
            elif batch == 2:
                next_batch = self._create_long_exploration(n=self.top_n)
            else:
                next_batch = self._create_n_pipelines(
                    self._top_n_pipelines,
                    self.num_long_pipelines_per_batch,
                )

        self._pipeline_number += len(next_batch)
        self._batch_number += 1
        return next_batch

    def _get_feature_provenance_and_remove_engineered_features(
        self,
        pipeline,
        component_name,
        to_be_removed,
        to_be_added,
    ):
        """Swap engineered column names for the original columns they were derived from.

        Both lists are modified in place.

        Args:
            pipeline (PipelineBase): Pipeline containing the component.
            component_name (str): Name of the component whose feature provenance is read.
            to_be_removed (list): Column names; engineered names found here are removed.
            to_be_added (list): Receives each original column that had at least one
                engineered column removed.
        """
        component = pipeline.get_component(component_name)
        feature_provenance = component._get_feature_provenance()
        for original_col in feature_provenance:
            selected = False
            for encoded_col in feature_provenance[original_col]:
                if encoded_col in to_be_removed:
                    selected = True
                    to_be_removed.remove(encoded_col)
            if selected:
                to_be_added.append(original_col)

    def _parse_selected_categorical_features(self, pipeline):
        """Move selected engineered categorical/URL/email features back to their source columns."""
        # Ordinal will always be categorical in nature, but it won't have OneHotEncoded features made for it
        if list(self.X.ww.select("Ordinal", return_schema=True).columns):
            self._get_feature_provenance_and_remove_engineered_features(
                pipeline,
                OrdinalEncoder.name,
                self._selected_cols,
                self._selected_cat_cols,
            )
        elif list(self.X.ww.select("category", return_schema=True).columns):
            self._get_feature_provenance_and_remove_engineered_features(
                pipeline,
                OneHotEncoder.name,
                self._selected_cols,
                self._selected_cat_cols,
            )
        if (
            list(self.X.ww.select("URL", return_schema=True).columns)
            and "URLFeaturizer" not in self.exclude_featurizers
        ):
            self._get_feature_provenance_and_remove_engineered_features(
                pipeline,
                URLFeaturizer.name,
                self._selected_cat_cols,
                self._selected_cat_cols,
            )
        if (
            list(self.X.ww.select("EmailAddress", return_schema=True).columns)
            and "EmailFeaturizer" not in self.exclude_featurizers
        ):
            self._get_feature_provenance_and_remove_engineered_features(
                pipeline,
                EmailFeaturizer.name,
                self._selected_cat_cols,
                self._selected_cat_cols,
            )

    def add_result(
        self,
        score_to_minimize,
        pipeline,
        trained_pipeline_results,
        cached_data=None,
    ):
        """Register results from evaluating a pipeline.

        While `batch_number` is 1 (i.e. when results of the naive batch come in), and feature selection is enabled
        for a non-time-series problem, the column names chosen by the feature selector are recorded so that later
        batches can use them in a column selector. Information regarding the best pipeline is updated here as well.

        Args:
            score_to_minimize (float): The score obtained by this pipeline on the primary objective, converted so that lower values indicate better pipelines.
            pipeline (PipelineBase): The trained pipeline object which was used to compute the score.
            trained_pipeline_results (dict): Results from training a pipeline.
            cached_data (dict): A dictionary of cached data, where the keys are the model family. Expected to be of format
                {model_family: {hash1: trained_component_graph, hash2: trained_component_graph...}...}.
                Defaults to None.
        """
        cached_data = cached_data or {}
        if pipeline.model_family != ModelFamily.ENSEMBLE:
            if self.batch_number >= 2:
                super().add_result(
                    score_to_minimize,
                    pipeline,
                    trained_pipeline_results,
                )

        # Without feature selection the naive pipelines are built with no
        # selector component, so there is nothing to look up.
        if (
            self.run_feature_selection
            and self.batch_number == 1
            and self._selected_cols is None
            and not is_time_series(self.problem_type)
        ):
            if is_regression(self.problem_type):
                self._selected_cols = pipeline.get_component(
                    "RF Regressor Select From Model",
                ).get_names()
            else:
                self._selected_cols = pipeline.get_component(
                    "RF Classifier Select From Model",
                ).get_names()
            self._parse_selected_categorical_features(pipeline)

        current_best_score = self._best_pipeline_info.get(
            pipeline.model_family,
            {},
        ).get("mean_cv_score", np.inf)
        if (
            score_to_minimize is not None
            and score_to_minimize < current_best_score
            and pipeline.model_family != ModelFamily.ENSEMBLE
        ):
            self._best_pipeline_info.update(
                {
                    pipeline.model_family: {
                        "mean_cv_score": score_to_minimize,
                        "pipeline": pipeline,
                        "parameters": pipeline.parameters,
                        "id": trained_pipeline_results["id"],
                        "cached_data": cached_data,
                    },
                },
            )

    def _make_split_pipeline(self, estimator, pipeline_name=None):
        """Build the pipeline for `estimator` according to the feature-selection outcome.

        Args:
            estimator (class): Estimator class to place at the end of the pipeline.
            pipeline_name (str or None): Name forwarded to the combined pipeline in the
                split case. Defaults to None.

        Returns:
            PipelineBase: A combined numeric + categorical pipeline when both kinds of
            columns were selected, a categorical-only pipeline when only categorical
            columns were selected, a pipeline with a trailing column selector when
            feature selection is on otherwise, or a plain pipeline when it is off.
        """
        # Should be category, not categorical so that we make sure to exclude
        # all logical types with the "category" tag
        numeric_exclude_types = ["category", "EmailAddress", "URL"]
        # Parenthesized so the subsets are only built when feature selection
        # is enabled; `and` binds tighter than `or`.
        if self.run_feature_selection and (
            self._X_with_cat_cols is None or self._X_without_cat_cols is None
        ):
            self._X_without_cat_cols = self.X.ww.select(
                exclude=numeric_exclude_types,
            )
            self._X_with_cat_cols = self.X.ww[self._selected_cat_cols]

        if (
            self.run_feature_selection
            and self._selected_cat_cols
            and self._selected_cols
        ):
            self._split = True
            categorical_pipeline_parameters = {
                "Select Columns Transformer": {"columns": self._selected_cat_cols},
            }
            numeric_pipeline_parameters = {
                "Select Columns Transformer": {"columns": self._selected_cols},
                "Select Columns By Type Transformer": {
                    "column_types": numeric_exclude_types,
                    "exclude": True,
                },
            }

            categorical_pipeline = make_pipeline(
                self._X_with_cat_cols,
                self.y,
                estimator,
                self.problem_type,
                sampler_name=None,
                parameters=categorical_pipeline_parameters,
                extra_components_before=[SelectColumns],
                use_estimator=False,
                exclude_featurizers=self.exclude_featurizers,
            )

            numeric_pipeline = make_pipeline(
                self._X_without_cat_cols,
                self.y,
                estimator,
                self.problem_type,
                sampler_name=None,
                parameters=numeric_pipeline_parameters,
                extra_components_before=[SelectByType],
                extra_components_after=[SelectColumns],
                use_estimator=False,
                exclude_featurizers=self.exclude_featurizers,
            )

            pre_pipeline_components = (
                {"DFS Transformer": ["DFS Transformer", "X", "y"]}
                if self.features
                else {}
            )
            if self.sampler_name:
                sampler = _get_sampler(
                    self.X,
                    self.y,
                    self.problem_type,
                    estimator,
                    self.sampler_name,
                )[0]
                post_pipelines_components = {sampler.name: [sampler.name, "X", "y"]}
            else:
                post_pipelines_components = None

            input_pipelines = [numeric_pipeline, categorical_pipeline]
            sub_pipeline_names = {
                numeric_pipeline.name: "Numeric",
                categorical_pipeline.name: "Categorical",
            }
            return _make_pipeline_from_multiple_graphs(
                input_pipelines,
                estimator,
                self.problem_type,
                pipeline_name=pipeline_name,
                random_seed=self.random_seed,
                sub_pipeline_names=sub_pipeline_names,
                pre_pipeline_components=pre_pipeline_components,
                post_pipelines_components=post_pipelines_components,
            )
        elif (
            self.run_feature_selection
            and self._selected_cat_cols
            and not self._selected_cols
        ):
            categorical_pipeline_parameters = {
                "Select Columns Transformer": {"columns": self._selected_cat_cols},
            }
            categorical_pipeline = make_pipeline(
                self._X_with_cat_cols,
                self.y,
                estimator,
                self.problem_type,
                sampler_name=self.sampler_name,
                parameters=categorical_pipeline_parameters,
                extra_components_before=[SelectColumns],
                features=self.features,
                exclude_featurizers=self.exclude_featurizers,
            )
            return categorical_pipeline
        elif self.run_feature_selection:
            numeric_pipeline_parameters = {
                "Select Columns Transformer": {"columns": self._selected_cols},
            }
            numeric_pipeline = make_pipeline(
                self.X,
                self.y,
                estimator,
                self.problem_type,
                sampler_name=self.sampler_name,
                parameters=numeric_pipeline_parameters,
                extra_components_after=[SelectColumns],
                features=self.features,
                exclude_featurizers=self.exclude_featurizers,
            )
            return numeric_pipeline
        else:
            pipeline = make_pipeline(
                self.X,
                self.y,
                estimator,
                self.problem_type,
                sampler_name=self.sampler_name,
                exclude_featurizers=self.exclude_featurizers,
            )
            return pipeline