Source code for evalml.pipelines.components.transformers.preprocessing.decomposer
"""Component that removes trends from time series and returns the decomposed components."""from__future__importannotationsimportrefromabcimportabstractmethodfromtypingimportUnionimportmatplotlib.pyplotaspltimportnumpyasnpimportpandasaspdimportstatsmodels.apiassmfromscipy.signalimportargrelextremafromevalml.pipelines.components.transformers.transformerimportTransformerfromevalml.utilsimportget_time_index,infer_feature_types
class Decomposer(Transformer):
    """Component that removes trends and seasonality from time series and returns the decomposed components.

    Args:
        parameters (dict): Dictionary of parameters to pass to component object.
        component_obj (class) : Instance of a detrender/deseasonalizer class.
        random_seed (int): Seed for the random number generator. Defaults to 0.
        degree (int) : Currently the degree of the PolynomialDecomposer, not used for STLDecomposer.
        period (int) : The best guess, in units, for the period of the seasonal signal.
        seasonal_smoother (int): The seasonal smoothing parameter for STLDecomposer, not used for PolynomialDecomposer.
        time_index (str) : The column name of the feature matrix (X) that the datetime information should be pulled from.
    """

    name = "Decomposer"
    hyperparameter_ranges = None
    modifies_features = False
    modifies_target = True
    needs_fitting = True
    invalid_frequencies = []
    # Incompatibility: https://github.com/alteryx/evalml/issues/4103
    # TODO: Remove when support is added https://github.com/pandas-dev/pandas/issues/52127
    _integer_nullable_incompatibilities = ["y"]

    def __init__(
        self,
        component_obj=None,
        random_seed: int = 0,
        degree: int = 1,
        period: int = -1,
        seasonal_smoother: int = 7,
        time_index: str = None,
        **kwargs,
    ):
        # Integral floats (e.g. 3.0) are accepted and normalized to int; anything else raises.
        degree = self._raise_typeerror_if_not_int("degree", degree)
        self.seasonal_smoother = self._raise_typeerror_if_not_int(
            "seasonal_smoother",
            seasonal_smoother,
        )
        self.period = period
        self.time_index = time_index
        parameters = {
            "degree": degree,
            "period": period,
            "seasonal_smoother": self.seasonal_smoother,
            "time_index": time_index,
        }
        parameters.update(kwargs)
        super().__init__(
            parameters=parameters,
            component_obj=component_obj,
            random_seed=random_seed,
            **kwargs,
        )

    def _raise_typeerror_if_not_int(self, var_name: str, var_value):
        """Return ``var_value`` as an int, or raise if it is not integral.

        Args:
            var_name (str): Name of the parameter, used only in the error message.
            var_value: Value to validate. Ints are returned unchanged and floats
                with no fractional part are converted to int.

        Returns:
            int: The validated value.

        Raises:
            TypeError: If ``var_value`` is neither an int nor an integral float.
        """
        if isinstance(var_value, int):
            return var_value
        if isinstance(var_value, float) and var_value.is_integer():
            return int(var_value)
        raise TypeError(
            f"Parameter {var_name} must be an integer!: Received {type(var_value).__name__}",
        )

    def _set_time_index(self, X: pd.DataFrame, y: pd.Series):
        """Ensures that target data has a pandas.DatetimeIndex that matches feature data."""
        dt_df = infer_feature_types(X)
        # Prefer the attribute, falling back to the stored parameters when it is unset.
        time_index_name = self.time_index or self.parameters.get("time_index", None)
        time_index = get_time_index(dt_df, y, time_index_name)
        return y.set_axis(time_index)

    def fit_transform(
        self,
        X: pd.DataFrame,
        y: pd.Series = None,
    ) -> tuple[pd.DataFrame, pd.Series]:
        """Removes fitted trend and seasonality from target variable.

        Args:
            X (pd.DataFrame, optional): Ignored.
            y (pd.Series): Target variable to detrend and deseasonalize.

        Returns:
            tuple of pd.DataFrame, pd.Series: The first element are the input features returned without modification.
            The second element is the target variable y with the fitted trend removed.
        """
        return self.fit(X, y).transform(X, y)

    @classmethod
    def is_freq_valid(cls, freq: str):
        """Determines if the given string represents a valid frequency for this decomposer.

        The optional leading multiple and the optional anchor suffix are
        ignored, so ``"2W-SUN"`` is checked as ``"W"``. Lower-case aliases such
        as ``"min"`` or ``"ms"`` are parsed as well. A string that cannot be
        parsed at all is compared against ``invalid_frequencies`` as-is.

        Args:
            freq (str): A frequency to validate. See the pandas docs at https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases for options.

        Returns:
            boolean representing whether the frequency is valid or not.
        """
        match = re.match(r"(^\d+)?([A-Za-z]+)-?([A-Za-z]+)?", freq)
        # re.match returns None when no alias letters are found; do not
        # dereference it blindly.
        base_freq = match.group(2) if match is not None else freq
        return base_freq not in cls.invalid_frequencies

    @abstractmethod
    def get_trend_dataframe(self, y: pd.Series):
        """Return a list of dataframes, each with 3 columns: trend, seasonality, residual.

        NOTE(review): ``plot_decomposition`` in this class calls this method as
        ``get_trend_dataframe(X, y)`` and also reads a ``"signal"`` column from
        the result. Concrete implementations presumably accept the feature
        matrix first and return that extra column - confirm against subclasses.
        """

    @abstractmethod
    def inverse_transform(self, y: pd.Series):
        """Add the trend + seasonality back to y."""

    @classmethod
    def determine_periodicity(
        cls,
        X: pd.DataFrame,
        y: pd.Series,
        acf_threshold: float = 0.01,
        rel_max_order: int = 5,
    ):
        """Function that uses autocorrelative methods to determine the likely most significant period of the seasonal signal.

        Args:
            X (pandas.DataFrame): The feature data of the time series problem.
            y (pandas.Series): The target data of a time series problem.
            acf_threshold (float) : The threshold for the autocorrelation function to determine the period. Any values below
                the threshold are considered to be 0 and will not be considered for the period. Defaults to 0.01.
            rel_max_order (int) : The order of the relative maximum to determine the period. Defaults to 5.

        Returns:
            int: The integer number of entries in time series data over which the seasonal part of the target data
                repeats.  If the time series data is in days, then this is the number of days that it takes the target's
                seasonal signal to repeat. Note: the target data can contain multiple seasonal signals.  This function
                will only return the stronger.  E.g. if the target has both weekly and yearly seasonality, the function
                may return either "7" or "365", depending on which seasonality is more strongly autocorrelated. If no
                period is detected, returns None.
        """
        # Only need to handle nullable types on pandas < 2. Kept for backwards compatibility with pandas 1.x.
        if int(pd.__version__.split(".")[0]) < 2:
            X, y = cls._handle_nullable_types(cls, X, y)

        def _get_rel_max_from_acf(y):
            """Determines the relative maxima of the target's autocorrelation."""
            acf = sm.tsa.acf(y, nlags=max(400, len(y)))
            # Filter out small values to avoid picking up noise
            filter_acf = np.where(acf > acf_threshold, acf, 0)
            rel_max = argrelextrema(
                filter_acf,
                np.greater,
                order=rel_max_order,  # considers `order` points on either side to determine rel max
            )[0]
            if len(rel_max) == 0:
                return None
            # Rank candidate lags by the unfiltered autocorrelation value.
            return rel_max[np.argmax(acf[rel_max])]

        def _detrend_on_fly(y):
            """Uses a moving average to determine the target's trend and remove it."""
            # A larger moving average will be less likely to remove the seasonal signal
            # but we need to make sure we're not passing in a window that's larger than the data
            moving_avg = min(51, len(y) // 3)
            y_trend_estimate = y.rolling(moving_avg).mean().dropna()
            y_detrended = y - y_trend_estimate
            # round to 10 decimal places to avoid floating point errors
            return y_detrended.dropna().round(10)

        # Make the data more stationary by detrending
        y_detrended = _detrend_on_fly(y)
        return _get_rel_max_from_acf(y_detrended)

    def set_period(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        acf_threshold: float = 0.01,
        rel_max_order: int = 5,
    ):
        """Function to set the component's seasonal period based on the target's seasonality.

        Args:
            X (pandas.DataFrame): The feature data of the time series problem.
            y (pandas.Series): The target data of a time series problem.
            acf_threshold (float) : The threshold for the autocorrelation function to determine the period. Any values below
                the threshold are considered to be 0 and will not be considered for the period. Defaults to 0.01.
            rel_max_order (int) : The order of the relative maximum to determine the period. Defaults to 5.
        """
        self.period = self.determine_periodicity(X, y, acf_threshold, rel_max_order)
        self.update_parameters({"period": self.period})

    @staticmethod
    def _is_numeric_index(index: pd.Index) -> bool:
        """Return whether ``index`` is a RangeIndex or holds integer/floating labels.

        Uses ``Index.inferred_type`` rather than the deprecated
        ``Index.is_numeric`` so no FutureWarning is emitted on pandas 2.x.
        """
        return isinstance(index, pd.RangeIndex) or index.inferred_type in (
            "integer",
            "floating",
        )

    def _check_oos_past(self, y):
        """Function to check whether provided target data is out-of-sample and in the past.

        Raises:
            ValueError: If the first index value of ``y`` precedes the first
                index value recorded at fit time.
        """
        index = self._choose_proper_index(y)

        if y.index[0] < index[0]:
            raise ValueError(
                "STLDecomposer cannot transform/inverse transform data out of sample and before the data used "
                "to fit the decomposer."
                f"\nRequested range: {y.index[0]!s}:{y.index[-1]!s}."
                f"\nSample range: {index[0]!s}:{index[-1]!s}.",
            )

    def _map_dt_to_integer(self, original_index, dt_index):
        """Function to generate an initial mapping of integer indices to datetime indices.

        Stores the result on ``self.in_sample_integer_index`` and
        ``self.in_sample_datetime_index``.

        Raises:
            ValueError: If ``original_index`` is neither a DatetimeIndex nor numeric.
        """
        # Set an initial mapping of integers <-> datetimes at fit
        if isinstance(original_index, pd.DatetimeIndex):
            int_index = pd.RangeIndex(len(original_index))
        # Standardize the integer index as a RangeIndex and use existing integer indices
        elif self._is_numeric_index(original_index):
            int_index = pd.RangeIndex(
                start=original_index[0],
                stop=original_index[-1] + 1,
            )
        else:
            # Previously this fell through and failed later with an UnboundLocalError.
            raise ValueError(
                f"Decomposer doesn't support target data with index of type ({type(original_index)})",
            )
        assert isinstance(dt_index, pd.DatetimeIndex), "dt_index must be a DatetimeIndex"
        assert len(original_index) == len(
            dt_index,
        ), "original_index and dt_index must have the same length"
        self.in_sample_integer_index = int_index
        self.in_sample_datetime_index = dt_index

    def _int_to_dt(self, integer_index_value):
        """Function to convert an integer index value to a datetime value based on the mapping made during fit."""
        try:
            dt = self.in_sample_datetime_index[
                self.in_sample_integer_index.get_loc(integer_index_value)
            ]
        except KeyError:
            # Out-of-sample value: extrapolate from the last in-sample datetime
            # using the datetime index's frequency.
            more_than = integer_index_value - self.in_sample_integer_index[-1]
            dt = (
                self.in_sample_datetime_index.freq * more_than
                + self.in_sample_datetime_index[-1]
            )
        return dt

    def _convert_int_index_to_dt_index(self, integer_index):
        """Function to convert an entire index full of integers to datetimes."""
        dts = [self._int_to_dt(integer) for integer in integer_index]
        return pd.DatetimeIndex(dts, freq=self.frequency)

    def _choose_proper_index(self, y):
        """Function that provides support for targets with integer and datetime indices.

        Returns:
            The in-sample integer index when ``y`` has a numeric index, or the
            in-sample datetime index when ``y`` has a DatetimeIndex.

        Raises:
            ValueError: If the index of ``y`` is neither numeric nor datetime.
        """
        if self._is_numeric_index(y.index):
            return self.in_sample_integer_index
        if isinstance(y.index, pd.DatetimeIndex):
            return self.in_sample_datetime_index
        raise ValueError(
            f"Decomposer doesn't support target data with index of type ({type(y.index)})",
        )

    def _project_seasonal(
        self,
        y: pd.Series,
        periodic_signal: pd.Series,
        periodicity: int,
        frequency: str,
    ):
        """Projects the seasonal signal forward to cover the target data.

        Args:
            y (pandas.Series): Target data to be transformed
            periodic_signal (pandas.Series): Single period of the detected seasonal signal
            periodicity (int): Number of time units in a single cycle of the seasonal signal
            frequency (str): String representing the detected frequency of the time series data.
                Uses the same codes as the freqstr attribute of a pandas Series with DatetimeIndex.
                e.g. "D", "M", "Y" for day, month and year respectively
                See: https://pandas.pydata.org/docs/user_guide/timeseries.html#timeseries-offset-aliases
                See: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.to_timedelta.html

        Returns:
            pandas.Series: the seasonal signal extended to cover the target data to be transformed
        """
        index = self._choose_proper_index(y)

        # Determine where the seasonality starts
        if isinstance(y.index, pd.DatetimeIndex):
            transform_first_ind = (
                len(pd.date_range(start=index[0], end=y.index[0], freq=frequency))
                % periodicity
                - 1
            )
        else:
            # _choose_proper_index has already rejected every index that is
            # neither datetime nor numeric, so this branch is the numeric case.
            first_index_diff = y.index[0] - index[0]
            transform_first_ind = first_index_diff % periodicity

        # Cycle the sample of seasonal data so the transformed data's effective index is first
        rotated_seasonal_sample = np.roll(
            periodic_signal.T.values,
            -transform_first_ind,
        )

        # Repeat the single, rotated period of seasonal data to cover the entirety of the data
        # to be transformed. The extrapolated seasonal data will be too long, so truncate.
        seasonal = np.tile(rotated_seasonal_sample, len(y) // periodicity + 1).T[
            : len(y)
        ]

        # Add the date times back in.
        return pd.Series(seasonal, index=y.index)

    def plot_decomposition(
        self,
        X: pd.DataFrame,
        y: Union[pd.Series, pd.DataFrame],
        show: bool = False,
    ) -> Union[tuple[plt.Figure, list], dict[str, tuple[plt.Figure]]]:
        """Plots the decomposition of the target signal.

        Args:
            X (pd.DataFrame): Input data with time series data in index.
            y (pd.Series or pd.DataFrame): Target variable data provided as a Series for univariate problems or
                a DataFrame for multivariate problems.
            show (bool): Whether to display the plot or not. Defaults to False.

        Returns:
            (Single series) matplotlib.pyplot.Figure, list[matplotlib.pyplot.Axes]: The figure and axes that have the decompositions
                plotted on them
            (Multi series) dict[str, (matplotlib.pyplot.Figure, list[matplotlib.pyplot.Axes])]: A dictionary that maps the series id
                to the figure and axes that have the decompositions plotted on them
        """
        if isinstance(y, pd.Series):
            y = y.to_frame()
        is_multiseries = len(y.columns) > 1

        plot_info = {}
        if self.frequency and self.time_index and is_multiseries:
            # set_axis returns a new frame, so the caller's X keeps its original index.
            X = X.set_axis(
                pd.DatetimeIndex(X[self.time_index], freq=self.frequency),
                axis=0,
            )
        decomposition_results = self.get_trend_dataframe(X, y)

        panels = (
            ("signal", "r"),
            ("trend", "b"),
            ("seasonality", "g"),
            ("residual", "y"),
        )

        # Iterate through each series id
        for series_id in y.columns:
            fig, axs = plt.subplots(4)
            fig.set_size_inches(18.5, 14.5)

            if is_multiseries:
                results = decomposition_results[series_id][0]
            else:
                results = decomposition_results[0]

            for ax, (column, color) in zip(axs, panels):
                ax.plot(results[column], color)
                ax.set_title(column)

            # If multiseries, return a dictionary of tuples
            if is_multiseries:
                fig.suptitle(f"Decomposition for Series {series_id}")
                plot_info[series_id] = (fig, axs)
            else:
                plot_info = (fig, axs)

        if show:  # pragma: no cover
            plt.show()
        return plot_info

    def _check_target(self, X: pd.DataFrame, y: pd.Series):
        """Function to ensure target is not None and has a pandas.DatetimeIndex.

        Returns:
            tuple: ``X`` unchanged and ``y``, re-indexed through
            ``_set_time_index`` when its index was not already a DatetimeIndex.

        Raises:
            ValueError: If ``y`` is None.
        """
        if y is None:
            raise ValueError("y cannot be None for Decomposer!")

        # Change the y index to a matching datetimeindex or else we get a failure
        # in ForecastingHorizon during decomposition.
        if not isinstance(y.index, pd.DatetimeIndex):
            y = self._set_time_index(X, y)

        return X, y