diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index c4097500f1..0e0ea9c892 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -790,7 +790,8 @@ def status(self) -> Dict[str, JobStatus]: connected_blocks = self.connected_blocks() for job_id in job_status: job_info = job_status[job_id] - if job_info.terminal and job_id not in connected_blocks: + if job_info.terminal and job_id not in connected_blocks and job_info.state != JobState.SCALED_IN: + logger.debug("Rewriting job %s from status %s to MISSING", job_id, job_info) job_status[job_id].state = JobState.MISSING if job_status[job_id].message is None: job_status[job_id].message = ( diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index cd7d0596a9..d61c76fed2 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -483,7 +483,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: # Check if there are tasks that could be sent to managers logger.debug( - "Managers count (interesting/total): {}/{}", + "Managers count (interesting/total): %d/%d", len(interesting_managers), len(self._ready_managers) ) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 34db2300f6..615f09de78 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -347,7 +347,10 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[ if block_ids is not None: new_status = {} for block_id in block_ids: - new_status[block_id] = JobStatus(JobState.CANCELLED) - del self._status[block_id] + logger.debug("Marking block %s as SCALED_IN", block_id) + s = JobStatus(JobState.SCALED_IN) + new_status[block_id] = s + self._status[block_id] = s + self._simulated_status[block_id] = s 
self.send_monitoring_info(new_status) return block_ids diff --git a/parsl/jobs/states.py b/parsl/jobs/states.py index 7ba4aae94e..792a515bac 100644 --- a/parsl/jobs/states.py +++ b/parsl/jobs/states.py @@ -46,12 +46,17 @@ class JobState(IntEnum): bad worker environment or network connectivity issues. """ + SCALED_IN = 9 + """This job has been deliberately scaled in. Scaling code should not be concerned + that the job never ran (for example for error handling purposes). + """ + def __str__(self) -> str: return f"{self.__class__.__name__}.{self.name}" TERMINAL_STATES = [JobState.CANCELLED, JobState.COMPLETED, JobState.FAILED, - JobState.TIMEOUT, JobState.MISSING] + JobState.TIMEOUT, JobState.MISSING, JobState.SCALED_IN] class JobStatus: diff --git a/parsl/tests/test_htex/test_multiple_disconnected_blocks.py b/parsl/tests/test_htex/test_multiple_disconnected_blocks.py index 159c20f58d..4168f41b79 100644 --- a/parsl/tests/test_htex/test_multiple_disconnected_blocks.py +++ b/parsl/tests/test_htex/test_multiple_disconnected_blocks.py @@ -21,16 +21,14 @@ def local_config(): poll_period=100, max_workers_per_node=1, provider=LocalProvider( - worker_init="conda deactivate; export PATH=''; which python; exit 0", - init_blocks=2, - max_blocks=4, - min_blocks=0, + worker_init="exit 0", + init_blocks=2 ), ) ], run_dir="/tmp/test_htex", max_idletime=0.5, - strategy='htex_auto_scale', + strategy='none', ) diff --git a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py index eb7a25003b..ada972e747 100644 --- a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py +++ b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py @@ -78,6 +78,6 @@ def test_row_counts(tmpd_cwd, strategy): (c, ) = result.first() assert c == 1, "There should be a single pending status" - result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'CANCELLED' AND 
run_id = :run_id"), binds) + result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'SCALED_IN' AND run_id = :run_id"), binds) (c, ) = result.first() assert c == 1, "There should be a single cancelled status" diff --git a/parsl/tests/test_scaling/test_regression_3568_scaledown_vs_MISSING.py b/parsl/tests/test_scaling/test_regression_3568_scaledown_vs_MISSING.py new file mode 100644 index 0000000000..a56b53af10 --- /dev/null +++ b/parsl/tests/test_scaling/test_regression_3568_scaledown_vs_MISSING.py @@ -0,0 +1,85 @@ +import time + +import pytest + +import parsl +from parsl.channels import LocalChannel +from parsl.config import Config +from parsl.executors import HighThroughputExecutor +from parsl.launchers import WrappedLauncher +from parsl.providers import LocalProvider + + +def local_config(): + # see the comments inside test_regression for reasoning about why each + # of these parameters is set the way it is. + return Config( + max_idletime=1, + + strategy='htex_auto_scale', + strategy_period=1, + + executors=[ + HighThroughputExecutor( + label="htex_local", + encrypted=True, + provider=LocalProvider( + init_blocks=1, + min_blocks=0, + max_blocks=1, + launcher=WrappedLauncher(prepend="sleep inf ; "), + ), + ) + ], + ) + + +@parsl.python_app +def task(): + return 7 + + +@pytest.mark.local +def test_regression(try_assert): + # The above config means that we should start scaling out one initial + # block, but then scale it back in after a second or so if the executor + # is kept idle (which this test does using try_assert). + + # Because of 'sleep inf' in the WrappedLauncher, the block will not ever + # register. + + # The bug being tested is about mistreatment of blocks which are scaled in + # before they have a chance to register, and the above forces that to + # happen. + + # After that scaling in has happened, we should see that we have one block + # and it should be in a terminal state.
The below try_assert waits for + # that to become true. + + # At that time, we should also see htex reporting no blocks registered - as + # mentioned above, that is a necessary part of the bug being tested here. + + # Give 10 strategy periods for the above to happen: each step of scale up, + # and scale down due to idleness isn't guaranteed to happen in exactly one + # scaling step. + + htex = parsl.dfk().executors['htex_local'] + + try_assert(lambda: len(htex.status_facade) == 1 and htex.status_facade['0'].terminal, + timeout_ms=10000) + + assert htex.connected_blocks() == [], "No block should have connected to interchange" + + # Now we can reconfigure the launcher to let subsequent blocks launch ok, + # and run a trivial task. That trivial task will scale up a new block and + # run the task successfully. + + # Prior to issue #3568, the bug was that the scale in of the first + # block earlier in the test case would have incorrectly been treated as a + # failure, and then the block error handler would have treated that failure + # as a permanent htex failure, and so the task execution below would raise + # a BadStateException rather than attempt to run the task. + + assert htex.provider.launcher.prepend != "", "Pre-req: prepend attribute should exist and be non-empty" + htex.provider.launcher.prepend = "" + assert task().result() == 7