Source code for evalml.guardrails.utils

import pandas as pd
from sklearn.ensemble import IsolationForest


def detect_label_leakage(X, y, threshold=.95):
    """Check if any of the features are highly correlated with the target.

    Currently only supports binary and numeric targets and features.
    Non-numeric columns are ignored.

    Args:
        X (pd.DataFrame): The input features to check. Anything else is
            passed to ``pd.DataFrame`` first.
        y (pd.Series): the labels
        threshold (float): the correlation threshold to be considered leakage.
            Defaults to .95

    Returns:
        leakage, dictionary mapping each leaking feature to the absolute
        value of its correlation with ``y``. Empty if there are no numeric
        features or none reaches the threshold.

    Example:
        >>> X = pd.DataFrame({
        ...    'leak': [10, 42, 31, 51, 61],
        ...    'x': [42, 54, 12, 64, 12],
        ...    'y': [12, 5, 13, 74, 24],
        ... })
        >>> y = pd.Series([10, 42, 31, 51, 40])
        >>> detect_label_leakage(X, y, threshold=0.8)
        {'leak': 0.8827072320669518}
    """
    if not isinstance(X, pd.DataFrame):
        X = pd.DataFrame(X)

    # only select numeric
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64', 'bool']
    X = X.select_dtypes(include=numerics)
    if len(X.columns) == 0:
        return {}

    corrs = {}
    # DataFrame.items() replaces iteritems(), which was removed in pandas 2.0.
    for label, col in X.items():
        # Compute each correlation once. A NaN correlation (e.g. a constant
        # column) fails the comparison and is therefore never reported.
        corr = abs(y.corr(col))
        if corr >= threshold:
            corrs[label] = corr
    return corrs
def detect_highly_null(X, percent_threshold=.95):
    """Report the columns whose share of null values meets a threshold.

    Args:
        X (pd.DataFrame): features. Anything else is passed to
            ``pd.DataFrame`` first.
        percent_threshold (float): fraction of null values a column must
            reach to be considered "highly-null", defaults to .95

    Returns:
        A dictionary keyed by column name or index, holding the fraction of
        null values of every column at or above the threshold.

    Example:
        >>> df = pd.DataFrame({
        ...    'lots_of_null': [None, None, None, None, 5],
        ...    'no_null': [1, 2, 3, 4, 5]
        ... })
        >>> detect_highly_null(df, percent_threshold=0.8)
        {'lots_of_null': 0.8}
    """
    frame = X if isinstance(X, pd.DataFrame) else pd.DataFrame(X)

    # Mean of the boolean null mask is the per-column fraction of nulls.
    null_fraction_by_col = frame.isna().mean().to_dict()

    flagged = {}
    for name, fraction in null_fraction_by_col.items():
        if fraction >= percent_threshold:
            flagged[name] = fraction
    return flagged
def detect_outliers(X, random_state=0):
    """Find rows whose Isolation Forest anomaly score is itself an outlier.

    An Isolation Forest is fitted on the numeric columns to obtain an anomaly
    score per row; rows whose score falls outside ``[q1 - 2*IQR, q3 + 2*IQR]``
    of all scores are reported. Non-numeric columns are ignored.

    Args:
        X (pd.DataFrame): features. Anything else is passed to
            ``pd.DataFrame`` first.
        random_state (int): seed forwarded to the Isolation Forest.
            Defaults to 0.

    Returns:
        A list of row positions (0-based, independent of ``X``'s index
        labels) that may hold outlier data. Empty when ``X`` has no numeric
        columns or no rows.

    Example:
        >>> df = pd.DataFrame({
        ...    'x': [1, 2, 3, 40, 5],
        ...    'y': [6, 7, 8, 990, 10],
        ...    'z': [-1, -2, -3, -1201, -4]
        ... })
        >>> detect_outliers(df)
        [3]
    """
    if not isinstance(X, pd.DataFrame):
        X = pd.DataFrame(X)

    # only select numeric
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    X = X.select_dtypes(include=numerics)
    # Nothing to score: return the same type as the normal path (a list).
    if len(X.columns) == 0 or len(X) == 0:
        return []

    def get_IQR(df, k=2.0):
        """Return the (lower, upper) fences q1 - k*IQR and q3 + k*IQR."""
        q1 = df.quantile(0.25)
        q3 = df.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - (k * iqr)
        upper_bound = q3 + (k * iqr)
        return (lower_bound, upper_bound)

    try:
        # Older scikit-learn needs behaviour="new" for the current
        # decision_function semantics.
        clf = IsolationForest(random_state=random_state, behaviour="new", contamination=0.1)
    except TypeError:
        # The `behaviour` parameter was removed in scikit-learn 0.24, where
        # the "new" behaviour is the only one.
        clf = IsolationForest(random_state=random_state, contamination=0.1)
    clf.fit(X)

    # The Series gets a default RangeIndex, so reported indices are positional.
    scores = pd.Series(clf.decision_function(X))
    lower_bound, upper_bound = get_IQR(scores, k=2)
    outliers = (scores < lower_bound) | (scores > upper_bound)
    outliers_indices = outliers[outliers].index.values.tolist()
    return outliers_indices
def detect_id_columns(X, threshold=1.0):
    """Check if any of the features are ID columns.

    Currently performs these simple checks:

        - column name is "id" (case-insensitive)
        - column name ends in "_id" (case-insensitive)
        - column contains all unique values (and is not float / boolean)

    A column matching one check scores 0.95; a column matching a name check
    and the uniqueness check scores 1.0.

    Args:
        X (pd.DataFrame): The input features to check. Anything else is
            passed to ``pd.DataFrame`` first.
        threshold (float): the probability threshold to be considered an ID
            column. Defaults to 1.0

    Returns:
        A dictionary keyed by column name (as a string), holding the
        probability of being an ID column for every column at or above the
        threshold.

    Example:
        >>> df = pd.DataFrame({
        ...    'df_id': [0, 1, 2, 3, 4],
        ...    'x': [10, 42, 31, 51, 61],
        ...    'y': [42, 54, 12, 64, 12]
        ... })
        >>> detect_id_columns(df)
        {'df_id': 1.0}
    """
    if not isinstance(X, pd.DataFrame):
        X = pd.DataFrame(X)

    col_names = [str(col) for col in X.columns.tolist()]

    # columns whose name is "id"
    id_cols = {name: 0.95 for name in col_names if name.lower() == "id"}

    # columns whose values are all unique (floats and booleans are never IDs)
    non_id_types = ['float16', 'float32', 'float64', 'bool']
    candidates = X.select_dtypes(exclude=non_id_types)
    check_all_unique = (candidates.nunique() == len(candidates))
    unique_names = [str(col) for col in check_all_unique[check_all_unique].index.tolist()]
    # Result keys are strings, so membership is tested on the stringified
    # label. Each comprehension is fully built before update() runs, so the
    # scores only depend on hits from the earlier checks.
    id_cols.update({name: 1.0 if name in id_cols else 0.95 for name in unique_names})

    # columns whose name ends with "_id"
    suffix_names = [name for name in col_names if name.lower().endswith("_id")]
    id_cols.update({name: 1.0 if name in id_cols else 0.95 for name in suffix_names})

    return {name: score for name, score in id_cols.items() if score >= threshold}