Source code for evalml.pipelines.utils

"""Utility methods for EvalML pipelines."""
import logging

from woodwork import logical_types

from . import (
    TimeSeriesBinaryClassificationPipeline,
    TimeSeriesMulticlassClassificationPipeline,
    TimeSeriesRegressionPipeline,
)
from .binary_classification_pipeline import BinaryClassificationPipeline
from .multiclass_classification_pipeline import (
    MulticlassClassificationPipeline,
)
from .pipeline_base import PipelineBase
from .regression_pipeline import RegressionPipeline

from evalml.data_checks import DataCheckActionCode
from evalml.model_family import ModelFamily
from evalml.pipelines.components import (  # noqa: F401
    CatBoostClassifier,
    CatBoostRegressor,
    ComponentBase,
    DateTimeFeaturizer,
    DelayedFeatureTransformer,
    DropColumns,
    DropNullColumns,
    DropRowsTransformer,
    EmailFeaturizer,
    Estimator,
    Imputer,
    LogTransformer,
    OneHotEncoder,
    Oversampler,
    RandomForestClassifier,
    SklearnStackedEnsembleClassifier,
    SklearnStackedEnsembleRegressor,
    StackedEnsembleClassifier,
    StackedEnsembleRegressor,
    StandardScaler,
    TargetImputer,
    TextFeaturizer,
    Undersampler,
    URLFeaturizer,
)
from evalml.pipelines.components.utils import (
    get_estimators,
    handle_component_class,
)
from evalml.problem_types import (
    ProblemTypes,
    handle_problem_types,
    is_classification,
    is_time_series,
)
from evalml.utils import import_or_raise, infer_feature_types

logger = logging.getLogger(__name__)


def _get_preprocessing_components(
    X, y, problem_type, estimator_class, sampler_name=None
):
    """Assemble the recommended preprocessing component classes for an estimator.

    The input data is inspected (all-null columns and the Woodwork types of its columns) and,
    together with the problem type and the estimator's model family, determines which
    component classes are added. Classes are returned in the order they were added.

    Args:
        X (pd.DataFrame): The input data of shape [n_samples, n_features]. Columns are inspected through the ``X.ww`` Woodwork accessor.
        y (pd.Series): The target data of length [n_samples]. Not inspected by this function.
        problem_type (ProblemTypes or str): Problem type.
        estimator_class (class): A class which subclasses Estimator estimator for pipeline.
        sampler_name (str): The name of the sampler component to add to the pipeline ("Undersampler" or "Oversampler"). Defaults to None.

    Returns:
        list[Transformer]: A list of applicable preprocessing components to use with the estimator.

    Raises:
        KeyError: If the imbalanced-learn import check succeeds and sampler_name is neither "Undersampler" nor "Oversampler".
    """

    def _columns_matching(selector):
        # Names of the columns whose Woodwork type or tag matches the selector.
        return list(X.ww.select(selector, return_schema=True).columns)

    components = []

    # At least one column consisting solely of nulls.
    if X.isnull().all().any():
        components.append(DropNullColumns)

    if _columns_matching(["index", "unknown"]):
        components.append(DropColumns)

    if _columns_matching("EmailAddress"):
        components.append(EmailFeaturizer)

    if _columns_matching("URL"):
        components.append(URLFeaturizer)

    present_types = {type(lt) for lt in X.ww.logical_types.values()}
    imputable_types = {
        logical_types.Boolean,
        logical_types.Categorical,
        logical_types.Double,
        logical_types.Integer,
        logical_types.URL,
        logical_types.EmailAddress,
    }

    has_text = bool(_columns_matching("NaturalLanguage"))
    if has_text:
        components.append(TextFeaturizer)

    # The imputer is added for any type it handles, and also whenever text columns exist.
    if present_types & imputable_types or has_text:
        components.append(Imputer)

    if _columns_matching(["Datetime"]) and estimator_class.model_family not in (
        ModelFamily.ARIMA,
        ModelFamily.PROPHET,
    ):
        components.append(DateTimeFeaturizer)

    if (
        is_time_series(problem_type)
        and estimator_class.model_family != ModelFamily.ARIMA
    ):
        components.append(DelayedFeatureTransformer)

    # The URL and EmailAddress Featurizers will create categorical columns
    if _columns_matching(["category", "URL", "EmailAddress"]) and (
        estimator_class not in {CatBoostClassifier, CatBoostRegressor}
    ):
        components.append(OneHotEncoder)

    samplers_by_name = {
        "Undersampler": Undersampler,
        "Oversampler": Oversampler,
    }
    if sampler_name is not None:
        try:
            import_or_raise(
                "imblearn.over_sampling", error_msg="imbalanced-learn is not installed"
            )
        except ImportError:
            logger.warning(
                "Could not import imblearn.over_sampling, so defaulting to use Undersampler"
            )
            chosen_sampler = Undersampler
        else:
            chosen_sampler = samplers_by_name[sampler_name]
        components.append(chosen_sampler)

    if estimator_class.model_family == ModelFamily.LINEAR_MODEL:
        components.append(StandardScaler)

    return components


