Source code for evalml.pipelines.components.utils

"""Utility methods for EvalML components."""
import inspect

from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin
from sklearn.utils.multiclass import unique_labels
from sklearn.utils.validation import check_is_fitted

from evalml.exceptions import MissingComponentError
from evalml.model_family.utils import handle_model_family
from evalml.pipelines.components.component_base import ComponentBase
from evalml.pipelines.components.estimators.estimator import Estimator
from evalml.pipelines.components.transformers.transformer import Transformer
from evalml.problem_types import ProblemTypes, handle_problem_types
from evalml.utils import get_importable_subclasses

def _all_estimators():
    """Return every importable ``Estimator`` subclass, not only those used in AutoML search."""
    estimator_classes = get_importable_subclasses(Estimator, used_in_automl=False)
    return estimator_classes

def _all_estimators_used_in_search():
    """Return the importable ``Estimator`` subclasses that are flagged for use in AutoML search."""
    search_estimators = get_importable_subclasses(Estimator, used_in_automl=True)
    return search_estimators

def _all_transformers():
    """Return every importable ``Transformer`` subclass, not only those used in AutoML search."""
    transformer_classes = get_importable_subclasses(Transformer, used_in_automl=False)
    return transformer_classes

def all_components():
    """Get all available components.

    Returns:
        The result of concatenating all estimators with all transformers,
        estimators first.
    """
    estimators = _all_estimators()
    transformers = _all_transformers()
    return estimators + transformers
def allowed_model_families(problem_type):
    """List the model types allowed for a particular problem type.

    Args:
        problem_type (ProblemTypes or str): ProblemTypes enum or string.

    Returns:
        list[ModelFamily]: A list of model families, without duplicates.
            The list is built from a set, so its order is not meaningful.
    """
    problem_type = handle_problem_types(problem_type)
    families = set()
    for estimator_class in _all_estimators_used_in_search():
        # Normalize the declared problem types before comparing.
        supported = {
            handle_problem_types(supported_pt)
            for supported_pt in estimator_class.supported_problem_types
        }
        if problem_type in supported:
            families.add(estimator_class.model_family)
    return list(families)
def get_estimators(problem_type, model_families=None):
    """Returns the estimators allowed for a particular problem type.

    Can also optionally filter by a list of model types.

    Args:
        problem_type (ProblemTypes or str): Problem type to filter for.
        model_families (list[ModelFamily] or list[str]): Model families to
            filter for. If None, every model family allowed for the problem
            type is used.

    Returns:
        list[class]: A list of estimator subclasses.

    Raises:
        TypeError: If the model_families parameter is not a list.
        RuntimeError: If a model family is not valid for the problem type.
    """
    if model_families is not None and not isinstance(model_families, list):
        raise TypeError("model_families parameter is not a list.")

    problem_type = handle_problem_types(problem_type)

    # Computed once and reused both as the default filter and for validation.
    all_model_families = allowed_model_families(problem_type)
    if model_families is None:
        model_families = all_model_families
    model_families = [
        handle_model_family(model_family) for model_family in model_families
    ]

    for model_family in model_families:
        if model_family not in all_model_families:
            raise RuntimeError(
                "Unrecognized model type for problem type %s: %s"
                % (problem_type, model_family)
            )

    estimator_classes = []
    for estimator_class in _all_estimators_used_in_search():
        supported_problem_types = [
            handle_problem_types(supported_pt)
            for supported_pt in estimator_class.supported_problem_types
        ]
        if problem_type not in supported_problem_types:
            continue
        if estimator_class.model_family not in model_families:
            continue
        estimator_classes.append(estimator_class)
    return estimator_classes
