Source code for evalml.pipelines.multiseries_regression_pipeline

"""Pipeline base class for time series regression problems."""

import pandas as pd
from woodwork.statistics_utils import infer_frequency

from evalml.pipelines.time_series_regression_pipeline import (
    TimeSeriesRegressionPipeline,
)
from evalml.problem_types import ProblemTypes
from evalml.utils import infer_feature_types


class MultiseriesRegressionPipeline(TimeSeriesRegressionPipeline):
    """Pipeline base class for multiseries time series regression problems.

    Args:
        component_graph (ComponentGraph, list, dict): ComponentGraph instance, list of components in order, or dictionary of components.
        parameters (dict): Dictionary with component names as keys and dictionary of
            that component's parameters as values. An empty dictionary {} implies
            using all default values for component parameters. Pipeline-level parameters
            such as time_index, gap, and max_delay must be specified with the "pipeline" key.
            For example: Pipeline(parameters={"pipeline": {"time_index": "Date", "max_delay": 4, "gap": 2}}).
        custom_name (str): Custom name for the pipeline. Defaults to None.
        random_seed (int): Seed for the random number generator. Defaults to 0.
    """

    problem_type = ProblemTypes.MULTISERIES_TIME_SERIES_REGRESSION
    """ProblemTypes.MULTISERIES_TIME_SERIES_REGRESSION"""

    def __init__(
        self,
        component_graph,
        parameters=None,
        custom_name=None,
        random_seed=0,
    ):
        # Pipeline-level parameters (under the "pipeline" key) are mandatory
        # for multiseries problems, so validate their presence up front.
        pipeline_params_present = bool(parameters) and "pipeline" in parameters
        if not pipeline_params_present:
            raise ValueError(
                "time_index, gap, max_delay, and forecast_horizon parameters cannot be omitted from the parameters dict. "
                "Please specify them as a dictionary with the key 'pipeline'.",
            )
        if "series_id" not in parameters["pipeline"]:
            raise ValueError(
                "series_id must be defined for multiseries time series pipelines. Please specify it as a key in the pipeline "
                "parameters dict.",
            )
        # Column that distinguishes the individual series in stacked data.
        self.series_id = parameters["pipeline"]["series_id"]
        super().__init__(
            component_graph,
            custom_name=custom_name,
            parameters=parameters,
            random_seed=random_seed,
        )