def _get_pipeline_base_class(problem_type):
    """Return the pipeline base class matching ``problem_type``.

    Any value that does not equal one of the listed problem types falls through to
    the time series multiclass classification pipeline.
    """
    # Ordered (problem type, pipeline class) pairs, checked with ``==`` from top to bottom.
    known_pipelines = (
        (ProblemTypes.BINARY, BinaryClassificationPipeline),
        (ProblemTypes.MULTICLASS, MulticlassClassificationPipeline),
        (ProblemTypes.REGRESSION, RegressionPipeline),
        (ProblemTypes.TIME_SERIES_REGRESSION, TimeSeriesRegressionPipeline),
        (ProblemTypes.TIME_SERIES_BINARY, TimeSeriesBinaryClassificationPipeline),
    )
    for candidate, pipeline_cls in known_pipelines:
        if problem_type == candidate:
            return pipeline_cls
    return TimeSeriesMulticlassClassificationPipeline


def make_pipeline(
    X,
    y,
    estimator,
    problem_type,
    parameters=None,
    sampler_name=None,
    extra_components=None,
):
    """Build a pipeline instance with a recommended preprocessing chain for the given data and estimator.

    The preprocessing components are chosen from the input data, the estimator and the
    problem type. The returned pipeline is an instance of the pipeline base class that
    corresponds to the specified problem_type.

    Args:
        X (pd.DataFrame): The input data of shape [n_samples, n_features].
        y (pd.Series): The target data of length [n_samples].
        estimator (Estimator): Estimator for pipeline.
        problem_type (ProblemTypes or str): Problem type for pipeline to generate.
        parameters (dict): Dictionary with component names as keys and dictionary of that component's parameters as values.
            An empty dictionary or None implies using all default values for component parameters.
        sampler_name (str): The name of the sampler component to add to the pipeline. Only used in classification problems.
            Defaults to None
        extra_components (list[ComponentBase]): List of extra components to be added after preprocessing components. Defaults to None.

    Returns:
        PipelineBase object: PipelineBase instance with dynamically generated preprocessing components and specified estimator.

    Raises:
        ValueError: If estimator is not valid for the given problem type, or sampling is not supported for the given problem type.
    """
    X = infer_feature_types(X)
    y = infer_feature_types(y)
    problem_type = handle_problem_types(problem_type)

    if estimator not in get_estimators(problem_type):
        raise ValueError(f"{estimator.name} is not a valid estimator for problem type")

    sampling_supported = is_classification(problem_type)
    if not sampling_supported and sampler_name is not None:
        raise ValueError(
            f"Sampling is unsupported for problem_type {problem_type!s}"
        )

    preprocessing = _get_preprocessing_components(
        X, y, problem_type, estimator, sampler_name
    )
    extras = extra_components or []
    # Order matters: preprocessing first, then user extras, with the estimator last.
    ordered_components = preprocessing + extras + [estimator]
    graph = PipelineBase._make_component_dict_from_component_list(ordered_components)

    pipeline_cls = _get_pipeline_base_class(problem_type)
    return pipeline_cls(graph, parameters=parameters)
def generate_pipeline_code(element):
    """Create a string of Python code (import line plus constructor expression) that recreates a pipeline.

    Args:
        element (pipeline instance): The instance of the pipeline to generate string Python code.

    Returns:
        str: String representation of Python code that can be run separately in order to recreate the pipeline instance.
        Does not include code for custom component implementation.

    Raises:
        ValueError: If element is not a pipeline, or if the pipeline is nonlinear.
    """
    if not isinstance(element, PipelineBase):
        raise ValueError(
            "Element must be a pipeline instance, received {}".format(type(element))
        )
    # A dict component graph is treated as a nonlinear pipeline.
    if isinstance(element.component_graph, dict):
        raise ValueError("Code generation for nonlinear pipelines is not supported yet")

    pipeline_cls = element.__class__
    import_line = "from {} import {}".format(
        pipeline_cls.__module__, pipeline_cls.__name__
    )
    # The import comes first, followed by the repr of the pipeline itself.
    return "\n".join([import_line, repr(element)])