def handle_component_class(component_class):
    """Standardizes input from a string name to a ComponentBase subclass if necessary.

    If a str is provided, will attempt to look up a ComponentBase class by that
    name and return that class. Otherwise, if a ComponentBase subclass or a
    component instance is provided, will return it without modification.

    Args:
        component_class (str, ComponentBase): Input to be standardized.

    Returns:
        ComponentBase: The looked-up component class, or the input itself when
            it is already a ComponentBase subclass or instance.

    Raises:
        ValueError: If input is not a valid component class.
        MissingComponentError: If the component cannot be found.
    """
    if isinstance(component_class, ComponentBase) or (
        inspect.isclass(component_class) and issubclass(component_class, ComponentBase)
    ):
        return component_class

    if not isinstance(component_class, str):
        raise ValueError(
            (
                "component_class may only contain str or ComponentBase subclasses, not '{}'"
            ).format(type(component_class))
        )

    # Map each component's name to its class so the string can be looked up.
    # A plain set of classes would never contain the string and cannot be indexed.
    component_classes = {
        component.name: component for component in all_components()
    }
    if component_class not in component_classes:
        raise MissingComponentError(
            'Component "{}" was not found'.format(component_class)
        )
    return component_classes[component_class]
class WrappedSKClassifier(BaseEstimator, ClassifierMixin):
    """Scikit-learn classifier wrapper class."""

    def __init__(self, pipeline):
        """Scikit-learn classifier wrapper class. Takes an EvalML pipeline as input and returns a scikit-learn classifier class wrapping that pipeline.

        Args:
            pipeline (PipelineBase or subclass obj): EvalML pipeline.
        """
        self.pipeline = pipeline
        self._estimator_type = "classifier"
        if pipeline._is_fitted:
            self._is_fitted = True
            # predict() checks `is_fitted_`, so set it here as well; otherwise a
            # wrapper around an already-fitted pipeline could not predict
            # without being refit.
            self.is_fitted_ = True
            self.classes_ = pipeline.classes_

    def fit(self, X, y):
        """Fits component to data.

        Args:
            X (pd.DataFrame or np.ndarray): The input training data of shape [n_samples, n_features].
            y (pd.Series): The target training data of length [n_samples].

        Returns:
            self
        """
        self.classes_ = unique_labels(y)
        self.pipeline.fit(X, y)
        # Record the training data and mark as fitted only once the wrapped
        # pipeline has fit successfully.
        self.X_ = X
        self.y_ = y
        self.is_fitted_ = True
        return self

    def predict(self, X):
        """Make predictions using selected features.

        Args:
            X (pd.DataFrame): Features

        Returns:
            np.ndarray: Predicted values.
        """
        check_is_fitted(self, "is_fitted_")
        return self.pipeline.predict(X).to_numpy()

    def predict_proba(self, X):
        """Make probability estimates for labels.

        Args:
            X (pd.DataFrame): Features.

        Returns:
            np.ndarray: Probability estimates.
        """
        return self.pipeline.predict_proba(X).to_numpy()
class WrappedSKRegressor(BaseEstimator, RegressorMixin):
    """Scikit-learn regressor wrapper class."""

    def __init__(self, pipeline):
        """Scikit-learn regressor wrapper class. Takes an EvalML pipeline as input and returns a scikit-learn regressor class wrapping that pipeline.

        Args:
            pipeline (PipelineBase or subclass obj): EvalML pipeline.
        """
        self.pipeline = pipeline
        self._estimator_type = "regressor"
        # We need an attribute that ends in an underscore for scikit-learn to
        # treat this estimator as fitted.
        self._is_fitted_ = True

    def fit(self, X, y):
        """Fits component to data.

        Args:
            X (pd.DataFrame or np.ndarray): The input training data of shape [n_samples, n_features].
            y (pd.Series): The target training data of length [n_samples].

        Returns:
            self
        """
        self.pipeline.fit(X, y)
        return self

    def predict(self, X):
        """Make predictions using selected features.

        Args:
            X (pd.DataFrame): Features.

        Returns:
            np.ndarray: Predicted values.
        """
        return self.pipeline.predict(X).to_numpy()
