Source code for evalml.pipelines.components.estimators.regressors.arima_regressor

"""Autoregressive Integrated Moving Average Model. The three parameters (p, d, q) are the AR order, the degree of differencing, and the MA order. More information here: https://www.statsmodels.org/devel/generated/statsmodels.tsa.arima.model.ARIMA.html."""
from typing import Dict, Hashable, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from pandas.api.types import is_integer_dtype
from skopt.space import Integer
from sktime.forecasting.base import ForecastingHorizon

from evalml.model_family import ModelFamily
from evalml.pipelines.components.estimators import Estimator
from evalml.pipelines.components.utils import convert_bool_to_double, match_indices
from evalml.problem_types import ProblemTypes
from evalml.utils import (
    import_or_raise,
    infer_feature_types,
)


class ARIMARegressor(Estimator):
    """Autoregressive Integrated Moving Average Model. The three parameters (p, d, q) are the AR order, the degree of differencing, and the MA order. More information here: https://www.statsmodels.org/devel/generated/statsmodels.tsa.arima.model.ARIMA.html.

    Currently ARIMARegressor isn't supported via conda install. It's recommended that it be installed via PyPI.

    Args:
        time_index (str): Specifies the name of the column in X that provides the datetime objects. Defaults to None.
        trend (str): Controls the deterministic trend. Options are ['n', 'c', 't', 'ct'] where 'c' is a constant term,
            't' indicates a linear trend, and 'ct' is both. Can also be an iterable when defining a polynomial, such
            as [1, 1, 0, 1].
        start_p (int): Minimum Autoregressive order. Defaults to 2.
        d (int): Minimum Differencing degree. Defaults to 0.
        start_q (int): Minimum Moving Average order. Defaults to 2.
        max_p (int): Maximum Autoregressive order. Defaults to 5.
        max_d (int): Maximum Differencing degree. Defaults to 2.
        max_q (int): Maximum Moving Average order. Defaults to 5.
        seasonal (boolean): Whether to fit a seasonal model to ARIMA. Defaults to True.
        sp (int or str): Period for seasonal differencing, specifically the number of periods in each season. If "detect", this
            model will automatically detect this parameter (given the time series is a standard frequency) and will fall
            back to 1 (no seasonality) if it cannot be detected. Defaults to 1.
        n_jobs (int or None): Non-negative integer describing level of parallelism used for pipelines. Defaults to -1.
        random_seed (int): Seed for the random number generator. Defaults to 0.
        maxiter (int): Forwarded unchanged to sktime's AutoARIMA as its ``maxiter`` argument. Defaults to 10.
        use_covariates (boolean): If True, the non-datetime columns of X are passed to the underlying model as
            covariates in fit, predict and get_prediction_intervals. If False, only y is used. Defaults to True.
    """

    name = "ARIMA Regressor"
    hyperparameter_ranges = {
        "start_p": Integer(1, 3),
        "d": Integer(0, 2),
        "start_q": Integer(1, 3),
        "max_p": Integer(3, 10),
        "max_d": Integer(2, 5),
        "max_q": Integer(3, 10),
        "seasonal": [True, False],
    }
    """{
        "start_p": Integer(1, 3),
        "d": Integer(0, 2),
        "start_q": Integer(1, 3),
        "max_p": Integer(3, 10),
        "max_d": Integer(2, 5),
        "max_q": Integer(3, 10),
        "seasonal": [True, False],
    }"""
    model_family = ModelFamily.ARIMA
    """ModelFamily.ARIMA"""
    supported_problem_types = [ProblemTypes.TIME_SERIES_REGRESSION]
    """[ProblemTypes.TIME_SERIES_REGRESSION]"""

    max_rows = 1000
    max_cols = 7

    def __init__(
        self,
        time_index: Optional[Hashable] = None,
        trend: Optional[str] = None,
        start_p: int = 2,
        d: int = 0,
        start_q: int = 2,
        max_p: int = 5,
        max_d: int = 2,
        max_q: int = 5,
        seasonal: bool = True,
        sp: Union[int, str] = 1,
        n_jobs: int = -1,
        random_seed: Union[int, float] = 0,
        maxiter: int = 10,
        use_covariates: bool = True,
        **kwargs,
    ):
        # 95% interval bounds cached by the most recent call to `predict`.
        self.preds_95_upper = None
        self.preds_95_lower = None

        # Only these entries (plus kwargs) are handed to AutoARIMA.
        parameters = {
            "trend": trend,
            "start_p": start_p,
            "d": d,
            "start_q": start_q,
            "max_p": max_p,
            "max_d": max_d,
            "max_q": max_q,
            "seasonal": seasonal,
            "maxiter": maxiter,
            "n_jobs": n_jobs,
        }
        parameters.update(kwargs)

        arima_model_msg = (
            "sktime is not installed. Please install using `pip install sktime.`"
        )
        sktime_arima = import_or_raise(
            "sktime.forecasting.arima",
            error_msg=arima_model_msg,
        )
        arima_model = sktime_arima.AutoARIMA(**parameters)

        # Added after the model is built: these are component-level settings,
        # not AutoARIMA constructor arguments.
        parameters["use_covariates"] = use_covariates
        parameters["time_index"] = time_index

        self.sp = sp
        self.use_covariates = use_covariates

        super().__init__(
            parameters=parameters,
            component_obj=arima_model,
            random_seed=random_seed,
        )

    def _remove_datetime(
        self,
        data: Optional[Union[pd.DataFrame, pd.Series]],
        features: bool = False,
    ) -> Optional[Union[pd.DataFrame, pd.Series]]:
        """Return a copy of `data` with a positional index and, for features, no Datetime columns.

        Args:
            data (pd.DataFrame or pd.Series or None): Woodwork-initialized data. None is passed through.
            features (bool): If True, columns with the Woodwork "Datetime" type are dropped.

        Returns:
            The cleaned copy, or None if `data` is None.
        """
        if data is None:
            return None
        data_no_dt = data.ww.copy()
        # Datetime-like indices are replaced by a default positional index.
        if isinstance(
            data_no_dt.index,
            (pd.DatetimeIndex, pd.PeriodIndex, pd.IntervalIndex),
        ):
            data_no_dt = data_no_dt.ww.reset_index(drop=True)
        if features:
            data_no_dt = data_no_dt.ww.select(exclude=["Datetime"])
        return data_no_dt

    def _set_forecast(self, X: pd.DataFrame):
        """Build a relative ForecastingHorizon for the rows of X, measured from the last training index.

        The first step is the distance between `self.last_X_index` (stored by `fit`) and `X.index[0]`;
        it stays at 1 when that distance cannot be computed.
        """
        # we can only calculate the difference if the indices are of the same type
        units_diff = 1
        if isinstance(X.index[0], type(self.last_X_index)) and isinstance(
            X.index,
            pd.DatetimeIndex,
        ):
            dates_diff = pd.date_range(
                start=self.last_X_index,
                end=X.index[0],
                freq=X.index.freq,
            )
            # date_range includes both endpoints, so the gap is one less.
            units_diff = len(dates_diff) - 1
        elif is_integer_dtype(type(X.index[0])) and is_integer_dtype(
            type(self.last_X_index),
        ):
            units_diff = X.index[0] - self.last_X_index
        fh_ = ForecastingHorizon(
            [units_diff + i for i in range(len(X))],
            is_relative=True,
        )
        return fh_

    def _get_sp(self, X: pd.DataFrame) -> int:
        """Resolve the seasonal period, inferring it from the time index frequency when `sp` is "detect"."""
        if X is None:
            return 1
        # Keyed on the first character of the inferred frequency string.
        freq_mappings = {
            "D": 7,
            "M": 12,
            "Q": 4,
        }
        time_index = self._parameters.get("time_index", None)
        sp = self.sp
        if sp == "detect":
            inferred_freqs = X.ww.infer_temporal_frequencies()
            freq = inferred_freqs.get(time_index, None)
            # Fall back to 1 when no frequency is inferred or it is not in the mapping.
            sp = 1
            if freq is not None:
                sp = freq_mappings.get(freq[:1], 1)
        return sp

    def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None):
        """Fits ARIMA regressor to data.

        Args:
            X (pd.DataFrame): The input training data of shape [n_samples, n_features].
            y (pd.Series): The target training data of length [n_samples].

        Returns:
            self

        Raises:
            ValueError: If y was not passed in.
        """
        X, y = self._manage_woodwork(X, y)
        # Validate first so a missing target is reported before any work on X.
        if y is None:
            raise ValueError("ARIMA Regressor requires y as input.")
        if X is not None:
            X = X.ww.fillna(X.mean())

        sp = self._get_sp(X)
        self._component_obj.sp = sp

        # Remembered so `_set_forecast` can measure how far ahead predictions start.
        self.last_X_index = X.index[-1] if X is not None else y.index[-1]
        X = self._remove_datetime(X, features=True)

        if X is not None:
            X = convert_bool_to_double(X)
        y = self._remove_datetime(y)
        X, y = match_indices(X, y)

        if X is not None and not X.empty and self.use_covariates:
            self._component_obj.fit(y=y, X=X)
        else:
            self._component_obj.fit(y=y)
        return self

    def _manage_types_and_forecast(self, X: pd.DataFrame) -> tuple:
        """Return X without Datetime columns (booleans converted) together with its forecasting horizon."""
        # The horizon must be computed before Datetime columns are dropped; it only reads X.index.
        fh_ = self._set_forecast(X)
        X = X.ww.select(exclude=["Datetime"])
        X = convert_bool_to_double(X)
        return X, fh_

    @staticmethod
    def _pad_covariates(X: pd.DataFrame, fh_) -> pd.DataFrame:
        """Prepend zero-filled rows to X so its length matches the last step of the forecasting horizon.

        pmdarima (which sktime uses under the hood) only forecasts off the training data, but sktime
        circumvents this by predicting everything from the end of training data to the date / periods
        requested and only returning the values for dates / periods given to sktime. Because of this,
        pmdarima requires the number of covariate rows to equal the total number of periods
        (X.shape[0] == fh_[-1]) if covariates are used. We circumvent this by adding arbitrary rows to
        the start of X since sktime discards these values when predicting.

        Args:
            X (pd.DataFrame): Woodwork-initialized covariates for the requested periods.
            fh_ (ForecastingHorizon): Relative horizon produced by `_set_forecast` for X.

        Returns:
            pd.DataFrame: X itself when the horizon starts at 1, otherwise the padded copy.
        """
        if fh_[0] == 1:
            return X
        num_rows_diff = fh_[-1] - X.shape[0]
        # Build the filler directly as zeros rather than as an all-NaN object
        # frame followed by fillna, which depends on pandas dtype downcasting.
        filler = pd.DataFrame(0, index=range(num_rows_diff), columns=X.columns)
        X_ = pd.concat([filler, X], ignore_index=True)
        X_.ww.init(schema=X.ww.schema)
        return X_

    @staticmethod
    def _parse_prediction_intervals(
        y_pred_intervals: pd.DataFrame,
        conf_int: float,
    ) -> Tuple[pd.Series, pd.Series]:
        """Extract the unnamed lower and upper bound series for one coverage level.

        Args:
            y_pred_intervals (pd.DataFrame): Result of `predict_interval`, whose columns are
                (name, coverage, "lower"/"upper") tuples.
            conf_int (float): The coverage level to extract.

        Returns:
            tuple[pd.Series, pd.Series]: The lower bound and the upper bound.
        """
        coverage_name = y_pred_intervals.columns[0][0]
        preds_lower = y_pred_intervals.loc(axis=1)[(coverage_name, conf_int, "lower")]
        preds_upper = y_pred_intervals.loc(axis=1)[(coverage_name, conf_int, "upper")]
        preds_lower.name = None
        preds_upper.name = None
        return preds_lower, preds_upper

    def predict(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> pd.Series:
        """Make predictions using fitted ARIMA regressor.

        The prediction is the midpoint of the 95% prediction interval. The interval bounds are
        cached on the instance for reuse by `get_prediction_intervals`.

        Args:
            X (pd.DataFrame): Data of shape [n_samples, n_features].
            y (pd.Series): Target data.

        Returns:
            pd.Series: Predicted values.
        """
        X, y = self._manage_woodwork(X, y)
        X, fh_ = self._manage_types_and_forecast(X=X)

        if not X.empty and self.use_covariates:
            y_pred_intervals = self._component_obj.predict_interval(
                fh=fh_,
                X=self._pad_covariates(X, fh_),
                coverage=[0.95],
            )
        else:
            y_pred_intervals = self._component_obj.predict_interval(
                fh=fh_,
                coverage=[0.95],
            )
        y_pred_intervals.index = X.index

        (
            self.preds_95_lower,
            self.preds_95_upper,
        ) = ARIMARegressor._parse_prediction_intervals(y_pred_intervals, 0.95)

        y_pred = pd.concat((self.preds_95_lower, self.preds_95_upper), axis=1).mean(
            axis=1,
        )
        return infer_feature_types(y_pred)

    def get_prediction_intervals(
        self,
        X: pd.DataFrame,
        y: Optional[pd.Series] = None,
        coverage: Optional[List[float]] = None,
        predictions: Optional[pd.Series] = None,
    ) -> Dict[str, pd.Series]:
        """Find the prediction intervals using the fitted ARIMARegressor.

        Args:
            X (pd.DataFrame): Data of shape [n_samples, n_features].
            y (pd.Series): Target data. Optional.
            coverage (list[float]): A list of floats between the values 0 and 1 that the upper and lower bounds of the
                prediction interval should be calculated for. Defaults to [0.95] when None.
            predictions (pd.Series): Not used for ARIMA regressor.

        Returns:
            dict: Prediction intervals, keys are in the format {coverage}_lower or {coverage}_upper.
        """
        if coverage is None:
            coverage = [0.95]

        X, y = self._manage_woodwork(X, y)
        X, fh_ = self._manage_types_and_forecast(X=X)

        prediction_interval_result = {}

        if not X.empty and self.use_covariates:
            # Pad exactly as `predict` does so horizons that do not start at
            # step 1 are handled the same way in both methods.
            y_pred_intervals = self._component_obj.predict_interval(
                fh=fh_,
                X=self._pad_covariates(X, fh_),
                coverage=coverage,
            )
        else:
            y_pred_intervals = self._component_obj.predict_interval(
                fh=fh_,
                coverage=coverage,
            )
        y_pred_intervals.index = X.index

        # The bounds cached by `predict` are reused only when they were computed
        # for the same index; otherwise they belong to a different X.
        cached_95_is_valid = (
            self.preds_95_lower is not None
            and self.preds_95_upper is not None
            and self.preds_95_lower.index.equals(X.index)
        )

        for conf_int in coverage:
            if conf_int == 0.95 and cached_95_is_valid:
                prediction_interval_result[f"{conf_int}_lower"] = self.preds_95_lower
                prediction_interval_result[f"{conf_int}_upper"] = self.preds_95_upper
                continue

            preds_lower, preds_upper = ARIMARegressor._parse_prediction_intervals(
                y_pred_intervals,
                conf_int,
            )
            prediction_interval_result[f"{conf_int}_lower"] = preds_lower
            prediction_interval_result[f"{conf_int}_upper"] = preds_upper

        return prediction_interval_result

    @property
    def feature_importance(self) -> np.ndarray:
        """Returns array of 0's with a length of 1 as feature_importance is not defined for ARIMA regressor."""
        return np.zeros(1)