def _make_stacked_ensemble_pipeline(
    input_pipelines,
    problem_type,
    final_estimator=None,
    n_jobs=-1,
    random_seed=0,
    use_sklearn=False,
):
    """Creates a pipeline with a stacked ensemble estimator.

    With ``use_sklearn=True`` the pipeline consists of the scikit-learn ensembler alone and the
    input pipelines are handed to it as a parameter. Otherwise the component graphs of all input
    pipelines are merged into one graph (component names are prefixed with the model family of the
    pipeline they came from) and the ensembler is attached to the last component of each pipeline.

    Args:
        input_pipelines (list(PipelineBase or subclass obj)): List of pipeline instances to use as the base estimators for the stacked ensemble.
            This must not be None or an empty list.
        problem_type (ProblemTypes): Problem type of pipeline. Must be binary, multiclass or regression.
        final_estimator (Estimator): Metalearner to use for the ensembler. Only forwarded to the ensembler when use_sklearn is True. Defaults to None.
        n_jobs (int or None): Integer describing level of parallelism used for pipelines.
            None and 1 are equivalent. If set to -1, all CPUs are used. For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Defaults to -1.
        random_seed (int): Seed passed to the pipeline constructor. Defaults to 0.
        use_sklearn (bool): If True, instantiates a pipeline with the scikit-learn ensembler. Defaults to False.

    Returns:
        Pipeline with appropriate stacked ensemble estimator.

    Raises:
        KeyError: If problem_type is not one of binary, multiclass or regression.
        ValueError: If use_sklearn is False and input_pipelines contains no pipelines.
    """
    classification = is_classification(problem_type)
    if use_sklearn:
        if classification:
            estimator = SklearnStackedEnsembleClassifier
            parameters_key = "Sklearn Stacked Ensemble Classifier"
            pipeline_name = "Sklearn Stacked Ensemble Classification Pipeline"
        else:
            estimator = SklearnStackedEnsembleRegressor
            parameters_key = "Sklearn Stacked Ensemble Regressor"
            pipeline_name = "Sklearn Stacked Ensemble Regression Pipeline"
        parameters = {
            parameters_key: {
                "input_pipelines": input_pipelines,
                "final_estimator": final_estimator,
                "n_jobs": n_jobs,
            }
        }
    else:
        if classification:
            estimator = StackedEnsembleClassifier
            parameters_key = "Stacked Ensemble Classifier"
            pipeline_name = "Stacked Ensemble Classification Pipeline"
        else:
            estimator = StackedEnsembleRegressor
            parameters_key = "Stacked Ensemble Regressor"
            pipeline_name = "Stacked Ensemble Regression Pipeline"
        parameters = {parameters_key: {"n_jobs": n_jobs}}

    pipeline_class = {
        ProblemTypes.BINARY: BinaryClassificationPipeline,
        ProblemTypes.MULTICLASS: MulticlassClassificationPipeline,
        ProblemTypes.REGRESSION: RegressionPipeline,
    }[problem_type]

    if not use_sklearn:

        def _make_new_component_name(model_type, component_name, idx=None):
            # e.g. "<model family> Pipeline 2 - <component name>"; the index is omitted when None.
            idx = " " + str(idx) if idx is not None else ""
            return f"{str(model_type)} Pipeline{idx} - {component_name}"

        component_graph = {}
        final_components = []
        used_model_families = []
        # Bound up front so the name always exists; it is reset for every pipeline below.
        ensemble_y = "y"

        for pipeline in input_pipelines:
            model_family = pipeline.component_graph[-1].model_family
            # Repeated model families get a numeric suffix (2, 3, ...); the first occurrence gets none.
            prior_uses = used_model_families.count(model_family)
            model_family_idx = prior_uses + 1 if prior_uses > 0 else None
            used_model_families.append(model_family)

            final_component = None
            # NOTE(review): this is reset per pipeline, so the ensembler's target input ends up
            # being taken from the last input pipeline only - confirm this is intended.
            ensemble_y = "y"
            for name, component_list in pipeline.component_graph.component_dict.items():
                new_component_list = []
                new_component_name = _make_new_component_name(
                    model_family, name, model_family_idx
                )
                for i, item in enumerate(component_list):
                    if i == 0:
                        # The first entry is the component itself; carry over its parameters
                        # under the renamed key.
                        fitted_comp = handle_component_class(item)
                        new_component_list.append(fitted_comp)
                        parameters[new_component_name] = pipeline.parameters.get(
                            name, {}
                        )
                    elif isinstance(item, str) and item not in ["X", "y"]:
                        # References to other components must point at their renamed versions.
                        new_component_list.append(
                            _make_new_component_name(
                                model_family, item, model_family_idx
                            )
                        )
                    else:
                        new_component_list.append(item)
                    if i != 0 and item.endswith(".y"):
                        # A component that outputs a transformed target feeds the ensembler's y.
                        ensemble_y = _make_new_component_name(
                            model_family, item, model_family_idx
                        )
                component_graph[new_component_name] = new_component_list
                final_component = new_component_name
            final_components.append(final_component)

        if not final_components:
            # Without this check an empty input would only surface as an obscure failure later.
            raise ValueError(
                "`input_pipelines` must contain at least one pipeline to build a stacked ensemble pipeline."
            )

        component_graph[estimator.name] = (
            [estimator] + [comp + ".x" for comp in final_components] + [ensemble_y]
        )

    return pipeline_class(
        [estimator] if use_sklearn else component_graph,
        parameters=parameters,
        custom_name=pipeline_name,
        random_seed=random_seed,
    )


