Skip to content

Commit

Permalink
Add a flag to ensure that engine is started before it accepts task su…
Browse files Browse the repository at this point in the history
…bmission
  • Loading branch information
yadudoc committed Nov 13, 2024
1 parent f75edc6 commit 0e175de
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 0 deletions.
2 changes: 2 additions & 0 deletions compute_endpoint/globus_compute_endpoint/engines/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def __init__(
self.results_passthrough: queue.Queue[dict[str, bytes | str | None]] = (
queue.Queue()
)
self._engine_ready: bool = False

@abstractmethod
def start(
Expand Down Expand Up @@ -223,6 +224,7 @@ def submit(
-------
future
"""
assert self._engine_ready, "Engine not started and cannot execute tasks"

if task_id not in self._retry_table:
self._retry_table[task_id] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ def start(
# Add executor to poller *after* executor has started
self.job_status_poller = JobStatusPoller(**self._job_status_kwargs)
self.job_status_poller.add_executors([self.executor])
self._engine_ready = True

def _submit(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def start(
assert self.results_passthrough

self._status_report_thread.start()
self._engine_ready = True

def get_status_report(self) -> EPStatusReport:
assert self.executor, "The engine has not been started"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def start(

# mypy think the thread can be none
self._status_report_thread.start()
self._engine_ready = True

def get_status_report(self) -> EPStatusReport:
"""
Expand Down
24 changes: 24 additions & 0 deletions compute_endpoint/tests/unit/test_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,30 @@ def test_result_message_packing(serde, task_uuid):
assert serde.deserialize(unpacked.data) == result


@pytest.mark.parametrize(
"engine_type", (ProcessPoolEngine, ThreadPoolEngine, GlobusComputeEngine)
)
def test_engine_start(
engine_type: GlobusComputeEngineBase, engine_runner, endpoint_uuid, tmp_path
):
"""Engine.submit should fail before engine is started"""

engine = engine_type()
assert not engine._engine_ready, "Engine should not be ready before start"

engine.executor = mock.Mock()
engine.executor.status_polling_interval = 0

# task submit should raise Exception if it was not started
with pytest.raises(AssertionError):
engine.submit(str(endpoint_uuid), b"", {})

engine.start(endpoint_id=endpoint_uuid, run_dir=tmp_path)
assert engine._engine_ready, "Engine should be ready after start"

engine.shutdown()


@pytest.mark.parametrize(
"engine_type", (ProcessPoolEngine, ThreadPoolEngine, GlobusComputeEngine)
)
Expand Down

0 comments on commit 0e175de

Please sign in to comment.