diff --git a/parsl/channels/base.py b/parsl/channels/base.py index 05241b878d..0024d45a6b 100644 --- a/parsl/channels/base.py +++ b/parsl/channels/base.py @@ -1,39 +1,7 @@ -from abc import ABCMeta, abstractmethod, abstractproperty -from typing import Tuple +from abc import ABCMeta, abstractproperty class Channel(metaclass=ABCMeta): - """Channels are abstractions that enable ExecutionProviders to talk to - resource managers of remote compute facilities. - - For certain resources such as campus clusters or supercomputers at - research laboratories, resource requirements may require authentication. - - The only remaining Channel, *LocalChannel*, executes commands locally in a - shell. - - Channels provide the ability to execute commands remotely, using the - execute_wait method, and manipulate the remote file system using methods - such as push_file, pull_file and makedirs. - - Channels should ensure that each launched command runs in a new process - group, so that providers (such as LocalProvider) which terminate long - running commands using process groups can do so. - """ - - @abstractmethod - def execute_wait(self, cmd: str, walltime: int = 0) -> Tuple[int, str, str]: - ''' Executes the cmd, with a defined walltime. - - Args: - - cmd (string): Command string to execute over the channel - - walltime (int) : Timeout in seconds - - Returns: - - (exit_code, stdout, stderr) (int, string, string) - ''' - pass - @abstractproperty def script_dir(self) -> str: ''' This is a property. Returns the directory assigned for storing all internal scripts such as diff --git a/parsl/channels/local/local.py b/parsl/channels/local/local.py index 6ef014ac19..4c1712ef30 100644 --- a/parsl/channels/local/local.py +++ b/parsl/channels/local/local.py @@ -1,6 +1,5 @@ import logging import os -import subprocess from parsl.channels.base import Channel from parsl.utils import RepresentationMixin @@ -21,40 +20,6 @@ def __init__(self): ''' self.script_dir = None - def execute_wait(self, cmd, walltime=None): - ''' Synchronously execute a commandline string on the shell. - - Args: - - cmd (string) : Commandline string to execute - - walltime (int) : walltime in seconds - - Returns: - - retcode : Return code from the execution - - stdout : stdout string - - stderr : stderr string - ''' - try: - logger.debug("Creating process with command '%s'", cmd) - proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - preexec_fn=os.setpgrp - ) - logger.debug("Created process with pid %s. Performing communicate", proc.pid) - (stdout, stderr) = proc.communicate(timeout=walltime) - retcode = proc.returncode - logger.debug("Process %s returned %s", proc.pid, proc.returncode) - - except Exception: - logger.exception(f"Execution of command failed:\n{cmd}") - raise - else: - logger.debug("Execution of command in process %s completed normally", proc.pid) - - return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8")) - @property def script_dir(self): return self._script_dir diff --git a/parsl/providers/cluster_provider.py b/parsl/providers/cluster_provider.py index 6bc76bdf22..f2feb2ddf3 100644 --- a/parsl/providers/cluster_provider.py +++ b/parsl/providers/cluster_provider.py @@ -6,6 +6,7 @@ from parsl.launchers.errors import BadLauncher from parsl.providers.base import ExecutionProvider from parsl.providers.errors import SchedulerMissingArgs, ScriptPathError +from parsl.utils import execute_wait logger = logging.getLogger(__name__) @@ -76,7 +77,7 @@ def execute_wait(self, cmd, timeout=None): t = self.cmd_timeout if timeout is not None: t = timeout - return self.channel.execute_wait(cmd, t) + return execute_wait(cmd, t) def _write_submit_script(self, template, script_filename, job_name, configs): """Generate submit script and write it to a file. diff --git a/parsl/providers/local/local.py b/parsl/providers/local/local.py index 5ecf174df2..6357c85cba 100644 --- a/parsl/providers/local/local.py +++ b/parsl/providers/local/local.py @@ -11,7 +11,7 @@ ScriptPathError, SubmitException, ) -from parsl.utils import RepresentationMixin +from parsl.utils import RepresentationMixin, execute_wait logger = logging.getLogger(__name__) @@ -118,7 +118,7 @@ def status(self, job_ids): return [self.resources[jid]['status'] for jid in job_ids] def _is_alive(self, job_dict): - retcode, stdout, stderr = self.channel.execute_wait( + retcode, stdout, stderr = execute_wait( 'ps -p {} > /dev/null 2> /dev/null; echo "STATUS:$?" '.format( job_dict['remote_pid']), self.cmd_timeout) for line in stdout.split('\n'): @@ -223,11 +223,11 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"): # cancel the task later. # # We need to do the >/dev/null 2>&1 so that bash closes stdout, otherwise - # channel.execute_wait hangs reading the process stdout until all the + # execute_wait hangs reading the process stdout until all the # background commands complete. cmd = '/bin/bash -c \'echo - >{0}.ec && {{ {{ bash {0} 1>{0}.out 2>{0}.err ; ' \ 'echo $? > {0}.ec ; }} >/dev/null 2>&1 & echo "PID:$!" ; }}\''.format(script_path) - retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout) + retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout) if retcode != 0: raise SubmitException(job_name, "Launch command exited with code {0}".format(retcode), stdout, stderr) @@ -258,7 +258,7 @@ def cancel(self, job_ids): job_dict['cancelled'] = True logger.debug("Terminating job/process ID: {0}".format(job)) cmd = "kill -- -$(ps -o pgid= {} | grep -o '[0-9]*')".format(job_dict['remote_pid']) - retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout) + retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout) if retcode != 0: logger.warning("Failed to kill PID: {} and child processes on {}".format(job_dict['remote_pid'], self.label)) diff --git a/parsl/tests/test_channels/__init__.py b/parsl/tests/test_channels/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/parsl/tests/test_channels/test_large_output.py b/parsl/tests/test_channels/test_large_output.py deleted file mode 100644 index bfc96f38bc..0000000000 --- a/parsl/tests/test_channels/test_large_output.py +++ /dev/null @@ -1,22 +0,0 @@ -import pytest - -from parsl.channels.local.local import LocalChannel - - -@pytest.mark.local -def test_local_large_output_2210(): - """Regression test for #2210. - The local channel was hanging if the specified command gave too - much output, due to a race condition between process exiting and - pipes filling up. - """ - - c = LocalChannel() - - # this will output 128kb of stdout - c.execute_wait("yes | dd count=128 bs=1024", walltime=60) - - # if this test fails, execute_wait should raise a timeout - # exception. - - # The contents out the output is not verified by this test diff --git a/parsl/tests/test_channels/test_local_channel.py b/parsl/tests/test_channels/test_local_channel.py deleted file mode 100644 index a3f55d096d..0000000000 --- a/parsl/tests/test_channels/test_local_channel.py +++ /dev/null @@ -1,19 +0,0 @@ -import pytest - -from parsl.channels.local.local import LocalChannel - - -@pytest.mark.local -def test_env(): - ''' Regression testing for issue #27 - ''' - - lc = LocalChannel() - rc, stdout, stderr = lc.execute_wait("env", 1) - - stdout = stdout.split('\n') - x = [s for s in stdout if s.startswith("PATH=")] - assert x, "PATH not found" - - x = [s for s in stdout if s.startswith("HOME=")] - assert x, "HOME not found" diff --git a/parsl/tests/test_utils/test_execute_wait.py b/parsl/tests/test_utils/test_execute_wait.py new file mode 100644 index 0000000000..44488c239c --- /dev/null +++ b/parsl/tests/test_utils/test_execute_wait.py @@ -0,0 +1,35 @@ +import pytest + +from parsl.utils import execute_wait + + +@pytest.mark.local +def test_env(): + ''' Regression testing for issue #27 + ''' + + rc, stdout, stderr = execute_wait("env", 1) + + stdout = stdout.split('\n') + x = [s for s in stdout if s.startswith("PATH=")] + assert x, "PATH not found" + + x = [s for s in stdout if s.startswith("HOME=")] + assert x, "HOME not found" + + +@pytest.mark.local +def test_large_output_2210(): + """Regression test for #2210. + execute_wait was hanging if the specified command gave too + much output, due to a race condition between process exiting and + pipes filling up. + """ + + # this will output 128kb of stdout + execute_wait("yes | dd count=128 bs=1024", walltime=60) + + # if this test fails, execute_wait should raise a timeout + # exception. + + # The contents out the output is not verified by this test diff --git a/parsl/utils.py b/parsl/utils.py index 0ea5d7d9eb..b6544d63d2 100644 --- a/parsl/utils.py +++ b/parsl/utils.py @@ -458,3 +458,38 @@ def sanitize_dns_subdomain_rfc1123(raw_string: str) -> str: raise ValueError(f"Sanitized DNS subdomain is empty for input '{raw_string}'") return sanitized + + +def execute_wait(cmd: str, walltime: Optional[int] = None) -> Tuple[int, str, str]: + ''' Synchronously execute a commandline string on the shell. + + Args: + - cmd (string) : Commandline string to execute + - walltime (int) : walltime in seconds + + Returns: + - retcode : Return code from the execution + - stdout : stdout string + - stderr : stderr string + ''' + try: + logger.debug("Creating process with command '%s'", cmd) + proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True, + preexec_fn=os.setpgrp + ) + logger.debug("Created process with pid %s. Performing communicate", proc.pid) + (stdout, stderr) = proc.communicate(timeout=walltime) + retcode = proc.returncode + logger.debug("Process %s returned %s", proc.pid, proc.returncode) + + except Exception: + logger.exception(f"Execution of command failed:\n{cmd}") + raise + else: + logger.debug("Execution of command in process %s completed normally", proc.pid) + + return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8"))