Source code for evalml.automl.engine.sequential_engine

from evalml.automl.engine.engine_base import (
    EngineBase,
    EngineComputation,
    evaluate_pipeline,
    score_pipeline,
    train_pipeline,
)
from evalml.objectives.utils import get_objective


[docs]class SequentialComputation(EngineComputation): """A Future-like api for jobs created by the SequentialEngine, an Engine that sequentially computes the submitted jobs. In order to separate the engine from the AutoMLSearch loop, we need the sequential computations to behave the same way as concurrent computations from AutoMLSearch's point-of-view. One way to do this is by delaying the computation in the sequential engine until get_result is called. Since AutoMLSearch will call get_result only when the computation is "done", by always returning True in done() we make sure that get_result is called in the order that the jobs are submitted. So the computations happen sequentially! Arguments: work (callable): Computation that should be done by the engine. """ def __init__(self, work, **kwargs): self.work = work self.kwargs = kwargs self.meta_data = {}
[docs] def done(self): """Whether the computation is done.""" return True
[docs] def get_result(self): """Gets the computation result. Will block until the computation is finished. Raises Exception: If computation fails. Returns traceback. """ return self.work(**self.kwargs)
[docs] def cancel(self): """Cancel the current computation."""
[docs]class SequentialEngine(EngineBase): """The default engine for the AutoML search. Trains and scores pipelines locally and sequentially."""
[docs] def submit_evaluation_job(self, automl_config, pipeline, X, y): logger = self.setup_job_log() return SequentialComputation( work=evaluate_pipeline, pipeline=pipeline, automl_config=automl_config, X=X, y=y, logger=logger, )
[docs] def submit_training_job(self, automl_config, pipeline, X, y): return SequentialComputation( work=train_pipeline, pipeline=pipeline, X=X, y=y, automl_config=automl_config, schema=False, )
[docs] def submit_scoring_job(self, automl_config, pipeline, X, y, objectives): objectives = [get_objective(o, return_instance=True) for o in objectives] computation = SequentialComputation( work=score_pipeline, pipeline=pipeline, X=X, y=y, objectives=objectives, X_schema=X.ww.schema, y_schema=y.ww.schema, ) computation.meta_data["pipeline_name"] = pipeline.name return computation
[docs] def close(self): pass