Source code for evalml.pipelines.time_series_pipeline_base
"""Pipeline base class for time-series problems."""importpandasaspdimportwoodworkaswwfromevalml.pipelinesimportPipelineBasefromevalml.pipelines.pipeline_metaimportPipelineBaseMetafromevalml.utilsimportinfer_feature_typesfromevalml.utils.gen_utilsimportare_datasets_separated_by_gap_time_index
class TimeSeriesPipelineBase(PipelineBase, metaclass=PipelineBaseMeta):
    """Pipeline base class for time series problems.

    Args:
        component_graph (ComponentGraph, list, dict): ComponentGraph instance, list of components in order, or dictionary of components.
            Accepts strings or ComponentBase subclasses in the list.
            Note that when duplicate components are specified in a list, the duplicate component names will be modified with the
            component's index in the list. For example, the component graph
            [Imputer, One Hot Encoder, Imputer, Logistic Regression Classifier] will have names
            ["Imputer", "One Hot Encoder", "Imputer_2", "Logistic Regression Classifier"]
        parameters (dict): Dictionary with component names as keys and dictionary of that component's parameters as values.
            An empty dictionary {} implies using all default values for component parameters. Pipeline-level
            parameters such as time_index, gap, and max_delay must be specified with the "pipeline" key. For example:
            Pipeline(parameters={"pipeline": {"time_index": "Date", "max_delay": 4, "gap": 2}}).
        random_seed (int): Seed for the random number generator. Defaults to 0.
    """

    def __init__(
        self,
        component_graph,
        parameters=None,
        custom_name=None,
        random_seed=0,
    ):
        if not parameters or "pipeline" not in parameters:
            raise ValueError(
                "time_index, gap, max_delay, and forecast_horizon parameters cannot be omitted from the parameters dict. "
                "Please specify them as a dictionary with the key 'pipeline'.",
            )
        self.pipeline_params = parameters["pipeline"]
        self.gap = self.pipeline_params["gap"]
        self.max_delay = self.pipeline_params["max_delay"]
        self.forecast_horizon = self.pipeline_params["forecast_horizon"]
        self.time_index = self.pipeline_params["time_index"]
        # Frequency of the time index is not known yet; filled in later
        # (None until then).
        self.frequency = None
        if self.time_index is None:
            raise ValueError("Parameter time_index cannot be None!")
        super().__init__(
            component_graph,
            custom_name=custom_name,
            parameters=parameters,
            random_seed=random_seed,
        )
        datetime_featurizer_included = (
            "DateTime Featurizer" in self.component_graph.compute_order
            or "Not Known In Advance Pipeline - DateTime Featurizer"
            in self.component_graph.compute_order
        )
        time_series_featurizer_included = (
            "Time Series Featurizer" in self.component_graph.compute_order
            or "Not Known In Advance Pipeline - Time Series Featurizer"
            in self.component_graph.compute_order
        )
        time_series_native_estimators = [
            "ARIMA Regressor",
            "Prophet Regressor",
            "VARMAX Regressor",
        ]
        # Featurization can be skipped when neither featurizer is in the
        # component graph and there is an estimator to consume raw features.
        self.should_skip_featurization = (
            not datetime_featurizer_included
            and not time_series_featurizer_included
            and self.estimator is not None
        )
        # Estimators that natively consume a time index keep the column;
        # all others have the time index column dropped before prediction.
        self.should_drop_time_index = (
            self.should_skip_featurization
            and self.estimator.name not in time_series_native_estimators
        )

    @staticmethod
    def _convert_to_woodwork(X, y):
        """Infer woodwork feature types for X and y, defaulting a missing X to an empty frame."""
        if X is None:
            X = pd.DataFrame()
        X = infer_feature_types(X)
        y = infer_feature_types(y)
        return X, y

    @staticmethod
    def _move_index_forward(index, gap):
        """Fill in the index of the gap features and values with the right values."""
        if isinstance(index, (pd.DatetimeIndex, pd.PeriodIndex, pd.TimedeltaIndex)):
            # Time-based indices move forward by whole periods.
            return index.shift(gap)
        else:
            return index + gap

    def _add_training_data_to_X_Y(self, X, y, X_train, y_train):
        """Append the training data to the holdout data.

        Need to do this so that we have all the data we need to compute lagged
        features on the holdout set.
        """
        last_row_of_training = self.forecast_horizon + self.max_delay + self.gap
        gap_features = pd.DataFrame()
        gap_target = pd.Series() if isinstance(y, pd.Series) else pd.DataFrame()
        if (
            are_datasets_separated_by_gap_time_index(
                X_train,
                X,
                self.pipeline_params,
                self.frequency,
            )
            and self.gap
        ):
            # The training data does not have the gap dates so don't need to include them
            last_row_of_training -= self.gap

            # Instead, we'll create some dummy data to represent the missing gap dates
            # These do not show up in the features used for prediction
            gap_features = X_train.iloc[[-1] * self.gap]
            gap_features.index = self._move_index_forward(
                X_train.index[-self.gap :],
                self.gap,
            )
            gap_target = y_train.iloc[[-1] * self.gap]
            gap_target.index = self._move_index_forward(
                y_train.index[-self.gap :],
                self.gap,
            )

            # Properly fill in the dates in the gap
            time_index = self.pipeline_params["time_index"]
            correct_range = pd.date_range(
                start=X_train[time_index].iloc[-1],
                periods=self.gap + 1,
                freq=self.frequency,
            )[1:]
            gap_features[time_index] = correct_range

        features_to_concat = [
            X_train.iloc[-last_row_of_training:],
            gap_features,
            X,
        ]
        targets_to_concat = [
            y_train.iloc[-last_row_of_training:],
            gap_target,
            y,
        ]
        # ``fillna(method="ffill")`` is deprecated in modern pandas;
        # ``ffill()`` is the exact equivalent.
        padded_features = pd.concat(features_to_concat, axis=0).ffill()
        padded_target = pd.concat(targets_to_concat, axis=0).ffill()
        padded_features.ww.init(schema=X_train.ww.schema)
        if isinstance(padded_target, pd.Series):
            padded_target = ww.init_series(
                padded_target,
                logical_type=y_train.ww.logical_type,
            )
        else:
            # Multiseries case
            padded_target.ww.init(schema=y_train.ww.schema)
        return padded_features, padded_target

    def _drop_time_index(self, X, y):
        """Helper method to drop the time index column from the data if DateTime Featurizer is not present."""
        if self.should_drop_time_index and self.time_index in X.columns:
            index_name = X.index.name
            time_index = pd.DatetimeIndex(X[self.time_index], freq=self.frequency)

            y_schema = y.ww.schema
            y = y.set_axis(time_index)
            y.ww.init(schema=y_schema)

            if X.ww.schema is not None:
                X = X.ww.copy()
                X.ww.set_time_index(None)
                X.ww.set_index(self.time_index)
                X = X.ww.drop(self.time_index)
                X.index.freq = time_index.freq
            else:
                # BUG FIX: ``DataFrame.set_index`` returns a new frame; the
                # original code discarded the result, so the datetime index
                # was never actually applied. Assign it back.
                X = X.set_index(time_index)
                X = X.drop(self.time_index, axis=1)
            X.index.name = index_name
            y.index.name = index_name
        return X, y
