"""Permutation importance methods."""
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from evalml.objectives.utils import get_objective
from evalml.problem_types import is_classification
from evalml.problem_types.utils import is_regression
from evalml.utils import import_or_raise, infer_feature_types, jupyter_check
[docs]def calculate_permutation_importance(
pipeline,
X,
y,
objective,
n_repeats=5,
n_jobs=None,
random_seed=0,
):
"""Calculates permutation importance for features.
Args:
pipeline (PipelineBase or subclass): Fitted pipeline.
X (pd.DataFrame): The input data used to score and compute permutation importance.
y (pd.Series): The target data.
objective (str, ObjectiveBase): Objective to score on.
n_repeats (int): Number of times to permute a feature. Defaults to 5.
n_jobs (int or None): Non-negative integer describing level of parallelism used for pipelines.
None and 1 are equivalent. If set to -1, all CPUs are used. For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Defaults to None.
random_seed (int): Seed for the random number generator. Defaults to 0.
Returns:
pd.DataFrame: Mean feature importance scores over a number of shuffles.
Raises:
ValueError: If objective cannot be used with the given pipeline.
"""
X = infer_feature_types(X)
y = infer_feature_types(y)
objective = get_objective(objective, return_instance=True)
if not objective.is_defined_for_problem_type(pipeline.problem_type):
raise ValueError(
f"Given objective '{objective.name}' cannot be used with '{pipeline.name}'",
)
if pipeline._supports_fast_permutation_importance:
precomputed_features = pipeline.transform_all_but_final(X, y)
perm_importance = _fast_permutation_importance(
pipeline,
X,
y,
objective,
precomputed_features,
n_repeats=n_repeats,
n_jobs=n_jobs,
random_seed=random_seed,
)
else:
perm_importance = _slow_permutation_importance(
pipeline,
X,
y,
objective,
n_repeats=n_repeats,
n_jobs=n_jobs,
random_seed=random_seed,
)
mean_perm_importance = perm_importance["importances_mean"]
feature_names = list(X.columns)
mean_perm_importance = list(zip(feature_names, mean_perm_importance))
mean_perm_importance.sort(key=lambda x: x[1], reverse=True)
return pd.DataFrame(mean_perm_importance, columns=["feature", "importance"])
[docs]def graph_permutation_importance(pipeline, X, y, objective, importance_threshold=0):
"""Generate a bar graph of the pipeline's permutation importance.
Args:
pipeline (PipelineBase or subclass): Fitted pipeline.
X (pd.DataFrame): The input data used to score and compute permutation importance.
y (pd.Series): The target data.
objective (str, ObjectiveBase): Objective to score on.
importance_threshold (float, optional): If provided, graph features with a permutation importance whose absolute value is larger than importance_threshold. Defaults to 0.
Returns:
plotly.Figure, a bar graph showing features and their respective permutation importance.
Raises:
ValueError: If importance_threshold is not greater than or equal to 0.
"""
go = import_or_raise(
"plotly.graph_objects",
error_msg="Cannot find dependency plotly.graph_objects",
)
if jupyter_check():
import_or_raise("ipywidgets", warning=True)
perm_importance = calculate_permutation_importance(pipeline, X, y, objective)
perm_importance["importance"] = perm_importance["importance"]
if importance_threshold < 0:
raise ValueError(
f"Provided importance threshold of {importance_threshold} must be greater than or equal to 0",
)
# Remove features with close to zero importance
perm_importance = perm_importance[
abs(perm_importance["importance"]) >= importance_threshold
]
# List is reversed to go from ascending order to descending order
perm_importance = perm_importance.iloc[::-1]
title = "Permutation Importance"
subtitle = (
"The relative importance of each input feature's "
"overall influence on the pipelines' predictions, computed using "
"the permutation importance algorithm."
)
data = [
go.Bar(
x=perm_importance["importance"],
y=perm_importance["feature"],
orientation="h",
),
]
layout = {
"title": "{0}<br><sub>{1}</sub>".format(title, subtitle),
"height": 800,
"xaxis_title": "Permutation Importance",
"yaxis_title": "Feature",
"yaxis": {"type": "category"},
}
fig = go.Figure(data=data, layout=layout)
return fig
[docs]def calculate_permutation_importance_one_column(
pipeline,
X,
y,
col_name,
objective,
n_repeats=5,
fast=True,
precomputed_features=None,
random_seed=0,
):
"""Calculates permutation importance for one column in the original dataframe.
Args:
pipeline (PipelineBase or subclass): Fitted pipeline.
X (pd.DataFrame): The input data used to score and compute permutation importance.
y (pd.Series): The target data.
col_name (str, int): The column in X to calculate permutation importance for.
objective (str, ObjectiveBase): Objective to score on.
n_repeats (int): Number of times to permute a feature. Defaults to 5.
fast (bool): Whether to use the fast method of calculating the permutation importance or not. Defaults to True.
precomputed_features (pd.DataFrame): Precomputed features necessary to calculate permutation importance using the fast method. Defaults to None.
random_seed (int): Seed for the random number generator. Defaults to 0.
Returns:
float: Mean feature importance scores over a number of shuffles.
Raises:
ValueError: If pipeline does not support fast permutation importance calculation.
ValueError: If precomputed_features is None.
"""
X = infer_feature_types(X)
y = infer_feature_types(y)
objective = get_objective(objective, return_instance=True)
if fast:
if not pipeline._supports_fast_permutation_importance:
raise ValueError(
"Pipeline does not support fast permutation importance calculation",
)
if precomputed_features is None:
raise ValueError(
"Fast method of calculating permutation importance requires precomputed_features",
)
permutation_importance = _fast_permutation_importance(
pipeline,
X,
y,
objective,
precomputed_features,
col_name=col_name,
n_repeats=n_repeats,
random_seed=random_seed,
)
else:
permutation_importance = _slow_permutation_importance(
pipeline,
X,
y,
objective,
col_name=col_name,
n_repeats=n_repeats,
random_seed=random_seed,
)
return permutation_importance["importances_mean"]
def _fast_permutation_importance(
pipeline,
X,
y,
objective,
precomputed_features,
col_name=None,
n_repeats=5,
n_jobs=None,
random_seed=None,
):
"""Calculate permutation importance faster by only computing the estimator features once.
Only used for pipelines that support this optimization.
"""
if is_classification(pipeline.problem_type):
y = pipeline._encode_targets(y)
baseline_score = _fast_scorer(pipeline, precomputed_features, X, y, objective)
if col_name is None:
scores = Parallel(n_jobs=n_jobs)(
delayed(_calculate_permutation_scores_fast)(
pipeline,
precomputed_features,
y,
objective,
col_name,
random_seed,
n_repeats,
_fast_scorer,
baseline_score,
)
for col_name in X.columns
)
importances = baseline_score - np.array(scores)
return {"importances_mean": np.mean(importances, axis=1)}
else:
scores = _calculate_permutation_scores_fast(
pipeline,
precomputed_features,
y,
objective,
col_name,
random_seed,
n_repeats,
_fast_scorer,
baseline_score,
)
importances = baseline_score - np.array(scores)
importances_mean = (
np.mean(importances, axis=1) if col_name is None else np.mean(importances)
)
return {"importances_mean": importances_mean}
def _calculate_permutation_scores_fast(
pipeline,
precomputed_features,
y,
objective,
col_name,
random_seed,
n_repeats,
scorer,
baseline_score,
):
"""Calculate the permutation score when `col_name` is permuted."""
random_state = np.random.RandomState(random_seed)
scores = np.zeros(n_repeats)
# If column is not in the features or provenance, assume the column was dropped
if (
col_name not in precomputed_features.columns
and col_name not in pipeline._get_feature_provenance()
):
return scores + baseline_score
if col_name in precomputed_features.columns:
col_idx = precomputed_features.columns.get_loc(col_name)
else:
col_idx = [
precomputed_features.columns.get_loc(col)
for col in pipeline._get_feature_provenance()[col_name]
]
return _shuffle_and_score_helper(
pipeline,
precomputed_features,
y,
objective,
col_idx,
n_repeats,
scorer,
random_state,
is_fast=True,
)
def _slow_permutation_importance(
pipeline,
X,
y,
objective,
col_name=None,
n_repeats=5,
n_jobs=None,
random_seed=None,
):
"""If `col_name` is not None, calculates permutation importance for only the column with that name.
Otherwise, calculates the permutation importance for all columns in the input dataframe.
"""
baseline_score = _slow_scorer(pipeline, X, y, objective)
if col_name is None:
scores = Parallel(n_jobs=n_jobs)(
delayed(_calculate_permutation_scores_slow)(
pipeline,
X,
y,
col_idx,
objective,
_slow_scorer,
n_repeats,
random_seed,
)
for col_idx in range(X.shape[1])
)
else:
baseline_score = _slow_scorer(pipeline, X, y, objective)
scores = _calculate_permutation_scores_slow(
pipeline,
X,
y,
col_name,
objective,
_slow_scorer,
n_repeats,
random_seed,
)
importances = baseline_score - np.array(scores)
importances_mean = (
np.mean(importances, axis=1) if col_name is None else np.mean(importances)
)
return {"importances_mean": importances_mean}
def _calculate_permutation_scores_slow(
estimator,
X,
y,
col_name,
objective,
scorer,
n_repeats,
random_seed,
):
"""Calculate score when `col_idx` is permuted."""
random_state = np.random.RandomState(random_seed)
col_idx = col_name
if col_name in X.columns:
col_idx = X.columns.get_loc(col_name)
return _shuffle_and_score_helper(
estimator,
X,
y,
objective,
col_idx,
n_repeats,
scorer,
random_state,
is_fast=False,
)
def _shuffle_and_score_helper(
pipeline,
X_features,
y,
objective,
col_idx,
n_repeats,
scorer,
random_state,
is_fast=True,
):
scores = np.zeros(n_repeats)
# This is what sk_permutation_importance does. Useful for thread safety
X_permuted = X_features.copy()
shuffling_idx = np.arange(X_features.shape[0])
for n_round in range(n_repeats):
random_state.shuffle(shuffling_idx)
col = X_permuted.iloc[shuffling_idx, col_idx]
col.index = X_permuted.index
X_permuted.iloc[:, col_idx] = col
X_permuted.ww.init(schema=X_features.ww.schema)
if is_fast:
feature_score = scorer(pipeline, X_permuted, X_features, y, objective)
else:
feature_score = scorer(pipeline, X_permuted, y, objective)
scores[n_round] = feature_score
return scores
def _slow_scorer(pipeline, X, y, objective):
scores = pipeline.score(X, y, objectives=[objective])
return (
scores[objective.name]
if objective.greater_is_better
else -scores[objective.name]
)
def _fast_scorer(pipeline, features, X, y, objective):
if objective.score_needs_proba:
preds = pipeline.estimator.predict_proba(features)
else:
preds = pipeline.estimator.predict(features)
if is_regression(pipeline.problem_type):
preds = pipeline.inverse_transform(preds)
score = pipeline._score(X, y, preds, objective)
return score if objective.greater_is_better else -score