def _make_component_list_from_actions(actions):
    """Creates a list of components from the input DataCheckAction list.

    Row-dropping and target-imputing components are added in the order their actions appear.
    All columns flagged for dropping are collected into a single DropColumns component,
    which is appended last. IMPUTE_COL actions whose metadata does not mark the target are
    ignored, as are actions with any other action code.

    Args:
        actions (list(DataCheckAction)): List of DataCheckAction objects used to create list of components

    Returns:
        list(ComponentBase): List of components used to address the input actions
    """
    components = []
    cols_to_drop = []
    for action in actions:
        code = action.action_code
        if code == DataCheckActionCode.DROP_COL:
            cols_to_drop.append(action.metadata["column"])
        elif code == DataCheckActionCode.IMPUTE_COL:
            metadata = action.metadata
            if metadata["is_target"]:
                components.append(
                    TargetImputer(impute_strategy=metadata["impute_strategy"])
                )
        elif code == DataCheckActionCode.DROP_ROWS:
            indices = action.metadata["indices"]
            components.append(DropRowsTransformer(indices_to_drop=indices))
    if cols_to_drop:
        components.append(DropColumns(columns=cols_to_drop))
    return components
def make_timeseries_baseline_pipeline(problem_type, gap, forecast_horizon):
    """Make a baseline pipeline for time series problems.

    The pipeline has two components: a delayed feature transformer followed by the
    time series baseline estimator.

    Args:
        problem_type: One of TIME_SERIES_REGRESSION, TIME_SERIES_MULTICLASS, TIME_SERIES_BINARY
        gap (int): Non-negative gap parameter.
        forecast_horizon (int): Positive forecast_horizon parameter.

    Returns:
        TimeSeriesPipelineBase, a time series pipeline corresponding to the problem type.
    """
    baseline_specs = {
        ProblemTypes.TIME_SERIES_REGRESSION: (
            TimeSeriesRegressionPipeline,
            "Time Series Baseline Regression Pipeline",
        ),
        ProblemTypes.TIME_SERIES_MULTICLASS: (
            TimeSeriesMulticlassClassificationPipeline,
            "Time Series Baseline Multiclass Pipeline",
        ),
        ProblemTypes.TIME_SERIES_BINARY: (
            TimeSeriesBinaryClassificationPipeline,
            "Time Series Baseline Binary Pipeline",
        ),
    }
    pipeline_class, pipeline_name = baseline_specs[problem_type]

    transformer_name = "Delayed Feature Transformer"
    estimator_name = "Time Series Baseline Estimator"

    baseline_parameters = {
        "pipeline": {
            "date_index": None,
            "gap": gap,
            "max_delay": 0,
            "forecast_horizon": forecast_horizon,
        },
        # Only the target is delayed; features are left as they are.
        transformer_name: {
            "max_delay": 0,
            "gap": gap,
            "forecast_horizon": forecast_horizon,
            "delay_target": True,
            "delay_features": False,
        },
        estimator_name: {
            "gap": gap,
            "forecast_horizon": forecast_horizon,
        },
    }

    return pipeline_class(
        component_graph=[transformer_name, estimator_name],
        custom_name=pipeline_name,
        parameters=baseline_parameters,
    )