def scikit_learn_wrapped_estimator(evalml_obj):
    """Wrap an EvalML pipeline or estimator in a scikit-learn estimator.

    Args:
        evalml_obj (PipelineBase or Estimator): The EvalML object to wrap.
            A pipeline is dispatched on its ``problem_type``; any other object
            is dispatched on its ``supported_problem_types`` list.

    Returns:
        WrappedSKRegressor or WrappedSKClassifier: The scikit-learn compatible
            wrapper around ``evalml_obj``.

    Raises:
        ValueError: If the object's problem type(s) match neither the
            regression nor the classification cases handled here.
    """
    # Imported locally rather than at module level (presumably to avoid a
    # circular import -- confirm before moving).
    from evalml.pipelines.pipeline_base import PipelineBase

    if isinstance(evalml_obj, PipelineBase):
        if evalml_obj.problem_type in [
            ProblemTypes.REGRESSION,
            ProblemTypes.TIME_SERIES_REGRESSION,
        ]:
            return WrappedSKRegressor(evalml_obj)
        elif (
            evalml_obj.problem_type == ProblemTypes.BINARY
            or evalml_obj.problem_type == ProblemTypes.MULTICLASS
        ):
            return WrappedSKClassifier(evalml_obj)
    else:
        # EvalML Estimator: the supported problem types must match one of the
        # lists below exactly (same members, same order).
        if evalml_obj.supported_problem_types == [
            ProblemTypes.REGRESSION,
            ProblemTypes.TIME_SERIES_REGRESSION,
        ]:
            return WrappedSKRegressor(evalml_obj)
        elif evalml_obj.supported_problem_types == [
            ProblemTypes.BINARY,
            ProblemTypes.MULTICLASS,
            ProblemTypes.TIME_SERIES_BINARY,
            ProblemTypes.TIME_SERIES_MULTICLASS,
        ]:
            return WrappedSKClassifier(evalml_obj)
    raise ValueError("Could not wrap EvalML object in scikit-learn wrapper.")
def generate_component_code(element):
    """Creates and returns a string that contains the Python imports and code required for running the EvalML component.

    Args:
        element (component instance): The instance of the component to generate string Python code for.

    Returns:
        String representation of Python code that can be run separately in order to recreate the component instance.
        Does not include code for custom component implementation.

    Raises:
        ValueError: If the input element is not a component instance.
    """
    # hold the imports needed and add code to end
    code_strings = []
    base_string = ""

    if not isinstance(element, ComponentBase):
        raise ValueError(
            "Element must be a component instance, received {}".format(type(element))
        )

    # Only components known to EvalML get an import line; custom components
    # must be defined by the caller.
    if element.__class__ in all_components():
        code_strings.append(
            "from {} import {}\n".format(
                element.__class__.__module__, element.__class__.__name__
            )
        )
    component_parameters = element.parameters
    # Variable name: the component name with its first character lower-cased
    # and spaces removed from the remainder.
    name = element.name[0].lower() + element.name[1:].replace(" ", "")
    base_string += "{0} = {1}(**{2})".format(
        name, element.__class__.__name__, component_parameters
    )
    code_strings.append(base_string)
    return "\n".join(code_strings)
def make_balancing_dictionary(y, sampling_ratio):
    """Makes dictionary for oversampler components.

    Finds the ratio of each class to the majority class. A class whose ratio
    is below ``sampling_ratio`` is assigned the oversampling target count;
    every other class keeps its current count.

    Args:
        y (pd.Series): Target data.
        sampling_ratio (float): The balanced ratio we want the samples to meet.

    Returns:
        dict: Dictionary where keys are the classes, and the corresponding values are the counts of samples
            for each class that will satisfy sampling_ratio.

    Raises:
        ValueError: If sampling ratio is not in the range (0, 1] or the target is empty.
    """
    if sampling_ratio > 1 or sampling_ratio <= 0:
        raise ValueError(
            "Sampling ratio must be in range (0, 1], received {}".format(sampling_ratio)
        )
    if not len(y):
        raise ValueError("Target data must not be empty")

    counts = y.value_counts()
    # value_counts() sorts by count descending, so the first entry is the majority class.
    majority_count = counts.values[0]
    target_count = int(majority_count * sampling_ratio)
    class_ratios = counts / majority_count

    return {
        label: target_count if ratio < sampling_ratio else counts[label]
        for label, ratio in class_ratios.items()
    }