From 895240613d749c1290cdcacf704c1b70096c0a9a Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 26 Nov 2024 18:52:11 +0000 Subject: [PATCH 1/3] move execute_wait to parsl.utils --- parsl/channels/base.py | 27 +------------- parsl/channels/local/local.py | 35 ------------------- parsl/providers/cluster_provider.py | 3 +- parsl/providers/local/local.py | 10 +++--- .../tests/test_channels/test_large_output.py | 8 ++--- .../tests/test_channels/test_local_channel.py | 5 ++- parsl/utils.py | 35 +++++++++++++++++++ 7 files changed, 48 insertions(+), 75 deletions(-) diff --git a/parsl/channels/base.py b/parsl/channels/base.py index 05241b878d..d05ac561d7 100644 --- a/parsl/channels/base.py +++ b/parsl/channels/base.py @@ -1,5 +1,4 @@ -from abc import ABCMeta, abstractmethod, abstractproperty -from typing import Tuple +from abc import ABCMeta, abstractproperty class Channel(metaclass=ABCMeta): @@ -8,32 +7,8 @@ class Channel(metaclass=ABCMeta): For certain resources such as campus clusters or supercomputers at research laboratories, resource requirements may require authentication. - - The only remaining Channel, *LocalChannel*, executes commands locally in a - shell. - - Channels provide the ability to execute commands remotely, using the - execute_wait method, and manipulate the remote file system using methods - such as push_file, pull_file and makedirs. - - Channels should ensure that each launched command runs in a new process - group, so that providers (such as LocalProvider) which terminate long - running commands using process groups can do so. """ - @abstractmethod - def execute_wait(self, cmd: str, walltime: int = 0) -> Tuple[int, str, str]: - ''' Executes the cmd, with a defined walltime. - - Args: - - cmd (string): Command string to execute over the channel - - walltime (int) : Timeout in seconds - - Returns: - - (exit_code, stdout, stderr) (int, string, string) - ''' - pass - @abstractproperty def script_dir(self) -> str: ''' This is a property. Returns the directory assigned for storing all internal scripts such as diff --git a/parsl/channels/local/local.py b/parsl/channels/local/local.py index 6ef014ac19..4c1712ef30 100644 --- a/parsl/channels/local/local.py +++ b/parsl/channels/local/local.py @@ -1,6 +1,5 @@ import logging import os -import subprocess from parsl.channels.base import Channel from parsl.utils import RepresentationMixin @@ -21,40 +20,6 @@ def __init__(self): ''' self.script_dir = None - def execute_wait(self, cmd, walltime=None): - ''' Synchronously execute a commandline string on the shell. - - Args: - - cmd (string) : Commandline string to execute - - walltime (int) : walltime in seconds - - Returns: - - retcode : Return code from the execution - - stdout : stdout string - - stderr : stderr string - ''' - try: - logger.debug("Creating process with command '%s'", cmd) - proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - preexec_fn=os.setpgrp - ) - logger.debug("Created process with pid %s. Performing communicate", proc.pid) - (stdout, stderr) = proc.communicate(timeout=walltime) - retcode = proc.returncode - logger.debug("Process %s returned %s", proc.pid, proc.returncode) - - except Exception: - logger.exception(f"Execution of command failed:\n{cmd}") - raise - else: - logger.debug("Execution of command in process %s completed normally", proc.pid) - - return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8")) - @property def script_dir(self): return self._script_dir diff --git a/parsl/providers/cluster_provider.py b/parsl/providers/cluster_provider.py index 6bc76bdf22..f2feb2ddf3 100644 --- a/parsl/providers/cluster_provider.py +++ b/parsl/providers/cluster_provider.py @@ -6,6 +6,7 @@ from parsl.launchers.errors import BadLauncher from parsl.providers.base import ExecutionProvider from parsl.providers.errors import SchedulerMissingArgs, ScriptPathError +from parsl.utils import execute_wait logger = logging.getLogger(__name__) @@ -76,7 +77,7 @@ def execute_wait(self, cmd, timeout=None): t = self.cmd_timeout if timeout is not None: t = timeout - return self.channel.execute_wait(cmd, t) + return execute_wait(cmd, t) def _write_submit_script(self, template, script_filename, job_name, configs): """Generate submit script and write it to a file. diff --git a/parsl/providers/local/local.py b/parsl/providers/local/local.py index 5ecf174df2..6357c85cba 100644 --- a/parsl/providers/local/local.py +++ b/parsl/providers/local/local.py @@ -11,7 +11,7 @@ ScriptPathError, SubmitException, ) -from parsl.utils import RepresentationMixin +from parsl.utils import RepresentationMixin, execute_wait logger = logging.getLogger(__name__) @@ -118,7 +118,7 @@ def status(self, job_ids): return [self.resources[jid]['status'] for jid in job_ids] def _is_alive(self, job_dict): - retcode, stdout, stderr = self.channel.execute_wait( + retcode, stdout, stderr = execute_wait( 'ps -p {} > /dev/null 2> /dev/null; echo "STATUS:$?" '.format( job_dict['remote_pid']), self.cmd_timeout) for line in stdout.split('\n'): @@ -223,11 +223,11 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"): # cancel the task later. # # We need to do the >/dev/null 2>&1 so that bash closes stdout, otherwise - # channel.execute_wait hangs reading the process stdout until all the + # execute_wait hangs reading the process stdout until all the # background commands complete. cmd = '/bin/bash -c \'echo - >{0}.ec && {{ {{ bash {0} 1>{0}.out 2>{0}.err ; ' \ 'echo $? > {0}.ec ; }} >/dev/null 2>&1 & echo "PID:$!" ; }}\''.format(script_path) - retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout) + retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout) if retcode != 0: raise SubmitException(job_name, "Launch command exited with code {0}".format(retcode), stdout, stderr) @@ -258,7 +258,7 @@ def cancel(self, job_ids): job_dict['cancelled'] = True logger.debug("Terminating job/process ID: {0}".format(job)) cmd = "kill -- -$(ps -o pgid= {} | grep -o '[0-9]*')".format(job_dict['remote_pid']) - retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout) + retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout) if retcode != 0: logger.warning("Failed to kill PID: {} and child processes on {}".format(job_dict['remote_pid'], self.label)) diff --git a/parsl/tests/test_channels/test_large_output.py b/parsl/tests/test_channels/test_large_output.py index bfc96f38bc..ee02d351f2 100644 --- a/parsl/tests/test_channels/test_large_output.py +++ b/parsl/tests/test_channels/test_large_output.py @@ -1,20 +1,18 @@ import pytest -from parsl.channels.local.local import LocalChannel +from parsl.utils import execute_wait @pytest.mark.local def test_local_large_output_2210(): """Regression test for #2210. - The local channel was hanging if the specified command gave too + execute_wait was hanging if the specified command gave too much output, due to a race condition between process exiting and pipes filling up. """ - c = LocalChannel() - # this will output 128kb of stdout - c.execute_wait("yes | dd count=128 bs=1024", walltime=60) + execute_wait("yes | dd count=128 bs=1024", walltime=60) # if this test fails, execute_wait should raise a timeout # exception. diff --git a/parsl/tests/test_channels/test_local_channel.py b/parsl/tests/test_channels/test_local_channel.py index a3f55d096d..39094d0235 100644 --- a/parsl/tests/test_channels/test_local_channel.py +++ b/parsl/tests/test_channels/test_local_channel.py @@ -1,6 +1,6 @@ import pytest -from parsl.channels.local.local import LocalChannel +from parsl.utils import execute_wait @pytest.mark.local @@ -8,8 +8,7 @@ def test_env(): ''' Regression testing for issue #27 ''' - lc = LocalChannel() - rc, stdout, stderr = lc.execute_wait("env", 1) + rc, stdout, stderr = execute_wait("env", 1) stdout = stdout.split('\n') x = [s for s in stdout if s.startswith("PATH=")] diff --git a/parsl/utils.py b/parsl/utils.py index 0ea5d7d9eb..b6544d63d2 100644 --- a/parsl/utils.py +++ b/parsl/utils.py @@ -458,3 +458,38 @@ def sanitize_dns_subdomain_rfc1123(raw_string: str) -> str: raise ValueError(f"Sanitized DNS subdomain is empty for input '{raw_string}'") return sanitized + + +def execute_wait(cmd: str, walltime: Optional[int] = None) -> Tuple[int, str, str]: + ''' Synchronously execute a commandline string on the shell. + + Args: + - cmd (string) : Commandline string to execute + - walltime (int) : walltime in seconds + + Returns: + - retcode : Return code from the execution + - stdout : stdout string + - stderr : stderr string + ''' + try: + logger.debug("Creating process with command '%s'", cmd) + proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True, + preexec_fn=os.setpgrp + ) + logger.debug("Created process with pid %s. Performing communicate", proc.pid) + (stdout, stderr) = proc.communicate(timeout=walltime) + retcode = proc.returncode + logger.debug("Process %s returned %s", proc.pid, proc.returncode) + + except Exception: + logger.exception(f"Execution of command failed:\n{cmd}") + raise + else: + logger.debug("Execution of command in process %s completed normally", proc.pid) + + return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8")) From b4a8fd50667d8bf4f56600dddb3601b3f366cbc3 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 26 Nov 2024 18:55:37 +0000 Subject: [PATCH 2/3] rename/rearrange execute_wait tests --- parsl/tests/test_channels/__init__.py | 0 .../tests/test_channels/test_local_channel.py | 18 ------------------ .../test_execute_wait.py} | 17 ++++++++++++++++- 3 files changed, 16 insertions(+), 19 deletions(-) delete mode 100644 parsl/tests/test_channels/__init__.py delete mode 100644 parsl/tests/test_channels/test_local_channel.py rename parsl/tests/{test_channels/test_large_output.py => test_utils/test_execute_wait.py} (58%) diff --git a/parsl/tests/test_channels/__init__.py b/parsl/tests/test_channels/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/parsl/tests/test_channels/test_local_channel.py b/parsl/tests/test_channels/test_local_channel.py deleted file mode 100644 index 39094d0235..0000000000 --- a/parsl/tests/test_channels/test_local_channel.py +++ /dev/null @@ -1,18 +0,0 @@ -import pytest - -from parsl.utils import execute_wait - - -@pytest.mark.local -def test_env(): - ''' Regression testing for issue #27 - ''' - - rc, stdout, stderr = execute_wait("env", 1) - - stdout = stdout.split('\n') - x = [s for s in stdout if s.startswith("PATH=")] - assert x, "PATH not found" - - x = [s for s in stdout if s.startswith("HOME=")] - assert x, "HOME not found" diff --git a/parsl/tests/test_channels/test_large_output.py b/parsl/tests/test_utils/test_execute_wait.py similarity index 58% rename from parsl/tests/test_channels/test_large_output.py rename to parsl/tests/test_utils/test_execute_wait.py index ee02d351f2..44488c239c 100644 --- a/parsl/tests/test_channels/test_large_output.py +++ b/parsl/tests/test_utils/test_execute_wait.py @@ -4,7 +4,22 @@ @pytest.mark.local -def test_local_large_output_2210(): +def test_env(): + ''' Regression testing for issue #27 + ''' + + rc, stdout, stderr = execute_wait("env", 1) + + stdout = stdout.split('\n') + x = [s for s in stdout if s.startswith("PATH=")] + assert x, "PATH not found" + + x = [s for s in stdout if s.startswith("HOME=")] + assert x, "HOME not found" + + +@pytest.mark.local +def test_large_output_2210(): """Regression test for #2210. execute_wait was hanging if the specified command gave too much output, due to a race condition between process exiting and From 37d9a12f0e0266012cc4b83b992165bff9c0d062 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 26 Nov 2024 18:56:28 +0000 Subject: [PATCH 3/3] remove last bit of doc string --- parsl/channels/base.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/parsl/channels/base.py b/parsl/channels/base.py index d05ac561d7..0024d45a6b 100644 --- a/parsl/channels/base.py +++ b/parsl/channels/base.py @@ -2,13 +2,6 @@ class Channel(metaclass=ABCMeta): - """Channels are abstractions that enable ExecutionProviders to talk to - resource managers of remote compute facilities. - - For certain resources such as campus clusters or supercomputers at - research laboratories, resource requirements may require authentication. - """ - @abstractproperty def script_dir(self) -> str: ''' This is a property. Returns the directory assigned for storing all internal scripts such as