Source code for evalml.model_understanding.permutation_importance

"""Permutation importance methods."""
import numpy as np
import pandas as pd
from joblib import Parallel, delayed

from evalml.objectives.utils import get_objective
from evalml.problem_types import is_classification
from evalml.utils import infer_feature_types


[docs]def calculate_permutation_importance( pipeline, X, y, objective, n_repeats=5, n_jobs=None, random_seed=0 ): """Calculates permutation importance for features. Args: pipeline (PipelineBase or subclass): Fitted pipeline. X (pd.DataFrame): The input data used to score and compute permutation importance. y (pd.Series): The target data. objective (str, ObjectiveBase): Objective to score on. n_repeats (int): Number of times to permute a feature. Defaults to 5. n_jobs (int or None): Non-negative integer describing level of parallelism used for pipelines. None and 1 are equivalent. If set to -1, all CPUs are used. For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Defaults to None. random_seed (int): Seed for the random number generator. Defaults to 0. Returns: pd.DataFrame: Mean feature importance scores over a number of shuffles. Raises: ValueError: If objective cannot be used with the given pipeline. """ X = infer_feature_types(X) y = infer_feature_types(y) objective = get_objective(objective, return_instance=True) if not objective.is_defined_for_problem_type(pipeline.problem_type): raise ValueError( f"Given objective '{objective.name}' cannot be used with '{pipeline.name}'" ) if pipeline._supports_fast_permutation_importance: precomputed_features = pipeline.compute_estimator_features(X, y) perm_importance = _fast_permutation_importance( pipeline, X, y, objective, precomputed_features, n_repeats=n_repeats, n_jobs=n_jobs, random_seed=random_seed, ) else: perm_importance = _slow_permutation_importance( pipeline, X, y, objective, n_repeats=n_repeats, n_jobs=n_jobs, random_seed=random_seed, ) mean_perm_importance = perm_importance["importances_mean"] feature_names = list(X.columns) mean_perm_importance = list(zip(feature_names, mean_perm_importance)) mean_perm_importance.sort(key=lambda x: x[1], reverse=True) return pd.DataFrame(mean_perm_importance, columns=["feature", "importance"])
[docs]def calculate_permutation_importance_one_column( pipeline, X, y, col_name, objective, n_repeats=5, fast=True, precomputed_features=None, random_seed=0, ): """Calculates permutation importance for one column in the original dataframe. Args: pipeline (PipelineBase or subclass): Fitted pipeline. X (pd.DataFrame): The input data used to score and compute permutation importance. y (pd.Series): The target data. col_name (str, int): The column in X to calculate permutation importance for. objective (str, ObjectiveBase): Objective to score on. n_repeats (int): Number of times to permute a feature. Defaults to 5. fast (bool): Whether to use the fast method of calculating the permutation importance or not. Defaults to True. precomputed_features (pd.DataFrame): Precomputed features necessary to calculate permutation importance using the fast method. Defaults to None. random_seed (int): Seed for the random number generator. Defaults to 0. Returns: float: Mean feature importance scores over a number of shuffles. Raises: ValueError: If pipeline does not support fast permutation importance calculation. ValueError: If precomputed_features is None. """ X = infer_feature_types(X) y = infer_feature_types(y) objective = get_objective(objective, return_instance=True) if fast: if not pipeline._supports_fast_permutation_importance: raise ValueError( "Pipeline does not support fast permutation importance calculation" ) if precomputed_features is None: raise ValueError( "Fast method of calculating permutation importance requires precomputed_features" ) permutation_importance = _fast_permutation_importance( pipeline, X, y, objective, precomputed_features, col_name=col_name, n_repeats=n_repeats, random_seed=random_seed, ) else: permutation_importance = _slow_permutation_importance( pipeline, X, y, objective, col_name=col_name, n_repeats=n_repeats, random_seed=random_seed, ) return permutation_importance["importances_mean"]
def _fast_permutation_importance( pipeline, X, y, objective, precomputed_features, col_name=None, n_repeats=5, n_jobs=None, random_seed=None, ): """Calculate permutation importance faster by only computing the estimator features once. Only used for pipelines that support this optimization. """ if is_classification(pipeline.problem_type): y = pipeline._encode_targets(y) baseline_score = _fast_scorer(pipeline, precomputed_features, X, y, objective) if col_name is None: scores = Parallel(n_jobs=n_jobs)( delayed(_calculate_permutation_scores_fast)( pipeline, precomputed_features, y, objective, col_name, random_seed, n_repeats, _fast_scorer, baseline_score, ) for col_name in X.columns ) importances = baseline_score - np.array(scores) return {"importances_mean": np.mean(importances, axis=1)} else: scores = _calculate_permutation_scores_fast( pipeline, precomputed_features, y, objective, col_name, random_seed, n_repeats, _fast_scorer, baseline_score, ) importances = baseline_score - np.array(scores) importances_mean = ( np.mean(importances, axis=1) if col_name is None else np.mean(importances) ) return {"importances_mean": importances_mean} def _calculate_permutation_scores_fast( pipeline, precomputed_features, y, objective, col_name, random_seed, n_repeats, scorer, baseline_score, ): """Calculate the permutation score when `col_name` is permuted.""" random_state = np.random.RandomState(random_seed) scores = np.zeros(n_repeats) # If column is not in the features or provenance, assume the column was dropped if ( col_name not in precomputed_features.columns and col_name not in pipeline._get_feature_provenance() ): return scores + baseline_score if col_name in precomputed_features.columns: col_idx = precomputed_features.columns.get_loc(col_name) else: col_idx = [ precomputed_features.columns.get_loc(col) for col in pipeline._get_feature_provenance()[col_name] ] return _shuffle_and_score_helper( pipeline, precomputed_features, y, objective, col_idx, n_repeats, scorer, random_state, is_fast=True, ) def _slow_permutation_importance( pipeline, X, y, objective, col_name=None, n_repeats=5, n_jobs=None, random_seed=None ): """If `col_name` is not None, calculates permutation importance for only the column with that name. Otherwise, calculates the permutation importance for all columns in the input dataframe. """ baseline_score = _slow_scorer(pipeline, X, y, objective) if col_name is None: scores = Parallel(n_jobs=n_jobs)( delayed(_calculate_permutation_scores_slow)( pipeline, X, y, col_idx, objective, _slow_scorer, n_repeats, random_seed ) for col_idx in range(X.shape[1]) ) else: baseline_score = _slow_scorer(pipeline, X, y, objective) scores = _calculate_permutation_scores_slow( pipeline, X, y, col_name, objective, _slow_scorer, n_repeats, random_seed ) importances = baseline_score - np.array(scores) importances_mean = ( np.mean(importances, axis=1) if col_name is None else np.mean(importances) ) return {"importances_mean": importances_mean} def _calculate_permutation_scores_slow( estimator, X, y, col_name, objective, scorer, n_repeats, random_seed ): """Calculate score when `col_idx` is permuted.""" random_state = np.random.RandomState(random_seed) col_idx = col_name if col_name in X.columns: col_idx = X.columns.get_loc(col_name) return _shuffle_and_score_helper( estimator, X, y, objective, col_idx, n_repeats, scorer, random_state, is_fast=False, ) def _shuffle_and_score_helper( pipeline, X_features, y, objective, col_idx, n_repeats, scorer, random_state, is_fast=True, ): scores = np.zeros(n_repeats) # This is what sk_permutation_importance does. Useful for thread safety X_permuted = X_features.copy() shuffling_idx = np.arange(X_features.shape[0]) for n_round in range(n_repeats): random_state.shuffle(shuffling_idx) col = X_permuted.iloc[shuffling_idx, col_idx] col.index = X_permuted.index X_permuted.iloc[:, col_idx] = col X_permuted.ww.init(schema=X_features.ww.schema) if is_fast: feature_score = scorer(pipeline, X_permuted, X_features, y, objective) else: feature_score = scorer(pipeline, X_permuted, y, objective) scores[n_round] = feature_score return scores def _slow_scorer(pipeline, X, y, objective): scores = pipeline.score(X, y, objectives=[objective]) return ( scores[objective.name] if objective.greater_is_better else -scores[objective.name] ) def _fast_scorer(pipeline, features, X, y, objective): if objective.score_needs_proba: preds = pipeline.estimator.predict_proba(features) else: preds = pipeline.estimator.predict(features) preds = pipeline.inverse_transform(preds) score = pipeline._score(X, y, preds, objective) return score if objective.greater_is_better else -score