Skip to content

Commit

Permalink
Add a flag to ensure that engine is started before it accepts task su…
Browse files Browse the repository at this point in the history
…bmission

* Updated tests to test for updated error string
* Tests that mock the executor are now updated to set `engine._engine_ready=True` for unit testing
* Removed `test_proc_pool_engine_not_started` which is made obsolete by `test_engine_start` that parametrizes over all engines.
  • Loading branch information
yadudoc committed Nov 13, 2024
1 parent 1cd2f0d commit 250be46
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 15 deletions.
2 changes: 2 additions & 0 deletions compute_endpoint/globus_compute_endpoint/engines/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def __init__(
self.results_passthrough: queue.Queue[dict[str, bytes | str | None]] = (
queue.Queue()
)
self._engine_ready: bool = False

@abstractmethod
def start(
Expand Down Expand Up @@ -251,6 +252,7 @@ def submit(
-------
future
"""
assert self._engine_ready, "Engine not started and cannot execute tasks"

if task_id not in self._retry_table:
self._retry_table[task_id] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ def start(
# Add executor to poller *after* executor has started
self.job_status_poller = JobStatusPoller(**self._job_status_kwargs)
self.job_status_poller.add_executors([self.executor])
self._engine_ready = True

def _submit(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def start(
self.set_working_dir(run_dir=run_dir)

self._status_report_thread.start()
self._engine_ready = True

def get_status_report(self) -> EPStatusReport:
assert self.executor, "The engine has not been started"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def start(
self.set_working_dir(run_dir=run_dir)
# mypy think the thread can be none
self._status_report_thread.start()
self._engine_ready = True

def get_status_report(self) -> EPStatusReport:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def mock_gce(tmp_path):
def test_success_after_1_fail(mock_gce, serde, ez_pack_task):
engine = mock_gce
engine.max_retries_on_system_failure = 2
engine._engine_ready = True
q = engine.results_passthrough
task_id = uuid.uuid1()
num = random.randint(1, 10000)
Expand All @@ -77,6 +78,7 @@ def test_repeated_fail(mock_gce, ez_pack_task):
fail_count = 2
engine = mock_gce
engine.max_retries_on_system_failure = fail_count
engine._engine_ready = True
q = engine.results_passthrough
task_id = uuid.uuid1()

Expand Down
45 changes: 30 additions & 15 deletions compute_endpoint/tests/unit/test_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,30 @@ def test_result_message_packing(serde, task_uuid):
assert serde.deserialize(unpacked.data) == result


@pytest.mark.parametrize(
"engine_type", (ProcessPoolEngine, ThreadPoolEngine, GlobusComputeEngine)
)
def test_engine_start(
engine_type: GlobusComputeEngineBase, engine_runner, endpoint_uuid, tmp_path
):
"""Engine.submit should fail before engine is started"""

engine = engine_type()
assert not engine._engine_ready, "Engine should not be ready before start"

engine.executor = mock.Mock()
engine.executor.status_polling_interval = 0

# task submit should raise Exception if it was not started
with pytest.raises(AssertionError):
engine.submit(str(endpoint_uuid), b"", {})

engine.start(endpoint_id=endpoint_uuid, run_dir=tmp_path)
assert engine._engine_ready, "Engine should be ready after start"

engine.shutdown()


@pytest.mark.parametrize(
"engine_type", (ProcessPoolEngine, ThreadPoolEngine, GlobusComputeEngine)
)
Expand Down Expand Up @@ -147,20 +171,6 @@ def test_engine_submit_internal(
break


def test_proc_pool_engine_not_started(task_uuid, ez_pack_task):
engine = ProcessPoolEngine(max_workers=1)
task_bytes = ez_pack_task(double, 10)

with pytest.raises(AssertionError) as pyt_exc:
future = engine.submit(str(task_uuid), task_bytes, resource_specification={})
future.result()
assert "engine has not been started" in str(pyt_exc)

with pytest.raises(AssertionError):
engine.get_status_report()
assert "engine has not been started" in str(pyt_exc)


def test_gc_engine_system_failure(ez_pack_task, task_uuid, engine_runner):
"""Test behavior of engine failure killing task"""
engine = engine_runner(GlobusComputeEngine, max_retries_on_system_failure=0)
Expand Down Expand Up @@ -295,6 +305,7 @@ def test_gcengine_executor_exception_passthrough(randomstring):

def test_gcengine_bad_state_futures_failed_immediately(randomstring, task_uuid):
gce = GlobusComputeEngine(address="127.0.0.1")
gce._engine_ready = True
exc_text = randomstring()
gce.executor.set_bad_state_and_fail_all(ZeroDivisionError(exc_text))

Expand All @@ -310,6 +321,7 @@ def test_gcengine_bad_state_futures_failed_immediately(randomstring, task_uuid):

def test_gcengine_exception_report_from_bad_state(task_uuid):
gce = GlobusComputeEngine(address="127.0.0.1")
gce._engine_ready = True
gce.executor.set_bad_state_and_fail_all(ZeroDivisionError())

gce.submit(
Expand Down Expand Up @@ -343,7 +355,9 @@ def test_gcengine_rejects_mpi_mode(randomstring):

def test_gcengine_rejects_resource_specification(task_uuid):
with pytest.raises(ValueError) as pyt_exc:
GlobusComputeEngine(address="127.0.0.1").submit(
gce = GlobusComputeEngine(address="127.0.0.1")
gce._engine_ready = True
gce.submit(
str(task_uuid),
packed_task=b"packed_task",
resource_specification={"foo": "bar"},
Expand Down Expand Up @@ -374,6 +388,7 @@ def test_gcmpiengine_accepts_resource_specification(task_uuid, randomstring):
mock_ex.__name__ = "ClassName"
mock_ex.return_value = mock.Mock(launch_cmd="")
engine = GlobusMPIEngine(address="127.0.0.1")
engine._engine_ready = True
engine.submit(str(task_uuid), b"some task", resource_specification=spec)

assert engine.executor.submit.called, "Verify test: correct internal method invoked"
Expand Down

0 comments on commit 250be46

Please sign in to comment.