def import_or_raise(library, error_msg=None, warning=False):
    """Attempts to import the requested library by name. If the import fails, raises an ImportError or warning.

    Args:
        library (str): The name of the library.
        error_msg (str): Error message to return if the import fails.
        warning (bool): If True, import_or_raise gives a warning instead of ImportError. Defaults to False.

    Returns:
        Returns the library if importing succeeded.

    Raises:
        ImportError: If attempting to import the library fails because the library is not installed.
        Exception: If importing the library fails.
    """
    try:
        return importlib.import_module(library)
    except ImportError:
        if error_msg is None:
            error_msg = ""
        msg = f"Missing optional dependency '{library}'. Please use pip to install {library}. {error_msg}"
        if warning:
            warnings.warn(msg)
        else:
            raise ImportError(msg)
    except Exception as ex:
        msg = f"An exception occurred while trying to import `{library}`: {str(ex)}"
        if warning:
            warnings.warn(msg)
        else:
            raise Exception(msg)

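# Editor-added usage sketch (hypothetical helper, not part of the library source): shows how
# import_or_raise can guard an optional dependency. "plotly" is only an example package name.
def _example_import_or_raise():
    # Raises ImportError with an actionable message if plotly is not installed.
    plotly = import_or_raise("plotly", error_msg="Install plotly to draw interactive figures.")
    # With warning=True a warning is emitted instead and None is returned on failure.
    maybe_plotly = import_or_raise("plotly", warning=True)
    return plotly, maybe_plotly
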
def convert_to_seconds(input_str):
    """Converts a string describing a length of time to its length in seconds.

    Args:
        input_str (str): The string to be parsed and converted to seconds.

    Returns:
        float: The length of time in seconds.

    Raises:
        AssertionError: If an invalid unit is used.

    Examples:
        >>> assert convert_to_seconds("10 hr") == 36000.0
        >>> assert convert_to_seconds("30 minutes") == 1800.0
        >>> assert convert_to_seconds("2.5 min") == 150.0
    """
    hours = {"h", "hr", "hour", "hours"}
    minutes = {"m", "min", "minute", "minutes"}
    seconds = {"s", "sec", "second", "seconds"}
    value, unit = input_str.split()
    if unit[-1] == "s" and len(unit) != 1:
        unit = unit[:-1]
    if unit in seconds:
        return float(value)
    elif unit in minutes:
        return float(value) * 60
    elif unit in hours:
        return float(value) * 3600
    else:
        msg = "Invalid unit. Units must be hours, mins, or seconds. Received '{}'".format(unit)
        raise AssertionError(msg)

# specifies the min and max values a seed to np.random.RandomState is allowed to take.
# these limits were chosen to fit in the numpy.int32 datatype to avoid issues with 32-bit systems
# see https://docs.scipy.org/doc/numpy-1.15.0/reference/generated/numpy.random.RandomState.html
SEED_BOUNDS = namedtuple("SEED_BOUNDS", ("min_bound", "max_bound"))(0, 2**31 - 1)

def get_random_state(seed):
    """Generates a numpy.random.RandomState instance using seed.

    Args:
        seed (None, int, np.random.RandomState object): seed to use to generate numpy.random.RandomState. Must be between SEED_BOUNDS.min_bound and SEED_BOUNDS.max_bound, inclusive.

    Raises:
        ValueError: If the input seed is not within the acceptable range.

    Returns:
        A numpy.random.RandomState instance.
    """
    if isinstance(seed, (int, np.integer)) and (
        seed < SEED_BOUNDS.min_bound or SEED_BOUNDS.max_bound < seed
    ):
        raise ValueError(
            'Seed "{}" is not in the range [{}, {}], inclusive'.format(
                seed,
                SEED_BOUNDS.min_bound,
                SEED_BOUNDS.max_bound,
            ),
        )
    return check_random_state(seed)

def get_random_seed(
    random_state,
    min_bound=SEED_BOUNDS.min_bound,
    max_bound=SEED_BOUNDS.max_bound,
):
    """Given a numpy.random.RandomState object, generate an int representing a seed value for another random number generator. Or, if given an int, return that int.

    To protect against invalid input to a particular library's random number generator, if an int value is provided, and it is outside the bounds "[min_bound, max_bound)", the value will be projected into the range between the min_bound (inclusive) and max_bound (exclusive) using modular arithmetic.

    Args:
        random_state (int, numpy.random.RandomState): random state
        min_bound (None, int): if not default of None, will be min bound when generating seed (inclusive). Must be less than max_bound.
        max_bound (None, int): if not default of None, will be max bound when generating seed (exclusive). Must be greater than min_bound.

    Returns:
        int: Seed for random number generator

    Raises:
        ValueError: If boundaries are not valid.
    """
    if not min_bound < max_bound:
        raise ValueError(
            "Provided min_bound {} is not less than max_bound {}".format(
                min_bound,
                max_bound,
            ),
        )
    if isinstance(random_state, np.random.RandomState):
        return random_state.randint(min_bound, max_bound)
    if random_state < min_bound or random_state >= max_bound:
        return ((random_state - min_bound) % (max_bound - min_bound)) + min_bound
    return random_state

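# Editor-added usage sketch (hypothetical, illustrative values): shows how the two seed
# helpers interact. An out-of-range int seed is projected back into [min_bound, max_bound)
# with modular arithmetic, and a RandomState instance yields a fresh int seed.
def _example_random_seed_helpers():
    rng = get_random_state(42)            # numpy.random.RandomState seeded with 42
    seed_from_rng = get_random_seed(rng)  # int drawn from rng within SEED_BOUNDS
    projected = get_random_seed(-1, min_bound=0, max_bound=10)  # (-1 - 0) % 10 + 0 == 9
    assert projected == 9
    return seed_from_rng
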
class classproperty:
    """Allows function to be accessed as a class level property.

    Example:
    .. code-block::

        class LogisticRegressionBinaryPipeline(PipelineBase):
            component_graph = ['Simple Imputer', 'Logistic Regression Classifier']

            @classproperty
            def summary(cls):
                summary = ""
                for component in cls.component_graph:
                    component = handle_component_class(component)
                    summary += component.name + " + "
                return summary

        assert LogisticRegressionBinaryPipeline.summary == "Simple Imputer + Logistic Regression Classifier + "
        assert LogisticRegressionBinaryPipeline().summary == "Simple Imputer + Logistic Regression Classifier + "
    """

    def __init__(self, func):
        self.func = func

    def __get__(self, _, klass):
        """Get property value."""
        return self.func(klass)

def _get_subclasses(base_class):
    """Gets all of the leaf nodes in the hierarchy tree for a given base class.

    Args:
        base_class (abc.ABCMeta): Class to find all of the children for.

    Returns:
        subclasses (list): List of all children that are not base classes.
    """
    classes_to_check = base_class.__subclasses__()
    subclasses = []
    while classes_to_check:
        subclass = classes_to_check.pop()
        children = subclass.__subclasses__()
        if children:
            classes_to_check.extend(children)
        else:
            subclasses.append(subclass)
    return subclasses


_not_used_in_automl = {
    "BaselineClassifier",
    "BaselineRegressor",
    "TimeSeriesBaselineEstimator",
    "MultiseriesTimeSeriesBaselineRegressor",
    "StackedEnsembleClassifier",
    "StackedEnsembleRegressor",
    "KNeighborsClassifier",
    "SVMClassifier",
    "SVMRegressor",
    "LinearRegressor",
    "DecisionTreeClassifier",
    "DecisionTreeRegressor",
    "CatBoostRegressor",
    "CatBoostClassifier",
}

def get_importable_subclasses(base_class, used_in_automl=True):
    """Get importable subclasses of a base class. Used to list all of our estimators, transformers, components and pipelines dynamically.

    Args:
        base_class (abc.ABCMeta): Base class to find all of the subclasses for.
        used_in_automl: Not all components/pipelines/estimators are used in automl search. If True, only include those subclasses that are used in the search. This would mean excluding classes related to ExtraTrees, ElasticNet, and Baseline estimators.

    Returns:
        List of subclasses.
    """
    all_classes = _get_subclasses(base_class)
    classes = []
    for cls in all_classes:
        if "evalml.pipelines" not in cls.__module__:
            continue
        try:
            cls()
            classes.append(cls)
        except (ImportError, MissingComponentError, TypeError):
            logger.debug(
                f"Could not import class {cls.__name__} in get_importable_subclasses",
            )
    if used_in_automl:
        classes = [cls for cls in classes if cls.__name__ not in _not_used_in_automl]
    return classes

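# Editor-added usage sketch (hypothetical): lists the estimator classes usable in automl
# search and compares them against the full set. The import path for Estimator is an
# assumption about the surrounding package layout.
def _example_get_importable_subclasses():
    from evalml.pipelines.components import Estimator  # assumed import path
    automl_estimators = get_importable_subclasses(Estimator, used_in_automl=True)
    all_estimators = get_importable_subclasses(Estimator, used_in_automl=False)
    assert len(automl_estimators) <= len(all_estimators)
    return automl_estimators
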
def _rename_column_names_to_numeric(X):
    """Used in LightGBM and XGBoost estimator classes to rename column names when the input is a pd.DataFrame in case it has column names that contain symbols ([, ], <) that these estimators cannot natively handle.

    Args:
        X (pd.DataFrame): The input training data of shape [n_samples, n_features]

    Returns:
        Transformed X where column names are renamed to numerical values
    """
    if isinstance(X, (np.ndarray, list)):
        return pd.DataFrame(X)
    X_renamed = X.copy()
    logical_types = X.ww.logical_types
    if len(X.columns) > 0 and isinstance(X.columns, pd.MultiIndex):
        flat_col_names = list(map(str, X_renamed.columns))
        X_renamed.columns = flat_col_names
        logical_types = {str(k): v for k, v in logical_types.items()}
        rename_cols_dict = dict(
            (str(col), col_num) for col_num, col in enumerate(list(X.columns))
        )
    else:
        rename_cols_dict = dict(
            (col, col_num) for col_num, col in enumerate(list(X.columns))
        )
    X_renamed.rename(columns=rename_cols_dict, inplace=True)
    X_renamed.ww.init(
        logical_types={rename_cols_dict[k]: v for k, v in logical_types.items()},
    )
    return X_renamed

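# Editor-added usage sketch (hypothetical data): column names containing "[", "]" or "<"
# are mapped to integer positions before being handed to LightGBM/XGBoost. Assumes
# woodwork is installed, since the helper reads X.ww.logical_types.
def _example_rename_column_names_to_numeric():
    df = pd.DataFrame({"a<b": [1, 2], "c[0]": [3, 4]})
    df.ww.init()  # woodwork typing information is required by the helper
    renamed = _rename_column_names_to_numeric(df)
    assert list(renamed.columns) == [0, 1]
    return renamed
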
def jupyter_check():
    """Get whether or not the code is being run in an IPython environment (such as Jupyter Notebook or Jupyter Lab).

    Returns:
        boolean: True if IPython, False otherwise.
    """
    try:
        ipy = import_or_raise("IPython")
        return ipy.core.getipython.get_ipython()
    except Exception:
        return False

def safe_repr(value):
    """Convert the given value into a string that can safely be used for repr.

    Args:
        value: The item to convert

    Returns:
        String representation of the value
    """
    if isinstance(value, float):
        if pd.isna(value):
            return "np.nan"
        if np.isinf(value):
            return f"float('{repr(value)}')"
    return repr(value)

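# Editor-added usage sketch (hypothetical values): NaN and infinity are rendered as
# expressions that can be pasted back into Python, unlike plain repr.
def _example_safe_repr():
    assert safe_repr(1.5) == "1.5"
    assert safe_repr(float("nan")) == "np.nan"
    assert safe_repr(float("inf")) == "float('inf')"
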
def is_all_numeric(df):
    """Checks if the given DataFrame contains only numeric values.

    Args:
        df (pd.DataFrame): The DataFrame to check data types of.

    Returns:
        True if all the columns are numeric and are not missing any values, False otherwise.
    """
    for col_tags in df.ww.semantic_tags.values():
        if "numeric" not in col_tags:
            return False
    if df.isnull().any().any():
        return False
    return True

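# Editor-added usage sketch (hypothetical data): is_all_numeric relies on woodwork
# semantic tags, so the frames must be ww-initialized first.
def _example_is_all_numeric():
    numeric_df = pd.DataFrame({"a": [1, 2], "b": [0.5, 1.5]})
    numeric_df.ww.init()
    mixed_df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
    mixed_df.ww.init()
    assert is_all_numeric(numeric_df)
    assert not is_all_numeric(mixed_df)
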
def pad_with_nans(pd_data, num_to_pad):
    """Pad the beginning num_to_pad rows with nans.

    Args:
        pd_data (pd.DataFrame or pd.Series): Data to pad.
        num_to_pad (int): Number of nans to pad.

    Returns:
        pd.DataFrame or pd.Series
    """
    if isinstance(pd_data, pd.Series):
        padding = pd.Series([np.nan] * num_to_pad, name=pd_data.name)
    else:
        padding = pd.DataFrame({col: [np.nan] * num_to_pad for col in pd_data.columns})
    padded = pd.concat([padding, pd_data], ignore_index=True)
    # By default, pd.concat will convert all types to object if there are mixed numerics and objects
    # The call to convert_dtypes ensures numerics stay numerics in the new dataframe.
    return padded.convert_dtypes(
        infer_objects=True,
        convert_string=False,
        convert_floating=False,
        convert_integer=False,
        convert_boolean=False,
    )

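# Editor-added usage sketch (hypothetical data): prepend NaN rows to a Series, which is
# useful when aligning shortened predictions back to the original index length.
def _example_pad_with_nans():
    preds = pd.Series([1.0, 2.0, 3.0], name="prediction")
    padded = pad_with_nans(preds, num_to_pad=2)
    assert len(padded) == 5
    assert padded.isna().sum() == 2
    return padded
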
def _get_rows_without_nans(*data):
    """Compute a boolean array marking where all entries in the data are non-nan.

    Args:
        *data (sequence of pd.Series or pd.DataFrame): data

    Returns:
        np.ndarray: mask where each entry is True if and only if all corresponding entries in that index in data are non-nan.
    """

    def _not_nan(pd_data):
        if pd_data is None or len(pd_data) == 0:
            return np.array([True])
        if isinstance(pd_data, pd.Series):
            return ~pd_data.isna().values
        elif isinstance(pd_data, pd.DataFrame):
            return ~pd_data.isna().any(axis=1).values
        else:
            return pd_data

    mask = reduce(lambda a, b: np.logical_and(_not_nan(a), _not_nan(b)), data)
    return mask

def drop_rows_with_nans(*pd_data):
    """Drop rows that have any NaNs in all dataframes or series.

    Args:
        *pd_data: sequence of pd.Series or pd.DataFrame or None

    Returns:
        list of pd.DataFrame or pd.Series or None
    """
    mask = _get_rows_without_nans(*pd_data)

    def _subset(pd_data):
        if pd_data is not None and not pd_data.empty:
            return pd_data.iloc[mask]
        return pd_data

    return [_subset(data) for data in pd_data]

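# Editor-added usage sketch (hypothetical data): drops any row index that is NaN in at
# least one of the aligned inputs, so features and target stay the same length.
def _example_drop_rows_with_nans():
    X = pd.DataFrame({"a": [1.0, np.nan, 3.0], "b": [4.0, 5.0, 6.0]})
    y = pd.Series([1.0, 2.0, np.nan])
    X_clean, y_clean = drop_rows_with_nans(X, y)
    assert len(X_clean) == len(y_clean) == 1  # only the first row is NaN-free everywhere
    return X_clean, y_clean
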
def _file_path_check(filepath=None, format="png", interactive=False, is_plotly=False):
    """Helper function to check the filepath being passed.

    Args:
        filepath (str or Path, optional): Location to save file.
        format (str): Extension for figure to be saved as. Defaults to 'png'.
        interactive (bool, optional): If True and fig is of type plotly.Figure, sets the format to 'html'.
        is_plotly (bool, optional): Check to see if the fig being passed is of type plotly.Figure.

    Returns:
        String representing the final filepath the image will be saved to.
    """
    if filepath:
        filepath = str(filepath)
        path_and_name, extension = os.path.splitext(filepath)
        extension = extension[1:].lower() if extension else None
        if is_plotly and interactive:
            format_ = "html"
        elif not extension and not interactive:
            format_ = format
        else:
            format_ = extension
        filepath = f"{path_and_name}.{format_}"
        try:
            f = open(filepath, "w")
            f.close()
        except (IOError, FileNotFoundError):
            raise ValueError(
                ("Specified filepath is not writeable: {}".format(filepath)),
            )
    return filepath

def save_plot(
    fig,
    filepath=None,
    format="png",
    interactive=False,
    return_filepath=False,
):
    """Saves fig to filepath if specified, or to a default location if not.

    Args:
        fig (Figure): Figure to be saved.
        filepath (str or Path, optional): Location to save file. Default is with filename "test_plot".
        format (str): Extension for figure to be saved as. Ignored if interactive is True and fig is of type plotly.Figure. Defaults to 'png'.
        interactive (bool, optional): If True and fig is of type plotly.Figure, saves the fig as interactive instead of static, and format will be set to 'html'. Defaults to False.
        return_filepath (bool, optional): Whether to return the final filepath the image is saved to. Defaults to False.

    Returns:
        String representing the final filepath the image was saved to if return_filepath is set to True. Defaults to None.
    """
    plotly_ = import_or_raise("plotly", error_msg="Cannot find dependency plotly")
    graphviz_ = import_or_raise(
        "graphviz",
        error_msg="Please install graphviz to visualize trees.",
    )
    matplotlib = import_or_raise(
        "matplotlib",
        error_msg="Cannot find dependency matplotlib",
    )
    plt_ = matplotlib.pyplot
    axes_ = matplotlib.axes

    is_plotly = False
    is_graphviz = False
    is_plt = False
    is_seaborn = False

    format = format if format else "png"

    if isinstance(fig, plotly_.graph_objects.Figure):
        is_plotly = True
    elif isinstance(fig, graphviz_.Source):
        is_graphviz = True
    elif isinstance(fig, plt_.Figure):
        is_plt = True
    elif isinstance(fig, axes_.SubplotBase):
        is_seaborn = True

    if not filepath:
        extension = "html" if interactive and is_plotly else format
        filepath = os.path.join(os.getcwd(), f"test_plot.{extension}")

    filepath = _file_path_check(
        filepath,
        format=format,
        interactive=interactive,
        is_plotly=is_plotly,
    )

    if is_plotly and interactive:
        fig.write_html(file=filepath)
    elif is_plotly and not interactive:
        fig.write_image(file=filepath, engine="kaleido")
    elif is_graphviz:
        filepath_, format_ = os.path.splitext(filepath)
        fig.format = "png"
        filepath = f"{filepath_}.png"
        fig.render(filename=filepath_, view=False, cleanup=True)
    elif is_plt:
        fig.savefig(fname=filepath)
    elif is_seaborn:
        fig = fig.figure
        fig.savefig(fname=filepath)

    if return_filepath:
        return filepath

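# Editor-added usage sketch (hypothetical figure): saving a matplotlib figure to an
# explicit path. Assumes plotly, graphviz, and matplotlib are installed, since save_plot
# imports all three up front even when only one backend is used.
def _example_save_plot():
    import matplotlib.pyplot as plt
    fig, ax = plt.subplots()
    ax.plot([0, 1], [0, 1])
    saved_path = save_plot(fig, filepath="my_plot.png", return_filepath=True)
    return saved_path  # resolved path the static image was written to
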
def deprecate_arg(old_arg, new_arg, old_value, new_value):
    """Helper to raise warnings when a deprecated arg is used.

    Args:
        old_arg (str): Name of old/deprecated argument.
        new_arg (str): Name of new argument.
        old_value (Any): Value the user passed in for the old argument.
        new_value (Any): Value the user passed in for the new argument.

    Returns:
        old_value if not None, else new_value
    """
    value_to_use = new_value
    if old_value is not None:
        warnings.warn(
            f"Argument '{old_arg}' has been deprecated in favor of '{new_arg}'. "
            f"Passing '{old_arg}' in future versions will result in an error.",
        )
        value_to_use = old_value
    return value_to_use

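# Editor-added usage sketch (hypothetical argument names): how a function with a renamed
# keyword can fall back to the deprecated spelling while warning the caller.
def _example_deprecated_api(old_param=None, new_param=10):
    # Warns and returns old_param when it is passed; otherwise returns new_param.
    value = deprecate_arg("old_param", "new_param", old_param, new_param)
    return value

# _example_deprecated_api(old_param=5) warns and returns 5; _example_deprecated_api() returns 10.
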
def contains_all_ts_parameters(problem_configuration):
    """Validates that the problem configuration contains all required keys.

    Args:
        problem_configuration (dict): Problem configuration.

    Returns:
        bool, str: True if the configuration contains all parameters. If False, msg is a non-empty string with error message.
    """
    required_parameters = {"time_index", "gap", "max_delay", "forecast_horizon"}
    msg = ""
    if (
        not problem_configuration
        or not all(p in problem_configuration for p in required_parameters)
        or problem_configuration["time_index"] is None
    ):
        msg = (
            "problem_configuration must be a dict containing values for at least the time_index, gap, max_delay, "
            f"and forecast_horizon parameters, and time_index cannot be None. Received {problem_configuration}."
        )
    return not msg, msg

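# Editor-added usage sketch (hypothetical configuration values): validates that a time
# series problem_configuration carries every required key before search runs.
def _example_contains_all_ts_parameters():
    config = {"time_index": "date", "gap": 1, "max_delay": 3, "forecast_horizon": 2}
    is_valid, msg = contains_all_ts_parameters(config)
    assert is_valid and msg == ""
    incomplete = {"time_index": "date", "gap": 1}
    is_valid, msg = contains_all_ts_parameters(incomplete)
    assert not is_valid and "forecast_horizon" in msg
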
def are_ts_parameters_valid_for_split(
    gap,
    max_delay,
    forecast_horizon,
    n_obs,
    n_splits,
):
    """Validates the time series parameters in problem_configuration are compatible with split sizes.

    Args:
        gap (int): gap value.
        max_delay (int): max_delay value.
        forecast_horizon (int): forecast_horizon value.
        n_obs (int): Number of observations in the dataset.
        n_splits (int): Number of cross validation splits.

    Returns:
        TsParameterValidationResult - named tuple with four fields
            is_valid (bool): True if parameters are valid.
            msg (str): Contains error message to display. Empty if is_valid.
            smallest_split_size (int): Smallest split size given n_obs and n_splits.
            max_window_size (int): Max window size given gap, max_delay, forecast_horizon.
    """
    eval_size = forecast_horizon * n_splits
    train_size = n_obs - eval_size
    window_size = gap + max_delay + forecast_horizon
    msg = ""
    if train_size <= window_size:
        msg = (
            f"Since the data has {n_obs} observations, n_splits={n_splits}, and a forecast horizon of {forecast_horizon}, "
            f"the smallest split would have {train_size} observations. "
            f"Since {gap + max_delay + forecast_horizon} (gap + max_delay + forecast_horizon) >= {train_size}, "
            "then at least one of the splits would be empty by the time it reaches the pipeline. "
            "Please use a smaller number of splits, reduce one or more of these parameters, or collect more data."
        )
    return _validation_result(not msg, msg, train_size, window_size, n_obs, n_splits)

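# Editor-added usage sketch (hypothetical sizes): with 20 observations, 3 splits, and a
# forecast horizon of 2, the smallest training split has 20 - 2*3 = 14 rows, which must
# exceed the window size gap + max_delay + forecast_horizon = 8.
def _example_ts_split_validation():
    result = are_ts_parameters_valid_for_split(
        gap=1, max_delay=5, forecast_horizon=2, n_obs=20, n_splits=3,
    )
    assert result.is_valid  # window size 8 < smallest split size 14
    return result
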
def are_datasets_separated_by_gap_time_index(train, test, pipeline_params, freq=None):
    """Determine if the train and test datasets are separated by gap number of units using the time_index.

    This will be true when users are predicting on unseen data but not during cross validation since the target is known.

    Args:
        train (pd.DataFrame): Training data.
        test (pd.DataFrame): Data of shape [n_samples, n_features].
        pipeline_params (dict): Dictionary of time series parameters.
        freq (str): Frequency of time index.

    Returns:
        bool: True if the difference in time units is equal to gap + 1.
    """
    gap_difference = pipeline_params["gap"] + 1
    train_copy = train.copy()
    test_copy = test.copy()
    train_copy.ww.init(time_index=pipeline_params["time_index"])
    test_copy.ww.init(time_index=pipeline_params["time_index"])
    if not freq:
        X_frequency_dict = train_copy.ww.infer_temporal_frequencies(
            temporal_columns=[train_copy.ww.time_index],
        )
        freq = X_frequency_dict[test_copy.ww.time_index]
        if freq is None:
            return True
    first_testing_date = test_copy[test_copy.ww.time_index].iloc[0]
    last_training_date = train_copy[train_copy.ww.time_index].iloc[-1]
    return (to_offset(freq) * gap_difference) + last_training_date == first_testing_date

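# Editor-added usage sketch (hypothetical daily data): with gap=1 the first holdout
# timestamp must start gap + 1 = 2 days after the last training timestamp.
def _example_gap_separation():
    dates = pd.date_range("2022-01-01", periods=12, freq="D")
    train = pd.DataFrame({"date": dates[:10], "feature": range(10)})
    holdout = pd.DataFrame({"date": dates[11:], "feature": [11]})
    params = {"gap": 1, "time_index": "date"}
    assert are_datasets_separated_by_gap_time_index(train, holdout, params, freq="D")
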
def get_time_index(X: pd.DataFrame, y: pd.Series, time_index_name: str):
    """Determines the column in the given data that should be used as the time index."""
    # Prefer the user's provided time_index, if it exists
    if time_index_name and time_index_name in X.columns:
        dt_col = X[time_index_name]
    # If user's provided time_index doesn't exist, log it and find some datetimes to use
    elif (time_index_name is None) or time_index_name not in X.columns:
        if time_index_name is not None:
            logger.warning(
                f"Could not find requested time_index {time_index_name}",
            )
        # Use the feature data's index, preferentially
        num_datetime_features = X.ww.select("Datetime").shape[1]
        if isinstance(X.index, pd.DatetimeIndex):
            dt_col = X.index
        elif isinstance(y.index, pd.DatetimeIndex):
            dt_col = y.index
        elif num_datetime_features == 0:
            raise ValueError(
                "There are no Datetime features in the feature data and neither the feature nor the target data have a DateTime index.",
            )
        # Use a datetime column of the features if there's only one
        elif num_datetime_features == 1:
            dt_col = X.ww.select("Datetime").squeeze()
        # With more than one datetime column, use the time_index parameter, if provided.
        elif num_datetime_features > 1:
            if time_index_name is None:
                raise ValueError(
                    "Too many Datetime features provided in data but no time_index column specified during __init__.",
                )
            elif time_index_name not in X:
                raise ValueError(
                    f"Too many Datetime features provided in data and provided time_index column {time_index_name} not present in data.",
                )

    if not isinstance(dt_col, pd.DatetimeIndex) or dt_col.freq is None:
        dt_col = pd.DatetimeIndex(dt_col, freq="infer")
    time_index = dt_col.rename(y.index.name)
    return time_index

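# Editor-added usage sketch (hypothetical data): resolves the time index from an explicit
# column name; the result is a DatetimeIndex renamed to match the target index name.
def _example_get_time_index():
    X = pd.DataFrame(
        {"date": pd.date_range("2022-01-01", periods=5, freq="D"), "f": range(5)},
    )
    X.ww.init()
    y = pd.Series(range(5))
    time_index = get_time_index(X, y, time_index_name="date")
    assert isinstance(time_index, pd.DatetimeIndex)
    return time_index
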
def validate_holdout_datasets(X, X_train, pipeline_params):
    """Validate the holdout datasets match our expectations.

    This function is run before calling predict in a time series pipeline. It verifies that X (the holdout set) starts gap units after the training set and that the length of X is less than or equal to the forecast_horizon.

    Args:
        X (pd.DataFrame): Data of shape [n_samples, n_features].
        X_train (pd.DataFrame): Training data.
        pipeline_params (dict): Dictionary of time series parameters with gap, forecast_horizon, and time_index being required.

    Returns:
        TSHoldoutValidationResult - named tuple with three fields
            is_valid (bool): True if holdout data is valid.
            error_messages (list): List of error messages to display. Empty if is_valid.
            error_codes (list): List of error codes to display. Empty if is_valid.
    """
    forecast_horizon = pipeline_params["forecast_horizon"]
    gap = pipeline_params["gap"]
    time_index = pipeline_params["time_index"]
    right_length = len(X) <= forecast_horizon
    X_separated_by_gap = are_datasets_separated_by_gap_time_index(
        X_train,
        X,
        pipeline_params,
    )
    errors = []
    error_msg = []
    if not right_length:
        errors.append(ValidationErrorCode.INVALID_HOLDOUT_LENGTH)
        error_msg.append(
            f"Holdout data X must have {forecast_horizon} rows (value of forecast horizon). "
            f"Data received - Length X: {len(X)}",
        )
    if not X_separated_by_gap:
        errors.append(ValidationErrorCode.INVALID_HOLDOUT_GAP_SEPARATION)
        error_msg.append(
            f"The first value indicated by the column {time_index} needs to start {gap + 1} "
            f"units ahead of the training data. "
            f"X value start: {X[time_index].iloc[0]}, X_train value end {X_train[time_index].iloc[-1]}.",
        )
    return _holdout_validation_result(not errors, error_msg, errors)

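# Editor-added usage sketch (hypothetical daily data): the holdout X must be no longer
# than forecast_horizon rows and must start gap + 1 periods after the end of X_train.
def _example_validate_holdout_datasets():
    dates = pd.date_range("2022-01-01", periods=13, freq="D")
    X_train = pd.DataFrame({"date": dates[:10]})
    X_holdout = pd.DataFrame({"date": dates[11:]})  # 2 rows, starting 2 days after training ends
    params = {"gap": 1, "forecast_horizon": 2, "time_index": "date"}
    result = validate_holdout_datasets(X_holdout, X_train, params)
    assert result.is_valid and not result.error_messages
    return result
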