"""Custom CFClient API to match Dask's CFClient and allow context management."""
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

from evalml.automl.engine.engine_base import (
    EngineBase,
    EngineComputation,
    evaluate_pipeline,
    score_pipeline,
    train_pipeline,
)
class CFClient:
    """Custom CFClient API to match Dask's CFClient and allow context management.

    Args:
        pool(cf.ThreadPoolExecutor or cf.ProcessPoolExecutor): The resource pool to execute the futures work on.
    """

    def __init__(self, pool):
        self.pool = pool

    def __enter__(self):
        """Enter runtime context.

        Returns:
            CFClient: This client instance.
        """
        return self

    def __exit__(self, typ, value, traceback):
        """Exit runtime context.

        Leaving the ``with`` block does NOT shut down the underlying pool and
        does not suppress exceptions; call ``close()`` explicitly to release
        the executor's resources.
        """
        pass

    def submit(self, *args, **kwargs):
        """Pass through to imitate Dask's Client API.

        Returns:
            cf.Future: The future returned by the pool's ``submit``.
        """
        return self.pool.submit(*args, **kwargs)

    def close(self):
        """Closes the underlying Executor (blocks until pending work finishes)."""
        self.pool.shutdown()

    @property
    def is_closed(self):
        """Property that determines whether the Engine's Client's resources are closed.

        Returns:
            bool: True if the pool has been shut down. For executor types other
            than ThreadPoolExecutor/ProcessPoolExecutor the shutdown state cannot
            be inspected, so False is returned.
        """
        # These are private CPython attributes; each executor type tracks its
        # shutdown state under a different name.
        if isinstance(self.pool, ProcessPoolExecutor):
            return bool(self.pool._shutdown_thread)
        if isinstance(self.pool, ThreadPoolExecutor):
            return bool(self.pool._shutdown)
        # Previously this fell through and implicitly returned None.
        return False
class CFComputation(EngineComputation):
    """A Future-like wrapper around jobs created by the CFEngine.

    Args:
        future(cf.Future): The concurrent.futures.Future that is desired to be executed.
    """

    def __init__(self, future):
        self.work = future
        # Free-form metadata attached to the job by its submitter.
        self.meta_data = {}

    def done(self):
        """Returns whether the computation is done."""
        return self.work.done()

    def get_result(self, timeout=None):
        """Gets the computation result. Will block until the computation is finished.

        Args:
            timeout (float or None): Maximum number of seconds to wait. If None
                (the default), wait indefinitely.

        Raises:
            Exception: If the computation fails, the exception raised by the job is re-raised.
            cf.TimeoutError: If ``timeout`` is given and the computation does not finish in time.
            cf.CancelledError: If the computation was cancelled before completing.

        Returns:
            The result of the requested job.
        """
        return self.work.result(timeout=timeout)

    def cancel(self):
        """Cancel the current computation.

        Returns:
            bool: False if the call is currently being executed or finished running
            and cannot be cancelled. True if the call was cancelled.
        """
        return self.work.cancel()

    @property
    def is_cancelled(self):
        """Returns whether computation was cancelled."""
        return self.work.cancelled()
class CFEngine(EngineBase):
    """The concurrent.futures (CF) engine.

    Args:
        client (None or CFClient): If None, creates a threaded pool for processing. Defaults to None.

    Raises:
        TypeError: If ``client`` is neither None nor a CFClient.
    """

    def __init__(self, client=None):
        if client is not None and not isinstance(client, CFClient):
            raise TypeError(
                f"Expected evalml.automl.engine.cf_engine.CFClient, received {type(client)}",
            )
        elif client is None:
            client = CFClient(ThreadPoolExecutor())
        self.client = client
        # Not read within this class; presumably used by EngineBase or callers — verify.
        self._data_futures_cache = {}

    def submit_evaluation_job(
        self,
        automl_config,
        pipeline,
        X,
        y,
        X_holdout=None,
        y_holdout=None,
    ):
        """Send evaluation job to cluster.

        Args:
            automl_config: Structure containing data passed from AutoMLSearch instance.
            pipeline (pipeline.PipelineBase): Pipeline to evaluate.
            X (pd.DataFrame): Input data for modeling.
            y (pd.Series): Target data for modeling.
            X_holdout (pd.DataFrame): Holdout input data for holdout scoring.
            y_holdout (pd.Series): Holdout target data for holdout scoring.

        Returns:
            CFComputation: An object wrapping a reference to a future-like computation
                occurring in the resource pool
        """
        logger = self.setup_job_log()
        future = self.client.submit(
            evaluate_pipeline,
            pipeline=pipeline,
            automl_config=automl_config,
            X=X,
            y=y,
            X_holdout=X_holdout,
            y_holdout=y_holdout,
            logger=logger,
        )
        return CFComputation(future)

    def submit_training_job(self, automl_config, pipeline, X, y):
        """Send training job to cluster.

        Args:
            automl_config: Structure containing data passed from AutoMLSearch instance.
            pipeline (pipeline.PipelineBase): Pipeline to train.
            X (pd.DataFrame): Input data for modeling.
            y (pd.Series): Target data for modeling.

        Returns:
            CFComputation: An object wrapping a reference to a future-like computation
                occurring in the resource pool
        """
        future = self.client.submit(
            train_pipeline,
            pipeline=pipeline,
            X=X,
            y=y,
            automl_config=automl_config,
        )
        return CFComputation(future)

    def submit_scoring_job(
        self,
        automl_config,
        pipeline,
        X,
        y,
        objectives,
        X_train=None,
        y_train=None,
    ):
        """Send scoring job to cluster.

        Args:
            automl_config: Structure containing data passed from AutoMLSearch instance.
            pipeline (pipeline.PipelineBase): Pipeline to score.
            X (pd.DataFrame): Input data for modeling.
            y (pd.Series): Target data for modeling.
            objectives (list[ObjectiveBase]): Objectives to score on.
            X_train (pd.DataFrame): Training features. Used for feature engineering in time series.
            y_train (pd.Series): Training target. Used for feature engineering in time series.

        Returns:
            CFComputation: An object wrapping a reference to a future-like computation
                occurring in the resource pool. Its ``meta_data["pipeline_name"]`` is
                set to ``pipeline.name``.
        """
        # Capture the Woodwork schemas up front so score_pipeline can receive them
        # explicitly; presumably they do not survive the hand-off to the worker
        # (e.g. pickling for a process pool) — verify.
        X_schema = X.ww.schema
        y_schema = y.ww.schema
        future = self.client.submit(
            score_pipeline,
            pipeline=pipeline,
            X=X,
            y=y,
            objectives=objectives,
            X_schema=X_schema,
            y_schema=y_schema,
            X_train=X_train,
            y_train=y_train,
        )
        computation = CFComputation(future)
        computation.meta_data["pipeline_name"] = pipeline.name
        return computation

    def close(self):
        """Function to properly shutdown the Engine's Client's resources."""
        self.client.close()

    @property
    def is_closed(self):
        """Property that determines whether the Engine's Client's resources are shutdown."""
        return self.client.is_closed