def transform_all_but_final(
    self,
    X,
    y=None,
    X_train=None,
    y_train=None,
    calculating_residuals=False,
):
    """Transforms the data by applying all pre-processing components.

    Args:
        X (pd.DataFrame): Input data to the pipeline to transform.
        y (pd.Series): Targets corresponding to the pipeline targets.
        X_train (pd.DataFrame): Training data used to generate features from past observations.
        y_train (pd.Series): Training targets used to generate features from past observations.
        calculating_residuals (bool): Whether we're calling predict_in_sample to calculate the residuals. This means
            the X and y arguments are not future data, but actually the train data.

    Returns:
        pd.DataFrame: New transformed features.
    """
    if y_train is None:
        y_train = pd.Series()
    X_train, y_train = self._convert_to_woodwork(X_train, y_train)
    X, y = self._convert_to_woodwork(X, y)

    no_training_data = X_train.empty or y_train.empty
    if no_training_data or self.should_skip_featurization or calculating_residuals:
        # Nothing to pad with (or padding is unnecessary): transform as-is.
        return super().transform_all_but_final(X, y)

    # Prepend training rows so lagged/delayed features can be computed for
    # the holdout set, then slice the holdout's rows back out at the end.
    padded_features, padded_target = self._add_training_data_to_X_Y(
        X,
        y,
        X_train,
        y_train,
    )
    all_features = super().transform_all_but_final(padded_features, padded_target)
    return all_features.ww.iloc[-len(y):]
def predict_in_sample(
    self,
    X,
    y,
    X_train,
    y_train,
    objective=None,
    calculating_residuals=False,
):
    """Predict on future data where the target is known, e.g. cross validation.

    Args:
        X (pd.DataFrame or np.ndarray): Future data of shape [n_samples, n_features]
        y (pd.Series, np.ndarray): Future target of shape [n_samples]
        X_train (pd.DataFrame, np.ndarray): Data the pipeline was trained on of shape [n_samples_train, n_features]
        y_train (pd.Series, np.ndarray): Targets used to train the pipeline of shape [n_samples_train]
        objective (ObjectiveBase, str, None): Objective used to threshold predicted probabilities, optional.
        calculating_residuals (bool): Whether we're calling predict_in_sample to calculate the residuals. This means
            the X and y arguments are not future data, but actually the train data.

    Returns:
        pd.Series: Estimated labels.

    Raises:
        ValueError: If final component is not an Estimator.
    """
    if self.estimator is None:
        raise ValueError(
            "Cannot call predict_in_sample() on a component graph because the final component is not an Estimator.",
        )
    # Align the holdout data with the training data before featurizing.
    X, y = self._drop_time_index(X, y)
    X_train, y_train = self._drop_time_index(X_train, y_train)
    X, y = self._ensure_correct_indices(X, y, X_train)

    target = infer_feature_types(y)
    features = self.transform_all_but_final(
        X,
        target,
        X_train,
        y_train,
        calculating_residuals=calculating_residuals,
    )
    predictions = self._estimator_predict(features)
    if isinstance(predictions, pd.Series):
        predictions = predictions.rename(self.input_target_name)
    elif isinstance(predictions, pd.DataFrame):
        # Multiseries: map estimator output columns back onto target names.
        predictions = predictions.ww.rename(
            dict(zip(predictions.columns, y.columns)),
        )
    if len(predictions) == len(y):
        predictions.index = y.index
    predictions = self.inverse_transform(predictions)
    if isinstance(predictions, pd.Series):
        predictions = predictions.rename(self.input_target_name)
    return infer_feature_types(predictions)
