# Source code for evalml.pipelines.pipeline_base

import copy
import inspect
import os
import re
from abc import ABC, abstractmethod

import cloudpickle
import numpy as np
import pandas as pd

from .components import Estimator
from .components.utils import handle_component_class

from evalml.exceptions import IllFormattedClassNameError, MissingComponentError
from evalml.utils import (
    classproperty,
    get_logger,
    get_random_state,
    import_or_raise,
    log_subtitle,
    log_title
)

# Module-level logger shared by the pipeline methods in this file
# (used when describing a pipeline and when reporting objective scoring errors).
logger = get_logger(__file__)


class PipelineBase(ABC):
    """Base class for all pipelines.

    Required Class Variables:
        component_graph (list): List of components in order. Accepts strings or
            ComponentBase subclasses in the list. The last entry must resolve to an Estimator.

    Optional Class Variables:
        custom_hyperparameters (dict): Per-component hyperparameter ranges merged over the
            component defaults by `hyperparameters`.
        custom_name (str): Name returned by `name` instead of the one derived from the class name.
        problem_type: Problem type validated against the estimator's supported problem types.
    """

    @property
    @classmethod
    @abstractmethod
    def component_graph(cls):
        """Returns list of components representing pipeline graph structure

        Returns:
            list(str / ComponentBase subclass): list of ComponentBase subclasses or strings denotes graph structure of this pipeline
        """

    custom_hyperparameters = None
    custom_name = None
    problem_type = None

    def __init__(self, parameters, random_state=0):
        """Machine learning pipeline made out of transformers and a estimator.

        Required Class Variables:
            component_graph (list): List of components in order. Accepts strings or ComponentBase subclasses in the list

        Arguments:
            parameters (dict): dictionary with component names as keys and dictionary of that component's parameters as values.
                 An empty dictionary {} implies using all default values for component parameters.
            random_state (int, np.random.RandomState): The random seed/state. Defaults to 0.

        Raises:
            ValueError: if the component graph is empty or does not end with an Estimator, or if the
                pipeline's problem type is not supported by that estimator.
        """
        self.random_state = get_random_state(random_state)
        # The class-level graph (strings / classes) is replaced on the instance by instantiated components.
        self.component_graph = [self._instantiate_component(component_class, parameters)
                                for component_class in self.component_graph]
        self.input_feature_names = {}
        self.results = {}

        # Guard against an empty graph so the caller gets the descriptive ValueError below
        # rather than an IndexError from indexing [-1].
        last_component = self.component_graph[-1] if self.component_graph else None
        self.estimator = last_component if isinstance(last_component, Estimator) else None
        if self.estimator is None:
            raise ValueError("A pipeline must have an Estimator as the last component in component_graph.")

        self._validate_estimator_problem_type()

    @classproperty
    def name(cls):
        """Returns a name describing the pipeline.

        By default, this will take the class name and add a space between each capitalized word
        (class name should be in Pascal Case). If the pipeline has a custom_name attribute, this
        will be returned instead.

        Raises:
            IllFormattedClassNameError: if no custom_name is set and the class name cannot be split
                into separate words.
        """
        if cls.custom_name:
            name = cls.custom_name
        else:
            # Zero-width match at every lower-case -> upper-case boundary.
            rex = re.compile(r'(?<=[a-z])(?=[A-Z])')
            name = rex.sub(' ', cls.__name__)
            if name == cls.__name__:
                raise IllFormattedClassNameError("Pipeline Class {} needs to follow Pascal Case standards or `custom_name` must be defined.".format(cls.__name__))
        return name

    @classproperty
    def summary(cls):
        """Returns a short summary of the pipeline structure, describing the list of components used.

        Example: Logistic Regression Classifier w/ Simple Imputer + One Hot Encoder
        """
        component_graph = [handle_component_class(component_class) for component_class in cls.component_graph]
        if len(component_graph) == 0:
            return "Empty Pipeline"

        summary = "Pipeline"
        # When the graph ends with an estimator, its name leads the summary and the
        # remaining components are listed after "w/".
        if inspect.isclass(component_graph[-1]) and issubclass(component_graph[-1], Estimator):
            estimator_class = component_graph.pop(-1)
            summary = estimator_class.name
        if len(component_graph) == 0:
            return summary

        component_names = [component_class.name for component_class in component_graph]
        return '{} w/ {}'.format(summary, ' + '.join(component_names))

    def _validate_estimator_problem_type(self):
        """Validates this pipeline's problem_type against that of the estimator from `self.component_graph`"""
        estimator_problem_types = self.estimator.supported_problem_types
        if self.problem_type not in estimator_problem_types:
            raise ValueError("Problem type {} not valid for this component graph. Valid problem types include {}."
                             .format(self.problem_type, estimator_problem_types))

    def _instantiate_component(self, component_class, parameters):
        """Instantiates components with parameters in `parameters`

        Arguments:
            component_class (str / ComponentBase subclass): component to instantiate
            parameters (dict): dictionary with component names as keys and parameter dictionaries as values

        Returns:
            The instantiated component, created with this pipeline's random state.

        Raises:
            MissingComponentError: if the component class cannot be resolved.
            ValueError: if the component cannot be constructed with the supplied parameters.
        """
        try:
            component_class = handle_component_class(component_class)
        except MissingComponentError as e:
            err = "Error recieved when retrieving class for component '{}'".format(component_class)
            raise MissingComponentError(err) from e

        component_name = component_class.name
        # Looked up outside the try block so the name is always bound when the handler formats its message.
        component_parameters = parameters.get(component_name, {})
        try:
            new_component = component_class(**component_parameters, random_state=self.random_state)
        except (ValueError, TypeError) as e:
            err = "Error received when instantiating component {} with the following arguments {}".format(component_name, component_parameters)
            raise ValueError(err) from e
        return new_component

    def __getitem__(self, index):
        """Returns a component by integer position, or by name for any other non-slice index."""
        if isinstance(index, slice):
            raise NotImplementedError('Slicing pipelines is currently not supported.')
        elif isinstance(index, int):
            return self.component_graph[index]
        else:
            return self.get_component(index)

    def __setitem__(self, index, value):
        """Always raises: components of a pipeline cannot be replaced."""
        raise NotImplementedError('Setting pipeline components is not supported.')

    def get_component(self, name):
        """Returns component by name

        Arguments:
            name (str): name of component

        Returns:
            Component: component to return, or None if no component has that name
        """
        return next((component for component in self.component_graph if component.name == name), None)

    def describe(self):
        """Outputs pipeline details including component parameters

        The details are written to the module logger; nothing is returned.

        Returns:
            None
        """
        log_title(logger, self.name)
        logger.info("Problem Type: {}".format(self.problem_type))
        logger.info("Model Family: {}".format(str(self.model_family)))

        # Feature names are only recorded once the pipeline has been fit.
        if self.estimator.name in self.input_feature_names:
            logger.info("Number of features: {}".format(len(self.input_feature_names[self.estimator.name])))

        # Summary of steps
        log_subtitle(logger, "Pipeline Steps")
        for number, component in enumerate(self.component_graph, 1):
            component_string = str(number) + ". " + component.name
            logger.info(component_string)
            component.describe(print_name=False)

    def _transform(self, X):
        """Passes X through every component except the final estimator and returns the result."""
        X_t = X
        for component in self.component_graph[:-1]:
            X_t = component.transform(X_t)
        return X_t

    def _fit(self, X, y):
        """Fits each component in order, recording the column names each one receives as input."""
        X_t = X
        y_t = y
        for component in self.component_graph[:-1]:
            self.input_feature_names.update({component.name: list(pd.DataFrame(X_t))})
            X_t = component.fit_transform(X_t, y_t)

        self.input_feature_names.update({self.estimator.name: list(pd.DataFrame(X_t))})
        self.estimator.fit(X_t, y_t)

    def fit(self, X, y):
        """Build a model

        Arguments:
            X (pd.DataFrame or np.array): the input training data of shape [n_samples, n_features]
            y (pd.Series): the target training labels of length [n_samples]

        Returns:
            self
        """
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
        if not isinstance(y, pd.Series):
            y = pd.Series(y)

        self._fit(X, y)
        return self

    def predict(self, X, objective=None):
        """Make predictions using selected features.

        Arguments:
            X (pd.DataFrame or np.array) : data of shape [n_samples, n_features]
            objective (Object or string): the objective to use to make predictions.
                Not used by this base implementation.

        Returns:
            pd.Series : estimated labels
        """
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)

        X_t = self._transform(X)
        return self.estimator.predict(X_t)

    @abstractmethod
    def score(self, X, y, objectives):
        """Evaluate model performance on current and additional objectives

        Arguments:
            X (pd.DataFrame or np.array) : data of shape [n_samples, n_features]
            y (pd.Series) : true labels of length [n_samples]
            objectives (list): Non-empty list of objectives to score on

        Returns:
            dict: ordered dictionary of objective scores
        """

    @staticmethod
    def _score(X, y, predictions, objective):
        """Given data, model predictions or predicted probabilities computed on the data, and an objective, evaluate and return the objective score.

        Will return `np.nan` if the objective errors.
        """
        score = np.nan
        try:
            score = objective.score(y, predictions, X)
        except Exception as e:
            # Deliberately best-effort: a failing objective is logged and scored as nan
            # instead of aborting the caller.
            logger.error('Error in PipelineBase.score while scoring objective {}: {}'.format(objective.name, str(e)))
        return score

    @classproperty
    def model_family(cls):
        """Returns model family of this pipeline template"""
        return handle_component_class(cls.component_graph[-1]).model_family

    @classproperty
    def hyperparameters(cls):
        """Returns hyperparameter ranges from all components as a dictionary"""
        hyperparameter_ranges = dict()
        for component_class in cls.component_graph:
            component_class = handle_component_class(component_class)
            # Copy so the custom overrides below do not modify the component's own ranges.
            component_hyperparameters = copy.copy(component_class.hyperparameter_ranges)
            if cls.custom_hyperparameters and component_class.name in cls.custom_hyperparameters:
                component_hyperparameters.update(cls.custom_hyperparameters.get(component_class.name, {}))
            hyperparameter_ranges[component_class.name] = component_hyperparameters
        return hyperparameter_ranges

    @property
    def parameters(self):
        """Returns parameter dictionary for this pipeline

        Components whose parameters are empty are omitted.

        Returns:
            dict: dictionary of all component parameters
        """
        return {c.name: copy.copy(c.parameters) for c in self.component_graph if c.parameters}

    @classproperty
    def default_parameters(cls):
        """Returns the default parameter dictionary for this pipeline.

        Returns:
            dict: dictionary of all component default parameters.
        """
        defaults = {}
        for c in cls.component_graph:
            component = handle_component_class(c)
            if component.default_parameters:
                defaults[component.name] = component.default_parameters
        return defaults

    @property
    def feature_importance(self):
        """Return importance associated with each feature.

        Features dropped by feature selection are excluded

        Returns:
            pd.DataFrame: columns "feature" and "importance", sorted by decreasing absolute importance
        """
        feature_names = self.input_feature_names[self.estimator.name]
        importance = list(zip(feature_names, self.estimator.feature_importance))  # note: this only works for binary
        importance.sort(key=lambda x: -abs(x[1]))
        df = pd.DataFrame(importance, columns=["feature", "importance"])
        return df

    def graph(self, filepath=None):
        """Generate an image representing the pipeline graph

        Arguments:
            filepath (str, optional) : Path to where the graph should be saved. If set to None (as by default), the graph will not be saved.

        Returns:
            graphviz.Digraph: Graph object that can be directly displayed in Jupyter notebooks.

        Raises:
            RuntimeError: if no working graphviz backend executable is found.
            ValueError: if `filepath` is not writeable or its extension is not a supported format.
        """
        graphviz = import_or_raise('graphviz', error_msg='Please install graphviz to visualize pipelines.')

        # Try rendering a dummy graph to see if a working backend is installed
        try:
            graphviz.Digraph().pipe()
        except graphviz.backend.ExecutableNotFound:
            raise RuntimeError(
                "To graph entity sets, a graphviz backend is required.\n" +
                "Install the backend using one of the following commands:\n" +
                " Mac OS: brew install graphviz\n" +
                " Linux (Ubuntu): sudo apt-get install graphviz\n" +
                " Windows: conda install python-graphviz\n"
            )

        graph_format = None
        path_and_name = None
        if filepath:
            # Explicitly cast to str in case a Path object was passed in
            filepath = str(filepath)
            # Probe that the target can be opened for writing before doing any rendering work.
            try:
                with open(filepath, 'w'):
                    pass
            except OSError as e:
                raise ValueError(('Specified filepath is not writeable: {}'.format(filepath))) from e
            path_and_name, graph_format = os.path.splitext(filepath)
            graph_format = graph_format[1:].lower()  # ignore the dot
            supported_filetypes = graphviz.backend.FORMATS
            if graph_format not in supported_filetypes:
                raise ValueError(("Unknown format '{}'. Make sure your format is one of the " +
                                  "following: {}").format(graph_format, supported_filetypes))

        # Initialize a new directed graph
        graph = graphviz.Digraph(name=self.name, format=graph_format,
                                 graph_attr={'splines': 'ortho'})
        graph.attr(rankdir='LR')

        # Draw components
        # '\\l' is a literal backslash followed by 'l', passed through to graphviz inside the label text.
        for component in self.component_graph:
            label = '%s\\l' % (component.name)
            if len(component.parameters) > 0:
                parameters = '\\l'.join([key + ' : ' + "{:0.2f}".format(val) if (isinstance(val, float))
                                         else key + ' : ' + str(val)
                                         for key, val in component.parameters.items()])
                label = '%s |%s\\l' % (component.name, parameters)
            graph.node(component.name, shape='record', label=label)

        # Draw edges between each component and the one that follows it
        for source, target in zip(self.component_graph, self.component_graph[1:]):
            graph.edge(source.name, target.name)

        if filepath:
            graph.render(path_and_name, cleanup=True)

        return graph

    def graph_feature_importance(self, show_all_features=False):
        """Generate a bar graph of the pipeline's feature importance

        Arguments:
            show_all_features (bool, optional) : If true, graph features with an importance value of zero. Defaults to false.

        Returns:
            plotly.Figure, a bar graph showing features and their corresponding importance
        """
        go = import_or_raise("plotly.graph_objects", error_msg="Cannot find dependency plotly.graph_objects")

        feat_imp = self.feature_importance
        feat_imp['importance'] = abs(feat_imp['importance'])

        if not show_all_features:
            # Remove features with zero importance
            feat_imp = feat_imp[feat_imp['importance'] != 0]

        # List is reversed to go from ascending order to descending order
        feat_imp = feat_imp.iloc[::-1]

        title = 'Feature Importance'
        subtitle = 'May display fewer features due to feature selection'
        data = [go.Bar(
            x=feat_imp['importance'],
            y=feat_imp['feature'],
            orientation='h'
        )]

        layout = {
            'title': '{0}<br><sub>{1}</sub>'.format(title, subtitle),
            'height': 800,
            'xaxis_title': 'Feature Importance',
            'yaxis_title': 'Feature',
            'yaxis': {
                'type': 'category'
            }
        }

        fig = go.Figure(data=data, layout=layout)
        return fig

    def save(self, file_path):
        """Saves pipeline at file path

        Arguments:
            file_path (str) : location to save file

        Returns:
            None
        """
        with open(file_path, 'wb') as f:
            cloudpickle.dump(self, f)

    @staticmethod
    def load(file_path):
        """Loads pipeline at file path

        Arguments:
            file_path (str) : location to load file

        Returns:
            PipelineBase object
        """
        # NOTE: unpickling can execute arbitrary code; only load files from trusted sources.
        with open(file_path, 'rb') as f:
            return cloudpickle.load(f)

    def clone(self, random_state=0):
        """Constructs a new pipeline with the same parameters and components.

        Arguments:
            random_state (int): the value to seed the random state with. Can also be a RandomState instance. Defaults to 0.

        Returns:
            A new instance of this pipeline with identical parameters and components
        """
        return self.__class__(self.parameters, random_state=random_state)