[docs] def fit(self, X, y): """Fit a multiseries time series pipeline. Args: X (pd.DataFrame): The input training data of shape [n_samples, n_features]. y (pd.Series): The target training targets of length [n_samples*n_series]. Returns: self Raises: ValueError: If the target is not numeric. """ self._fit(X, y) return self
def _fit(self, X, y): from evalml.pipelines.utils import unstack_multiseries self.input_target_name = y.name X_unstacked, y_unstacked = unstack_multiseries( X, y, self.series_id, self.time_index, self.input_target_name, ) self.frequency = infer_frequency(X_unstacked[self.time_index]) self.component_graph.fit(X_unstacked, y_unstacked) self.input_feature_names = self.component_graph.input_feature_names self.series_id_target_names = y_unstacked.columns
    def predict_in_sample(
        self,
        X,
        y,
        X_train,
        y_train,
        objective=None,
        calculating_residuals=False,
        include_series_id=False,
    ):
        """Predict on future data where the target is known, e.g. cross validation.

        Args:
            X (pd.DataFrame or np.ndarray): Future data of shape [n_samples, n_features]
            y (pd.Series, np.ndarray): Future target of shape [n_samples]
            X_train (pd.DataFrame, np.ndarray): Data the pipeline was trained on of shape [n_samples_train, n_feautures]
            y_train (pd.Series, np.ndarray): Targets used to train the pipeline of shape [n_samples_train]
            objective (ObjectiveBase, str, None): Objective used to threshold predicted probabilities, optional.
            calculating_residuals (bool): Whether we're calling predict_in_sample to calculate the residuals.  This means
                the X and y arguments are not future data, but actually the train data.
            include_series_id (bool): If true, include the series ID value in the prediction results

        Returns:
            pd.Series: Estimated labels.

        Raises:
            ValueError: If final component is not an Estimator.
        """
        # Imported locally to avoid a circular import with evalml.pipelines.utils.
        from evalml.pipelines.utils import stack_data, unstack_multiseries

        # Pivot both the prediction window and the training data from stacked
        # (long) format into wide format, one column per series.
        X_unstacked, y_unstacked = unstack_multiseries(
            X,
            y,
            self.series_id,
            self.time_index,
            self.input_target_name,
        )
        X_train_unstacked, y_train_unstacked = unstack_multiseries(
            X_train,
            y_train,
            self.series_id,
            self.time_index,
            self.input_target_name,
        )

        # Order series columns to be same as expected input feature names
        # and filter to only include features in `X_unstacked`.
        input_features = list(self.input_feature_names.values())[0]
        X_unstacked = X_unstacked[
            [feature for feature in input_features if feature in X_unstacked.columns]
        ]
        X_train_unstacked = X_train_unstacked[
            [
                feature
                for feature in input_features
                if feature in X_train_unstacked.columns
            ]
        ]
        # Keep only the target columns (series) present in both the fitted
        # pipeline and the provided data, preserving the fitted ordering.
        y_overlapping_features = [
            feature
            for feature in self.series_id_target_names
            if feature in y_unstacked.columns
        ]
        y_unstacked = y_unstacked[y_overlapping_features]
        y_train_unstacked = y_train_unstacked[y_overlapping_features]

        # Re-infer woodwork types since unstacking/filtering drops them.
        X_train_unstacked = infer_feature_types(X_train_unstacked)
        y_train_unstacked = infer_feature_types(y_train_unstacked)
        X_unstacked = infer_feature_types(X_unstacked)
        y_unstacked = infer_feature_types(y_unstacked)

        # Delegate the actual prediction to the single-series base pipeline;
        # the result is wide, one prediction column per series.
        unstacked_predictions = super().predict_in_sample(
            X_unstacked,
            y_unstacked,
            X_train_unstacked,
            y_train_unstacked,
            objective,
            calculating_residuals,
        )
        unstacked_predictions = unstacked_predictions[
            [
                series_id_target
                for series_id_target in self.series_id_target_names
                if series_id_target in unstacked_predictions.columns
            ]
        ]

        # Add `time_index` column to index for generating stacked datetime column in `stack_data()`
        unstacked_predictions.index = X_unstacked[self.time_index]
        stacked_predictions = stack_data(
            unstacked_predictions,
            include_series_id=True,
            series_id_name=self.series_id,
        )
        # Move datetime index into separate date column to use when merging later
        stacked_predictions = stacked_predictions.reset_index(drop=False)

        # Cast merge keys and target back to the input dtypes so the merge
        # below matches rows correctly.
        sp_dtypes = {
            self.time_index: X[self.time_index].dtype,
            self.series_id: X[self.series_id].dtype,
            self.input_target_name: y.dtype,
        }
        stacked_predictions = stacked_predictions.astype(sp_dtypes)

        # Order prediction based on input (date, series_id)
        output_cols = (
            [self.series_id, self.input_target_name]
            if include_series_id
            else [self.input_target_name]
        )
        stacked_predictions = pd.merge(
            X,
            stacked_predictions,
            on=[self.time_index, self.series_id],
        )[output_cols]

        # Index will start at the unstacked index, so we need to reset it to
        # the original index.
        stacked_predictions.index = X.index
        if not include_series_id:
            return infer_feature_types(stacked_predictions[self.input_target_name])
        else:
            return infer_feature_types(stacked_predictions)
[docs] def get_forecast_period(self, X): """Generates all possible forecasting time points based on latest data point in X. For the multiseries case, each time stamp is duplicated for each unique value in `X`'s `series_id` column. Input data must be stacked in order to properly generate unique periods. Args: X (pd.DataFrame, np.ndarray): Stacked data the pipeline was trained on of shape [n_samples_train * n_series_ids, n_features]. Raises: ValueError: If pipeline is not trained. Returns: pd.DataFrame: Dataframe containing a column with datetime periods from `gap` to `forecast_horizon + gap` per unique `series_id` value. """ dates = super().get_forecast_period(X) dates.name = self.time_index series_id_values = X[self.series_id].unique() new_period_df = dates.to_frame().merge( pd.Series(series_id_values, name=self.series_id), how="cross", ) # Generate new numeric index start_idx = dates.index[0] + (self.gap * len(series_id_values)) num_idx = pd.Series(range(start_idx, start_idx + len(new_period_df))) new_period_df.index = num_idx return new_period_df