Source code for evalml.pipelines.components.utils

"""Utility methods for EvalML components."""
import inspect

from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin
from sklearn.utils.multiclass import unique_labels
from sklearn.utils.validation import check_is_fitted

from evalml.exceptions import MissingComponentError
from evalml.model_family.utils import ModelFamily, handle_model_family
from evalml.pipelines.components.component_base import ComponentBase
from evalml.pipelines.components.estimators.estimator import Estimator
from evalml.pipelines.components.transformers.transformer import Transformer
from evalml.problem_types import ProblemTypes, handle_problem_types
from evalml.utils import get_importable_subclasses


def _all_estimators():
    return get_importable_subclasses(Estimator, used_in_automl=False)


def _all_estimators_used_in_search():
    return get_importable_subclasses(Estimator, used_in_automl=True)


def _all_transformers():
    return get_importable_subclasses(Transformer, used_in_automl=False)


[docs]def all_components(): """Get all available components.""" return _all_estimators() + _all_transformers()
[docs]def allowed_model_families(problem_type): """List the model types allowed for a particular problem type. Args: problem_type (ProblemTypes or str): ProblemTypes enum or string. Returns: list[ModelFamily]: A list of model families. """ estimators = [] problem_type = handle_problem_types(problem_type) for estimator in _all_estimators_used_in_search(): if problem_type in set( handle_problem_types(problem) for problem in estimator.supported_problem_types ): estimators.append(estimator) return list(set([e.model_family for e in estimators]))
[docs]def get_estimators(problem_type, model_families=None): """Returns the estimators allowed for a particular problem type. Can also optionally filter by a list of model types. Args: problem_type (ProblemTypes or str): Problem type to filter for. model_families (list[ModelFamily] or list[str]): Model families to filter for. Returns: list[class]: A list of estimator subclasses. Raises: TypeError: If the model_families parameter is not a list. RuntimeError: If a model family is not valid for the problem type. """ if model_families is not None and not isinstance(model_families, list): raise TypeError("model_families parameter is not a list.") problem_type = handle_problem_types(problem_type) if model_families is None: model_families = allowed_model_families(problem_type) model_families = [ handle_model_family(model_family) for model_family in model_families ] all_model_families = allowed_model_families(problem_type) for model_family in model_families: if model_family not in all_model_families: raise RuntimeError( "Unrecognized model type for problem type %s: %s" % (problem_type, model_family) ) estimator_classes = [] for estimator_class in _all_estimators_used_in_search(): if problem_type not in [ handle_problem_types(supported_pt) for supported_pt in estimator_class.supported_problem_types ]: continue if estimator_class.model_family not in model_families: continue estimator_classes.append(estimator_class) return estimator_classes
[docs]def estimator_unable_to_handle_nans(estimator_class): """If True, provided estimator class is unable to handle NaN values as an input. Args: estimator_class (Estimator): Estimator class Raises: ValueError: If estimator is not a valid estimator class. Returns: bool: True if estimator class is unable to process NaN values, False otherwise. """ if not hasattr(estimator_class, "model_family"): raise ValueError("`estimator_class` must have a `model_family` attribute.") return estimator_class.model_family in [ ModelFamily.EXTRA_TREES, ModelFamily.RANDOM_FOREST, ModelFamily.LINEAR_MODEL, ModelFamily.DECISION_TREE, ]
[docs]def handle_component_class(component_class): """Standardizes input from a string name to a ComponentBase subclass if necessary. If a str is provided, will attempt to look up a ComponentBase class by that name and return a new instance. Otherwise if a ComponentBase subclass or Component instance is provided, will return that without modification. Args: component_class (str, ComponentBase): Input to be standardized. Returns: ComponentBase Raises: ValueError: If input is not a valid component class. MissingComponentError: If the component cannot be found. Examples: >>> from evalml.pipelines.components.estimators.regressors.decision_tree_regressor import DecisionTreeRegressor >>> handle_component_class(DecisionTreeRegressor) <class 'evalml.pipelines.components.estimators.regressors.decision_tree_regressor.DecisionTreeRegressor'> >>> handle_component_class("Random Forest Regressor") <class 'evalml.pipelines.components.estimators.regressors.rf_regressor.RandomForestRegressor'> """ if isinstance(component_class, ComponentBase) or ( inspect.isclass(component_class) and issubclass(component_class, ComponentBase) ): return component_class if not isinstance(component_class, str): raise ValueError( ( "component_class may only contain str or ComponentBase subclasses, not '{}'" ).format(type(component_class)) ) component_classes = {component.name: component for component in all_components()} if component_class not in component_classes: raise MissingComponentError( 'Component "{}" was not found'.format(component_class) ) component_class = component_classes[component_class] return component_class
[docs]class WrappedSKClassifier(BaseEstimator, ClassifierMixin): """Scikit-learn classifier wrapper class.""" def __init__(self, pipeline): """Scikit-learn classifier wrapper class. Takes an EvalML pipeline as input and returns a scikit-learn classifier class wrapping that pipeline. Args: pipeline (PipelineBase or subclass obj): EvalML pipeline. """ self.pipeline = pipeline self._estimator_type = "classifier" if pipeline._is_fitted: self._is_fitted = True self.classes_ = pipeline.classes_
[docs] def fit(self, X, y): """Fits component to data. Args: X (pd.DataFrame or np.ndarray): The input training data of shape [n_samples, n_features]. y (pd.Series, optional): The target training data of length [n_samples]. Returns: self """ self.classes_ = unique_labels(y) self.X_ = X self.y_ = y self.is_fitted_ = True self.pipeline.fit(X, y) return self
[docs] def predict(self, X): """Make predictions using selected features. Args: X (pd.DataFrame): Features Returns: np.ndarray: Predicted values. """ check_is_fitted(self, "is_fitted_") return self.pipeline.predict(X).to_numpy()
[docs] def predict_proba(self, X): """Make probability estimates for labels. Args: X (pd.DataFrame): Features. Returns: np.ndarray: Probability estimates. """ return self.pipeline.predict_proba(X).to_numpy()
[docs]class WrappedSKRegressor(BaseEstimator, RegressorMixin): """Scikit-learn regressor wrapper class.""" def __init__(self, pipeline): """Scikit-learn regressor wrapper class. Takes an EvalML pipeline as input and returns a scikit-learn regressor class wrapping that pipeline. Args: pipeline (PipelineBase or subclass obj): EvalML pipeline. """ self.pipeline = pipeline self._estimator_type = "regressor" self._is_fitted_ = True # We need an attribute that ends in an underscore for scikit-learn to treat as fitted
[docs] def fit(self, X, y): """Fits component to data. Args: X (pd.DataFrame or np.ndarray): the input training data of shape [n_samples, n_features] y (pd.Series, optional): the target training data of length [n_samples] Returns: self """ self.pipeline.fit(X, y) return self
[docs] def predict(self, X): """Make predictions using selected features. Args: X (pd.DataFrame): Features. Returns: np.ndarray: Predicted values. """ return self.pipeline.predict(X).to_numpy()
[docs]def scikit_learn_wrapped_estimator(evalml_obj): """Wraps an EvalML object as a scikit-learn estimator.""" from evalml.pipelines.pipeline_base import PipelineBase """Wrap an EvalML pipeline or estimator in a scikit-learn estimator.""" if isinstance(evalml_obj, PipelineBase): if evalml_obj.problem_type in [ ProblemTypes.REGRESSION, ProblemTypes.TIME_SERIES_REGRESSION, ]: return WrappedSKRegressor(evalml_obj) elif ( evalml_obj.problem_type == ProblemTypes.BINARY or evalml_obj.problem_type == ProblemTypes.MULTICLASS ): return WrappedSKClassifier(evalml_obj) else: # EvalML Estimator if evalml_obj.supported_problem_types == [ ProblemTypes.REGRESSION, ProblemTypes.TIME_SERIES_REGRESSION, ]: return WrappedSKRegressor(evalml_obj) elif evalml_obj.supported_problem_types == [ ProblemTypes.BINARY, ProblemTypes.MULTICLASS, ProblemTypes.TIME_SERIES_BINARY, ProblemTypes.TIME_SERIES_MULTICLASS, ]: return WrappedSKClassifier(evalml_obj) raise ValueError("Could not wrap EvalML object in scikit-learn wrapper.")
[docs]def generate_component_code(element): r"""Creates and returns a string that contains the Python imports and code required for running the EvalML component. Args: element (component instance): The instance of the component to generate string Python code for. Returns: String representation of Python code that can be run separately in order to recreate the component instance. Does not include code for custom component implementation. Raises: ValueError: If the input element is not a component instance. Examples: >>> from evalml.pipelines.components.estimators.regressors.decision_tree_regressor import DecisionTreeRegressor >>> assert generate_component_code(DecisionTreeRegressor()) == "from evalml.pipelines.components.estimators.regressors.decision_tree_regressor import DecisionTreeRegressor\n\ndecisionTreeRegressor = DecisionTreeRegressor(**{'criterion': 'mse', 'max_features': 'auto', 'max_depth': 6, 'min_samples_split': 2, 'min_weight_fraction_leaf': 0.0})" ... >>> from evalml.pipelines.components.transformers.imputers.simple_imputer import SimpleImputer >>> assert generate_component_code(SimpleImputer()) == "from evalml.pipelines.components.transformers.imputers.simple_imputer import SimpleImputer\n\nsimpleImputer = SimpleImputer(**{'impute_strategy': 'most_frequent', 'fill_value': None})" """ # hold the imports needed and add code to end code_strings = [] base_string = "" if not isinstance(element, ComponentBase): raise ValueError( "Element must be a component instance, received {}".format(type(element)) ) if element.__class__ in all_components(): code_strings.append( "from {} import {}\n".format( element.__class__.__module__, element.__class__.__name__ ) ) component_parameters = element.parameters name = element.name[0].lower() + element.name[1:].replace(" ", "") base_string += "{0} = {1}(**{2})".format( name, element.__class__.__name__, component_parameters ) code_strings.append(base_string) return "\n".join(code_strings)
[docs]def make_balancing_dictionary(y, sampling_ratio): """Makes dictionary for oversampler components. Find ratio of each class to the majority. If the ratio is smaller than the sampling_ratio, we want to oversample, otherwise, we don't want to sample at all, and we leave the data as is. Args: y (pd.Series): Target data. sampling_ratio (float): The balanced ratio we want the samples to meet. Returns: dict: Dictionary where keys are the classes, and the corresponding values are the counts of samples for each class that will satisfy sampling_ratio. Raises: ValueError: If sampling ratio is not in the range (0, 1] or the target is empty. Examples: >>> import pandas as pd >>> y = pd.Series([1] * 4 + [2] * 8 + [3]) >>> assert make_balancing_dictionary(y, 0.5) == {2: 8, 1: 4, 3: 4} >>> assert make_balancing_dictionary(y, 0.9) == {2: 8, 1: 7, 3: 7} >>> assert make_balancing_dictionary(y, 0.1) == {2: 8, 1: 4, 3: 1} """ if sampling_ratio <= 0 or sampling_ratio > 1: raise ValueError( "Sampling ratio must be in range (0, 1], received {}".format(sampling_ratio) ) if len(y) == 0: raise ValueError("Target data must not be empty") value_counts = y.value_counts() ratios = value_counts / value_counts.values[0] class_dic = {} sample_amount = int(value_counts.values[0] * sampling_ratio) for index, value in ratios.items(): if value < sampling_ratio: # we want to oversample this class class_dic[index] = sample_amount else: # this class is already larger than the ratio, don't change class_dic[index] = value_counts[index] return class_dic