Source code for evalml.pipelines.components.transformers.preprocessing.stl_decomposer

"""Component that removes trends and seasonality from time series using STL."""
from __future__ import annotations

import logging
from typing import Union

import pandas as pd
from pandas import RangeIndex
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.forecasting.stl import STLForecast
from statsmodels.tsa.seasonal import STL

from evalml.pipelines.components.transformers.preprocessing.decomposer import Decomposer
from evalml.utils import infer_feature_types


[docs]class STLDecomposer(Decomposer): """Removes trends and seasonality from time series using the STL algorithm. https://www.statsmodels.org/dev/generated/statsmodels.tsa.seasonal.STL.html Args: time_index (str): Specifies the name of the column in X that provides the datetime objects. Defaults to None. series_id (str): Specifies the name of the column in X that provides the series_id objects for multiseries. Defaults to None. degree (int): Not currently used. STL 3x "degree-like" values. None are able to be set at this time. Defaults to 1. period (int): The number of entries in the time series data that corresponds to one period of a cyclic signal. For instance, if data is known to possess a weekly seasonal signal, and if the data is daily data, the period should likely be 7. For daily data with a yearly seasonal signal, the period should likely be 365. If None, statsmodels will infer the period based on the frequency. Defaults to None. seasonal_smoother (int): The length of the seasonal smoother used by the underlying STL algorithm. For compatibility, must be odd. If an even number is provided, the next, highest odd number will be used. Defaults to 7. random_seed (int): Seed for the random number generator. Defaults to 0. """ name = "STL Decomposer" hyperparameter_ranges = {} modifies_features = False modifies_target = True def __init__( self, time_index: str = None, series_id: str = None, degree: int = 1, # Currently unused. period: int = None, periods: dict = None, seasonal_smoother: int = 7, random_seed: int = 0, **kwargs, ): self.logger = logging.getLogger(__name__) self.series_id = series_id self.periods = periods # Programmatically adjust seasonal_smoother to fit underlying STL requirements, # that seasonal_smoother must be odd. if seasonal_smoother % 2 == 0: self.logger.warning( f"STLDecomposer provided with an even period of {seasonal_smoother}" f"Changing seasonal period to {seasonal_smoother+1}", ) seasonal_smoother += 1 self.forecast_summary = None parameters = { "degree": degree, "period": period, "periods": periods, "seasonal_smoother": seasonal_smoother, "time_index": time_index, "series_id": series_id, } parameters.update(kwargs) super().__init__( component_obj=None, random_seed=random_seed, **parameters, **kwargs, ) def _project_trend(self, y, trend, period): """Function to project the in-sample trend into the future.""" self._check_oos_past(y) index = self._choose_proper_index(y) # Determine how many units forward to project by finding the difference, # in index values, between the requested target and the fit data. if isinstance(y.index, pd.DatetimeIndex): units_forward = ( len( pd.date_range( start=trend.index[-1], end=y.index[-1], freq=self.frequency, ), ) - 1 ) elif isinstance(y.index, RangeIndex) or y.index.is_numeric(): units_forward = int(y.index[-1] - index[-1]) # Model the trend and project it forward stlf = STLForecast( trend, ARIMA, model_kwargs=dict(order=(1, 1, 0), trend="t"), period=period, ) stlf = stlf.fit() forecast = stlf.forecast(units_forward) # Store forecast summary for use in calculating trend prediction intervals. self.forecast_summary = stlf.get_prediction( len(trend), len(trend) + units_forward - 1, ) # Handle out-of-sample forecasts. The forecast will have additional data # between the end of the in-sample data and the beginning of the # requested out-of-sample data to inverse transform. overlapping_ind = [ind for ind in y.index if ind in forecast.index] if len(overlapping_ind) > 0: return forecast[overlapping_ind] # This branch handles the cross-validation cases where the indices are # integer indices and we know the forecast length will match the requested # transform data length. else: fore = forecast[-len(y) :] fore.index = y.index return fore def _project_trend_and_seasonality(self, y, trend, seasonality, periodicity): """Function to project both trend and seasonality forward into the future.""" projected_trend = self._project_trend(y, trend, periodicity) projected_seasonality = self._project_seasonal( y, seasonality, periodicity, self.frequency, ) return projected_trend, projected_seasonality
[docs] def fit( self, X: pd.DataFrame, y: Union[pd.Series, pd.DataFrame] = None, ) -> STLDecomposer: """Fits the STLDecomposer and determine the seasonal signal. Instantiates a statsmodels STL decompose object with the component's stored parameters and fits it. Since the statsmodels object does not fit the sklearn api, it is not saved during __init__() in _component_obj and will be re-instantiated each time fit is called. To emulate the sklearn API, when the STL decomposer is fit, the full seasonal component, a single period sample of the seasonal component, the full trend-cycle component and the residual are saved. y(t) = S(t) + T(t) + R(t) Args: X (pd.DataFrame, optional): Conditionally used to build datetime index. y (pd.Series or pd.DataFrame): Target variable to detrend and deseasonalize. Returns: self Raises: ValueError: If y is None. ValueError: If target data doesn't have DatetimeIndex AND no Datetime features in features data """ from evalml.pipelines.utils import unstack_multiseries # Warn for poor decomposition use with higher seasonal smoothers if self.seasonal_smoother > 14: self.logger.warning( f"STLDecomposer may perform poorly on data with a high seasonal smoother ({self.seasonal_smoother}).", ) # If y is a stacked pd.Series, unstack it if self.series_id is not None and isinstance(y, pd.Series): X, y = unstack_multiseries(X, y, self.series_id, self.time_index, y.name) if isinstance(y, pd.Series): y = y.to_frame() self.original_index = y.index if y is not None else None X, y = self._check_target(X, y) self._map_dt_to_integer(self.original_index, y.index) # Save the frequency of the fitted series for checking against transform data. self.frequency = y.index.freqstr or pd.infer_freq(y.index) # Iterate through each id group self.seasonals = {} self.seasonalities = {} self.trends = {} self.residuals = {} if self.periods is None: self.periods = {} for id in y.columns: series_y = y[id] # Determine the period of the seasonal component if id not in self.periods: # If the user provides a period for single series, use that period = ( self.period if len(y.columns) == 1 and self.period is not None else self.determine_periodicity(X, series_y) ) self.periods[id] = period stl = STL( series_y, seasonal=self.seasonal_smoother, period=self.periods[id], ) res = stl.fit() self.seasonals[id] = res.seasonal self.periods[id] = stl.period dist = len(series_y) % stl.period seasonality = ( res.seasonal[-(dist + stl.period) : -dist] if dist > 0 else res.seasonal[-stl.period :] ) self.seasonalities[id] = seasonality self.trends[id] = res.trend self.residuals[id] = res.resid self.update_parameters({"periods": self.periods}) return self
[docs] def transform( self, X: pd.DataFrame, y: Union[pd.Series, pd.DataFrame] = None, ) -> Union[tuple[pd.DataFrame, pd.Series], tuple[pd.DataFrame, pd.DataFrame]]: """Transforms the target data by removing the STL trend and seasonality. Uses an ARIMA model to project forward the addititve trend and removes it. Then, utilizes the first period's worth of seasonal data determined in the .fit() function to extrapolate the seasonal signal of the data to be transformed. This seasonal signal is also assumed to be additive and is removed. Args: X (pd.DataFrame, optional): Conditionally used to build datetime index. y (pd.Series or pd.DataFrame): Target variable to detrend and deseasonalize. Returns: (Single series) pd.DataFrame, pd.Series: The list of input features are returned without modification. The target variable y is detrended and deseasonalized. (Multi series) pd.DataFrame, pd.DataFrame: The list of input features are returned without modification. The target variable y is detrended and deseasonalized. Raises: ValueError: If target data doesn't have DatetimeIndex AND no Datetime features in features data """ from evalml.pipelines.utils import unstack_multiseries if y is None: return X, y # If y is a stacked pd.Series, unstack it if self.series_id is not None and isinstance(y, pd.Series): X, y = unstack_multiseries(X, y, self.series_id, self.time_index, y.name) if isinstance(y, pd.Series): y = y.to_frame() original_index = y.index X, y = self._check_target(X, y) self._check_oos_past(y) detrending_list = [] # Iterate through each id group for id in y.columns: series_y = y[id] if len(y.columns) > 1: seasonality = self.seasonalities[id] trend = self.trends[id] residual = self.residuals[id] period = self.periods[id] else: seasonality = list(self.seasonalities.values())[0] trend = list(self.trends.values())[0] residual = list(self.residuals.values())[0] period = list(self.periods.values())[0] y_in_sample = pd.Series([]) y_out_of_sample = pd.Series([]) # For partially and wholly in-sample data, retrieve stored results. if trend.index[0] <= series_y.index[0] <= trend.index[-1]: y_in_sample = residual[series_y.index[0] : series_y.index[-1]] # For out of sample data.... if series_y.index[-1] > trend.index[-1]: try: # ...that is partially out of sample and partially in sample. truncated_y = series_y[ series_y.index.get_loc(trend.index[-1]) + 1 : ] except KeyError: # ...that is entirely out of sample. truncated_y = series_y ( projected_trend, projected_seasonality, ) = self._project_trend_and_seasonality( truncated_y, trend, seasonality, period, ) y_out_of_sample = infer_feature_types( pd.Series( truncated_y - projected_trend - projected_seasonality, index=truncated_y.index, ), ) y_t = pd.concat([y_in_sample, y_out_of_sample]) y_t.index = original_index # If it is a single series time series, return tuple[pd.DataFrame, pd.Series] if len(y.columns) <= 1: return X, y_t detrending_list.append(y_t) # Convert the list to a DataFrame # For multiseries, return tuple[pd.DataFrame, pd.Dataframe] where each column is a series_id detrending_df = pd.DataFrame(detrending_list).T detrending_df.columns = y.columns return X, detrending_df
[docs] def inverse_transform( self, y_t: Union[pd.Series, pd.DataFrame], ) -> Union[pd.Series, pd.DataFrame]: """Adds back fitted trend and seasonality to target variable. The STL trend is projected to cover the entire requested target range, then added back into the signal. Then, the seasonality is projected forward to and added back into the signal. Args: y_t (pd.Series or pd.DataFrame): Target variable. Returns: pd.Series or pd.DataFrame: The target variable y with the trend and seasonality added back in. Raises: ValueError: If y is None. """ if y_t is None: raise ValueError("y_t cannot be None for Decomposer!") original_index = y_t.index y_t = infer_feature_types(y_t) self._check_oos_past(y_t) if isinstance(y_t, pd.Series): y_t = y_t.to_frame() index = self._choose_proper_index(y_t) y = [] for id in y_t.columns: y_in_sample = pd.Series([]) y_out_of_sample = pd.Series([]) series_y = y_t[id] if len(y_t.columns) > 1: old_trend = self.trends[id] old_seasonal = self.seasonals[id] old_seasonality = self.seasonalities[id] period = self.periods[id] else: old_trend = list(self.trends.values())[0] old_seasonal = list(self.seasonals.values())[0] old_seasonality = list(self.seasonalities.values())[0] period = list(self.periods.values())[0] # For partially and wholly in-sample data, retrieve stored results. if index[0] <= series_y.index[0] <= index[-1]: left_index = series_y.index[0] right_index = ( series_y.index[-1] + 1 if isinstance(series_y.index, pd.RangeIndex) or series_y.index.is_numeric() else series_y.index[-1] + 1 * series_y.index.freq ) trend = ( old_trend.reset_index(drop=True)[left_index:right_index] if isinstance(series_y.index, pd.RangeIndex) or series_y.index.is_numeric() else old_trend[left_index:right_index] ) seasonal = ( old_seasonal.reset_index(drop=True)[left_index:right_index] if isinstance(series_y.index, pd.RangeIndex) or series_y.index.is_numeric() else old_seasonal[left_index:right_index] ) y_in_sample = series_y + trend + seasonal y_in_sample = y_in_sample.dropna() # For out of sample data.... if series_y.index[-1] > index[-1]: try: # ...that is partially out of sample and partially in sample. truncated_y_t = series_y[series_y.index.get_loc(index[-1]) + 1 :] except KeyError: # ...that is entirely out of sample. truncated_y_t = series_y ( projected_trend, projected_seasonality, ) = self._project_trend_and_seasonality( truncated_y_t, old_trend, old_seasonality, period, ) y_out_of_sample = infer_feature_types( pd.Series( truncated_y_t + projected_trend + projected_seasonality, index=truncated_y_t.index, ), ) y_series = pd.concat([y_in_sample, y_out_of_sample]) # If it is a single series time series, return tuple[pd.DataFrame, pd.Series] if len(y_t.columns) <= 1: y_series.index = original_index return y_series y.append(y_series) y_df = pd.DataFrame(y).T y_df.index = original_index y_df.columns = y_t.columns return y_df
[docs] def get_trend_dataframe(self, X, y): """Return a list of dataframes with 4 columns: signal, trend, seasonality, residual. Args: X (pd.DataFrame): Input data with time series data in index. y (pd.Series or pd.DataFrame): Target variable data provided as a Series for univariate problems or a DataFrame for multivariate problems. Returns: (Single series) list of pd.DataFrame: Each DataFrame contains the columns "signal", "trend", "seasonality" and "residual," with the latter 3 column values being the decomposed elements of the target data. The "signal" column is simply the input target signal but reindexed with a datetime index to match the input features. (Multi series) dictionary of lists: Series id maps to a list of pd.DataFrames that each contain the columns "signal", "trend", "seasonality" and "residual" Raises: TypeError: If X does not have time-series data in the index. ValueError: If time series index of X does not have an inferred frequency. ValueError: If the forecaster associated with the detrender has not been fit yet. TypeError: If y is not provided as a pandas Series or DataFrame. """ X = infer_feature_types(X) if not isinstance(X.index, pd.DatetimeIndex) and not isinstance( y.index, pd.DatetimeIndex, ): raise TypeError("Provided X or y should have datetimes in the index.") # Change the y index to a matching datetimeindex or else we get a failure # in ForecastingHorizon during decomposition. if not isinstance(y.index, pd.DatetimeIndex): y = self._set_time_index(X, y) if not isinstance(X.index, pd.DatetimeIndex): X.index = y.index self._check_oos_past(y) def _decompose_target(X, y, fh): """Function to generate a single DataFrame with trend, seasonality and residual components.""" if isinstance(y, pd.Series): y = y.to_frame() if all( len(y.index) == len(self.trends[id].index) and all( y.index == self.trends[id].index, ) for id in y.columns ): # TODO: Do a better job cloning. decomposer = STLDecomposer( seasonal_smoother=self.seasonal_smoother, periods=self.periods, ) decomposer.fit(X, y) trend = decomposer.trends seasonal = decomposer.seasonals residual = decomposer.residuals else: trend = self.trends seasonal = self.seasonals residual = self.residuals result_dict = {} for id in y.columns: df = pd.DataFrame( { "signal": y[id], "trend": trend[id], "seasonality": seasonal[id], "residual": residual[id], }, ) if len(y.columns) == 1: return [df] else: result_dict[id] = [df] return result_dict return _decompose_target(X, y, None)
[docs] def get_trend_prediction_intervals(self, y, coverage=None): """Calculate the prediction intervals for the trend data. Args: y (pd.Series or pd.DataFrame): Target data. coverage (list[float]): A list of floats between the values 0 and 1 that the upper and lower bounds of the prediction interval should be calculated for. Returns: (Single series) dict of pd.Series: Prediction intervals, keys are in the format {coverage}_lower or {coverage}_upper. (Multi series) dict of dict of pd.Series: Each series id maps to a dictionary of prediction intervals """ if isinstance(y, pd.Series): y = y.to_frame() if coverage is None: coverage = [0.95] self._check_oos_past(y) series_results = {} for id in y.columns: y_series = y[id] alphas = [1 - val for val in coverage] if len(y.columns) > 1: trend = self.trends[id] seasonality = self.seasonalities[id] period = self.periods[id] else: trend = list(self.trends.values())[0] seasonality = list(self.seasonalities.values())[0] period = list(self.periods.values())[0] if not self.forecast_summary or len(y_series) != len( self.forecast_summary.predicted_mean, ): self._project_trend_and_seasonality( y_series, trend, seasonality, period, ) prediction_interval_result = {} for i, alpha in enumerate(alphas): result = self.forecast_summary.summary_frame(alpha=alpha) overlapping_ind = [ind for ind in y_series.index if ind in result.index] intervals = pd.DataFrame( { "lower": result["mean_ci_lower"] - result["mean"], "upper": result["mean_ci_upper"] - result["mean"], }, ) if len(overlapping_ind) > 0: # y.index is datetime intervals = intervals.loc[overlapping_ind] else: # y.index is not datetime (e.g. int) intervals = intervals[-len(y_series) :] intervals.index = y_series.index prediction_interval_result[f"{coverage[i]}_lower"] = intervals["lower"] prediction_interval_result[f"{coverage[i]}_upper"] = intervals["upper"] series_results[id] = prediction_interval_result # only return the dictionary if single series if len(y.columns) <= 1: return prediction_interval_result return series_results