"""A Future-like wrapper around jobs created by the DaskEngine."""importjoblibfromdask.distributedimportClient,LocalClusterfromevalml.automl.engine.engine_baseimport(EngineBase,EngineComputation,evaluate_pipeline,score_pipeline,train_pipeline,)
class DaskComputation(EngineComputation):
    """A Future-like wrapper around jobs created by the DaskEngine.

    Args:
        dask_future (callable): Computation to do.
    """

    def __init__(self, dask_future):
        # The underlying dask.distributed.Future for this job.
        self.work = dask_future
        # Free-form metadata attached by the engine (e.g. pipeline name).
        self.meta_data = {}

    def done(self):
        """Returns whether the computation is done."""
        return self.work.done()

    def get_result(self):
        """Gets the computation result. Will block until the computation is finished.

        Raises:
            Exception: If computation fails. Returns traceback.

        Returns:
            Computation results.
        """
        return self.work.result()

    def cancel(self):
        """Cancel the current computation."""
        return self.work.cancel()

    @property
    def is_cancelled(self):
        """Returns whether computation was cancelled.

        Returns:
            bool: True if the underlying dask future's status is "cancelled".
        """
        # BUG FIX: this previously returned the raw dask status string
        # ("pending", "finished", "cancelled", ...), which is truthy for every
        # state and therefore never actually answered whether the job was
        # cancelled. Compare against dask's "cancelled" status so the property
        # returns the boolean its name and docstring promise.
        return self.work.status == "cancelled"
class DaskEngine(EngineBase):
    """The dask engine.

    Args:
        cluster (None or dd.LocalCluster): If None, creates a local, threaded
            Dask cluster for processing. Defaults to None.
    """

    def __init__(self, cluster=None):
        # The guard checks for a LocalCluster (the value is wrapped by
        # Client(...) below), so the error message must name LocalCluster.
        # BUG FIX: the message previously claimed "Expected
        # dask.distributed.Client", contradicting the isinstance check.
        if cluster is not None and not isinstance(cluster, LocalCluster):
            raise TypeError(
                f"Expected dask.distributed.LocalCluster, received {type(cluster)}",
            )
        elif cluster is None:
            cluster = LocalCluster(processes=False)
        self.cluster = cluster
        self.client = Client(self.cluster)
        # Maps (joblib.hash(X), joblib.hash(y)) -> scattered futures so each
        # dataset is shipped to the cluster at most once.
        self._data_futures_cache = {}

    def __enter__(self):
        """Enter runtime context."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit runtime context."""
        self.close()

    def send_data_to_cluster(self, X, y):
        """Send data to the cluster.

        The implementation uses caching so the data is only sent once. This follows
        dask best practices.

        Args:
            X (pd.DataFrame): Input data for modeling.
            y (pd.Series): Target data for modeling.

        Returns:
            dask.Future: The modeling data.
        """
        data_hash = joblib.hash(X), joblib.hash(y)
        if data_hash in self._data_futures_cache:
            X_future, y_future = self._data_futures_cache[data_hash]
            # Reuse the cached futures only if neither was cancelled;
            # otherwise fall through and re-scatter the data.
            if not (X_future.cancelled() or y_future.cancelled()):
                return X_future, y_future
        # broadcast=True replicates the data to every worker up front.
        self._data_futures_cache[data_hash] = self.client.scatter(
            [X, y],
            broadcast=True,
        )
        return self._data_futures_cache[data_hash]

    def submit_evaluation_job(
        self,
        automl_config,
        pipeline,
        X,
        y,
        X_holdout=None,
        y_holdout=None,
    ):
        """Send evaluation job to cluster.

        Args:
            automl_config: Structure containing data passed from AutoMLSearch instance.
            pipeline (pipeline.PipelineBase): Pipeline to evaluate.
            X (pd.DataFrame): Input data for modeling.
            y (pd.Series): Target data for modeling.
            X_holdout (pd.Series): Holdout input data for holdout scoring.
            y_holdout (pd.Series): Holdout target data for holdout scoring.

        Returns:
            DaskComputation: An object wrapping a reference to a future-like
                computation occurring in the dask cluster.
        """
        logger = self.setup_job_log()
        # Replace X/y with cluster-resident futures (cached scatter).
        X, y = self.send_data_to_cluster(X, y)
        dask_future = self.client.submit(
            evaluate_pipeline,
            pipeline=pipeline,
            automl_config=automl_config,
            X=X,
            y=y,
            X_holdout=X_holdout,
            y_holdout=y_holdout,
            logger=logger,
        )
        return DaskComputation(dask_future)

    def submit_training_job(self, automl_config, pipeline, X, y):
        """Send training job to cluster.

        Args:
            automl_config: Structure containing data passed from AutoMLSearch instance.
            pipeline (pipeline.PipelineBase): Pipeline to train.
            X (pd.DataFrame): Input data for modeling.
            y (pd.Series): Target data for modeling.

        Returns:
            DaskComputation: An object wrapping a reference to a future-like
                computation occurring in the dask cluster.
        """
        X, y = self.send_data_to_cluster(X, y)
        dask_future = self.client.submit(
            train_pipeline,
            pipeline=pipeline,
            X=X,
            y=y,
            automl_config=automl_config,
        )
        return DaskComputation(dask_future)

    def submit_scoring_job(
        self,
        automl_config,
        pipeline,
        X,
        y,
        objectives,
        X_train=None,
        y_train=None,
    ):
        """Send scoring job to cluster.

        Args:
            automl_config: Structure containing data passed from AutoMLSearch instance.
            pipeline (pipeline.PipelineBase): Pipeline to train.
            X (pd.DataFrame): Input data for modeling.
            y (pd.Series): Target data for modeling.
            X_train (pd.DataFrame): Training features. Used for feature engineering in time series.
            y_train (pd.Series): Training target. Used for feature engineering in time series.
            objectives (list[ObjectiveBase]): List of objectives to score on.

        Returns:
            DaskComputation: An object wrapping a reference to a future-like
                computation occurring in the dask cluster.
        """
        # Get the woodwork schema before we lose it to serialization.
        X_schema = X.ww.schema
        y_schema = y.ww.schema
        X, y = self.send_data_to_cluster(X, y)
        # NOTE(review): X_train/y_train may be None here; joblib.hash(None) is
        # valid, so the (None, None) pair is scattered and cached like any
        # other dataset.
        X_train, y_train = self.send_data_to_cluster(X_train, y_train)
        dask_future = self.client.submit(
            score_pipeline,
            pipeline=pipeline,
            X=X,
            y=y,
            objectives=objectives,
            X_schema=X_schema,
            y_schema=y_schema,
            X_train=X_train,
            y_train=y_train,
        )
        computation = DaskComputation(dask_future)
        # Tag the computation so callers can associate results with a pipeline.
        computation.meta_data["pipeline_name"] = pipeline.name
        return computation

    def close(self):
        """Closes the underlying cluster."""
        # TODO: Might want to rethink this if using something other than a LocalCluster.
        self.cluster.close()
        self.client.close()

    @property
    def is_closed(self):
        """Property that determines whether the Engine's Client's resources are shutdown."""
        return self.cluster.status.value == "closed"