def_ensure_correct_indices(self,X,y,X_train):"""Ensures that X and y holdout's indices are the correct integer or time units w.r.t the training data. For predict in sample where the holdout is known to follow the training data. """ifX_train.index.is_numeric():starting_index=X_train.index[-1]+1+self.gapcorrect_index=range(starting_index,starting_index+len(y))X.index=correct_indexy.index=correct_indexreturnX,ydef_create_empty_series(self,y_train,size):returnww.init_series(pd.Series([y_train.iloc[0]]*size,name=y_train.name),logical_type=y_train.ww.logical_type,)
def predict(self, X, objective=None, X_train=None, y_train=None):
    """Predict on future data where target is not known.

    Args:
        X (pd.DataFrame, or np.ndarray): Data of shape [n_samples, n_features].
        objective (Object or string): The objective to use to make predictions.
        X_train (pd.DataFrame or np.ndarray or None): Training data.
        y_train (pd.Series or None): Training labels.

    Raises:
        ValueError: If X_train and/or y_train are None or if final component is not an Estimator.

    Returns:
        Predictions.
    """
    if X_train is None:
        raise ValueError(
            "Make sure to include an input for X_train when calling time series' predict",
        )
    if y_train is None:
        raise ValueError(
            "Make sure to include an input for y_train when calling time series' predict",
        )
    if self.estimator is None:
        raise ValueError(
            "Cannot call predict() on a component graph because the final component is not an Estimator.",
        )
    X = infer_feature_types(X)
    # Position the holdout rows right after the training data plus the gap.
    X.index = self._move_index_forward(
        X_train.index[-X.shape[0]:],
        self.gap + X.shape[0],
    )
    # The real target is unknown here; a zero placeholder satisfies
    # ``_drop_time_index``'s signature and is discarded afterwards.
    X, _ = self._drop_time_index(X, pd.Series([0] * len(X)))
    X_train, y_train = self._drop_time_index(X_train, y_train)
    X_train, y_train = self._convert_to_woodwork(X_train, y_train)
    y_holdout = self._create_empty_series(y_train, X.shape[0])
    y_holdout = infer_feature_types(y_holdout)
    y_holdout.index = X.index
    return self.predict_in_sample(
        X,
        y_holdout,
        X_train,
        y_train,
        objective=objective,
    )
def_estimator_predict(self,features):"""Get estimator predictions. This helper passes y as an argument if needed by the estimator. """returnself.estimator.predict(features)
def dates_needed_for_prediction(self, date):
    """Return dates needed to forecast the given date in the future.

    Args:
        date (pd.Timestamp): Date to forecast in the future.

    Returns:
        dates_needed (tuple(pd.Timestamp)): Range of dates needed to forecast the given date.
    """
    # Periods of history required before ``date``:
    #   forecast_horizon - rows predicted at once
    #   max_delay        - start delay for featurization
    #   2 * gap          - the actual gap from the end date, plus another gap
    #                      to ensure training data is greater than gap
    #   1                - for the + 1 in the time series featurizer
    periods_back = self.forecast_horizon + self.max_delay + 2 * self.gap + 1
    window_start = date - pd.tseries.frequencies.to_offset(
        f"{periods_back}{self.frequency}",
    )
    window_end = date - pd.tseries.frequencies.to_offset(
        f"{1 + self.gap}{self.frequency}",
    )
    return (window_start, window_end)
def dates_needed_for_prediction_range(self, start_date, end_date):
    """Return dates needed to forecast the given date in the future.

    Args:
        start_date (pd.Timestamp): Start date of range to forecast in the future.
        end_date (pd.Timestamp): End date of range to forecast in the future.

    Returns:
        dates_needed (tuple(pd.Timestamp)): Range of dates needed to forecast the given date.

    Raises:
        ValueError: If start_date doesn't come before end_date
    """
    if start_date > end_date:
        raise ValueError("`start_date` must come before `end_date`.")
    # The earliest history needed for the first forecast and the latest
    # history needed for the last forecast bound the whole range.
    earliest_needed, _ = self.dates_needed_for_prediction(start_date)
    _, latest_needed = self.dates_needed_for_prediction(end_date)
    return (earliest_needed, latest_needed)