From 20b42fed6996cdfdf169e4c4a9c839679e9fa4d5 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 8 Nov 2024 10:17:10 +0000 Subject: [PATCH 1/5] Remove channel push/pull file --- parsl/channels/base.py | 28 --------------------------- parsl/channels/local/local.py | 36 ----------------------------------- 2 files changed, 64 deletions(-) diff --git a/parsl/channels/base.py b/parsl/channels/base.py index e8acfc1088..96fa0b2582 100644 --- a/parsl/channels/base.py +++ b/parsl/channels/base.py @@ -53,34 +53,6 @@ def script_dir(self) -> str: def script_dir(self, value: str) -> None: pass - @abstractmethod - def push_file(self, source: str, dest_dir: str) -> str: - ''' Channel will take care of moving the file from source to the destination - directory - - Args: - source (string) : Full filepath of the file to be moved - dest_dir (string) : Absolute path of the directory to move to - - Returns: - destination_path (string) - ''' - pass - - @abstractmethod - def pull_file(self, remote_source: str, local_dir: str) -> str: - ''' Transport file on the remote side to a local directory - - Args: - remote_source (string): remote_source - local_dir (string): Local directory to copy to - - - Returns: - destination_path (string) - ''' - pass - @abstractmethod def makedirs(self, path: str, mode: int = 0o511, exist_ok: bool = False) -> None: """Create a directory. diff --git a/parsl/channels/local/local.py b/parsl/channels/local/local.py index 40b7eac34f..58bd6af849 100644 --- a/parsl/channels/local/local.py +++ b/parsl/channels/local/local.py @@ -1,10 +1,8 @@ import logging import os -import shutil import subprocess from parsl.channels.base import Channel -from parsl.channels.errors import FileCopyException from parsl.utils import RepresentationMixin logger = logging.getLogger(__name__) @@ -57,40 +55,6 @@ def execute_wait(self, cmd, walltime=None): return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8")) - def push_file(self, source, dest_dir): - ''' If the source files dirpath is the same as dest_dir, a copy - is not necessary, and nothing is done. Else a copy is made. - - Args: - - source (string) : Path to the source file - - dest_dir (string) : Path to the directory to which the files is to be copied - - Returns: - - destination_path (String) : Absolute path of the destination file - - Raises: - - FileCopyException : If file copy failed. - ''' - - local_dest = os.path.join(dest_dir, os.path.basename(source)) - - # Only attempt to copy if the target dir and source dir are different - if os.path.dirname(source) != dest_dir: - try: - shutil.copyfile(source, local_dest) - os.chmod(local_dest, 0o700) - - except OSError as e: - raise FileCopyException(e, "localhost") - - else: - os.chmod(local_dest, 0o700) - - return local_dest - - def pull_file(self, remote_source, local_dir): - return self.push_file(remote_source, local_dir) - def isdir(self, path): """Return true if the path refers to an existing directory. From 6a9666e143a6b8093054877e56c522a0d24677b7 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 8 Nov 2024 10:20:10 +0000 Subject: [PATCH 2/5] Remove channel push_file support from SlurmProvider This has been a basically dead code path since removal of non-remote channels: Either a user chooses to not move files (overriding move_files to False) and the push_file was not used. Or the default push_file was used, but: * this is always LocalChannel now (PR #3677) * the script directory is always the local script directory (PR #3688) and so the local channel code always skips making a copy This commit simplies all of that away into using the script path directly with putting it through the above complicated no-op. --- parsl/providers/slurm/slurm.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py index 2aa855bb48..865ca6a52d 100644 --- a/parsl/providers/slurm/slurm.py +++ b/parsl/providers/slurm/slurm.py @@ -107,7 +107,6 @@ class SlurmProvider(ClusterProvider, RepresentationMixin): :class:`~parsl.launchers.SingleNodeLauncher` (the default), :class:`~parsl.launchers.SrunLauncher`, or :class:`~parsl.launchers.AprunLauncher` - move_files : Optional[Bool]: should files be moved? by default, Parsl will try to move files. """ @typeguard.typechecked @@ -130,7 +129,6 @@ def __init__(self, worker_init: str = '', cmd_timeout: int = 10, exclusive: bool = True, - move_files: bool = True, launcher: Launcher = SingleNodeLauncher()): label = 'slurm' super().__init__(label, @@ -148,7 +146,6 @@ def __init__(self, self.cores_per_node = cores_per_node self.mem_per_node = mem_per_node self.exclusive = exclusive - self.move_files = move_files self.account = account self.qos = qos self.constraint = constraint @@ -293,14 +290,7 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s logger.debug("Writing submit script") self._write_submit_script(template_string, script_path, job_name, job_config) - if self.move_files: - logger.debug("moving files") - channel_script_path = self.channel.push_file(script_path, self.channel.script_dir) - else: - logger.debug("not moving files") - channel_script_path = script_path - - retcode, stdout, stderr = self.execute_wait("sbatch {0}".format(channel_script_path)) + retcode, stdout, stderr = self.execute_wait("sbatch {0}".format(script_path)) if retcode == 0: for line in stdout.split('\n'): From 38d29e14b9e15f4c911c32116f02b362bd581117 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 8 Nov 2024 10:41:46 +0000 Subject: [PATCH 3/5] Remove push file from pbspro. see slurm commit for justification. --- parsl/providers/pbspro/pbspro.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/parsl/providers/pbspro/pbspro.py b/parsl/providers/pbspro/pbspro.py index 752f504334..71c958f000 100644 --- a/parsl/providers/pbspro/pbspro.py +++ b/parsl/providers/pbspro/pbspro.py @@ -183,15 +183,13 @@ def submit(self, command, tasks_per_node, job_name="parsl"): logger.debug("Writing submit script") self._write_submit_script(self.template_string, script_path, job_name, job_config) - channel_script_path = self.channel.push_file(script_path, self.channel.script_dir) - submit_options = '' if self.queue is not None: submit_options = '{0} -q {1}'.format(submit_options, self.queue) if self.account is not None: submit_options = '{0} -A {1}'.format(submit_options, self.account) - launch_cmd = "qsub {0} {1}".format(submit_options, channel_script_path) + launch_cmd = "qsub {0} {1}".format(submit_options, script_path) retcode, stdout, stderr = self.execute_wait(launch_cmd) job_id = None From 85031e34c1861f37b6712e4b0d36f9234341f529 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 8 Nov 2024 11:06:57 +0000 Subject: [PATCH 4/5] remove push/pull/move from LocalProvider --- parsl/providers/local/local.py | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/parsl/providers/local/local.py b/parsl/providers/local/local.py index f13521466a..5ecf174df2 100644 --- a/parsl/providers/local/local.py +++ b/parsl/providers/local/local.py @@ -32,9 +32,6 @@ class LocalProvider(ExecutionProvider, RepresentationMixin): Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used. - move_files : Optional[Bool] - Should files be moved? By default, Parsl will try to figure this out itself (= None). - If True, then will always move. If False, will never move. worker_init : str Command to be run before starting a worker, such as 'module load Anaconda; source activate env'. """ @@ -48,8 +45,7 @@ def __init__(self, max_blocks=1, worker_init='', cmd_timeout=30, - parallelism=1, - move_files=None): + parallelism=1): self.channel = channel self._label = 'local' self.nodes_per_block = nodes_per_block @@ -61,7 +57,6 @@ def __init__(self, self.parallelism = parallelism self.script_dir = None self.cmd_timeout = cmd_timeout - self.move_files = move_files # Dictionary that keeps track of jobs, keyed on job_id self.resources = {} @@ -83,7 +78,6 @@ def status(self, job_ids): if job_dict['status'] and job_dict['status'].terminal: # We already checked this and it can't change after that continue - # Script path should point to remote path if _should_move_files() is True script_path = job_dict['script_path'] alive = self._is_alive(job_dict) @@ -137,8 +131,6 @@ def _is_alive(self, job_dict): def _job_file_path(self, script_path: str, suffix: str) -> str: path = '{0}{1}'.format(script_path, suffix) - if self._should_move_files(): - path = self.channel.pull_file(path, self.script_dir) return path def _read_job_file(self, script_path: str, suffix: str) -> str: @@ -216,9 +208,6 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"): job_id = None remote_pid = None - if self._should_move_files(): - logger.debug("Pushing start script") - script_path = self.channel.push_file(script_path, self.channel.script_dir) logger.debug("Launching") # We need to capture the exit code and the streams, so we put them in files. We also write @@ -254,9 +243,6 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"): return job_id - def _should_move_files(self): - return (self.move_files is None and not isinstance(self.channel, LocalChannel)) or (self.move_files) - def cancel(self, job_ids): ''' Cancels the jobs specified by a list of job ids From 74ed1a52f73964ff6232f73823b5bed6eb79cfac Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 8 Nov 2024 11:15:22 +0000 Subject: [PATCH 5/5] do all the other providers, basically same changes as slurm --- parsl/providers/condor/condor.py | 8 +++----- parsl/providers/grid_engine/grid_engine.py | 5 ++--- parsl/providers/lsf/lsf.py | 14 ++------------ parsl/providers/torque/torque.py | 4 +--- 4 files changed, 8 insertions(+), 23 deletions(-) diff --git a/parsl/providers/condor/condor.py b/parsl/providers/condor/condor.py index a736386d38..c8142c4026 100644 --- a/parsl/providers/condor/condor.py +++ b/parsl/providers/condor/condor.py @@ -245,16 +245,14 @@ def submit(self, command, tasks_per_node, job_name="parsl.condor"): with open(userscript_path, 'w') as f: f.write(job_config["worker_init"] + '\n' + wrapped_command) - user_script_path = self.channel.push_file(userscript_path, self.channel.script_dir) - the_input_files = [user_script_path] + self.transfer_input_files + the_input_files = [userscript_path] + self.transfer_input_files job_config["input_files"] = ','.join(the_input_files) - job_config["job_script"] = os.path.basename(user_script_path) + job_config["job_script"] = os.path.basename(userscript_path) # Construct and move the submit script self._write_submit_script(template_string, script_path, job_name, job_config) - channel_script_path = self.channel.push_file(script_path, self.channel.script_dir) - cmd = "condor_submit {0}".format(channel_script_path) + cmd = "condor_submit {0}".format(script_path) try: retcode, stdout, stderr = self.execute_wait(cmd) except Exception as e: diff --git a/parsl/providers/grid_engine/grid_engine.py b/parsl/providers/grid_engine/grid_engine.py index e7db987022..ddedcaa3e8 100644 --- a/parsl/providers/grid_engine/grid_engine.py +++ b/parsl/providers/grid_engine/grid_engine.py @@ -142,11 +142,10 @@ def submit(self, command, tasks_per_node, job_name="parsl.sge"): logger.debug("Writing submit script") self._write_submit_script(template_string, script_path, job_name, job_config) - channel_script_path = self.channel.push_file(script_path, self.channel.script_dir) if self.queue is not None: - cmd = "qsub -q {0} -terse {1}".format(self.queue, channel_script_path) + cmd = "qsub -q {0} -terse {1}".format(self.queue, script_path) else: - cmd = "qsub -terse {0}".format(channel_script_path) + cmd = "qsub -terse {0}".format(script_path) retcode, stdout, stderr = self.execute_wait(cmd) if retcode == 0: diff --git a/parsl/providers/lsf/lsf.py b/parsl/providers/lsf/lsf.py index 8f18f5c879..b446b063a4 100644 --- a/parsl/providers/lsf/lsf.py +++ b/parsl/providers/lsf/lsf.py @@ -68,7 +68,6 @@ class LSFProvider(ClusterProvider, RepresentationMixin): :class:`~parsl.launchers.SingleNodeLauncher` (the default), :class:`~parsl.launchers.SrunLauncher`, or :class:`~parsl.launchers.AprunLauncher` - move_files : Optional[Bool]: should files be moved? by default, Parsl will try to move files. bsub_redirection: Bool Should a redirection symbol "<" be included when submitting jobs, i.e., Bsub < job_script. request_by_nodes: Bool @@ -92,7 +91,6 @@ def __init__(self, project=None, queue=None, cmd_timeout=120, - move_files=True, bsub_redirection=False, request_by_nodes=True, launcher=SingleNodeLauncher()): @@ -112,7 +110,6 @@ def __init__(self, self.queue = queue self.cores_per_block = cores_per_block self.cores_per_node = cores_per_node - self.move_files = move_files self.bsub_redirection = bsub_redirection self.request_by_nodes = request_by_nodes @@ -230,17 +227,10 @@ def submit(self, command, tasks_per_node, job_name="parsl.lsf"): logger.debug("Writing submit script") self._write_submit_script(template_string, script_path, job_name, job_config) - if self.move_files: - logger.debug("moving files") - channel_script_path = self.channel.push_file(script_path, self.channel.script_dir) - else: - logger.debug("not moving files") - channel_script_path = script_path - if self.bsub_redirection: - cmd = "bsub < {0}".format(channel_script_path) + cmd = "bsub < {0}".format(script_path) else: - cmd = "bsub {0}".format(channel_script_path) + cmd = "bsub {0}".format(script_path) retcode, stdout, stderr = super().execute_wait(cmd) job_id = None diff --git a/parsl/providers/torque/torque.py b/parsl/providers/torque/torque.py index c15591706c..7992893abb 100644 --- a/parsl/providers/torque/torque.py +++ b/parsl/providers/torque/torque.py @@ -189,15 +189,13 @@ def submit(self, command, tasks_per_node, job_name="parsl.torque"): logger.debug("Writing submit script") self._write_submit_script(self.template_string, script_path, job_name, job_config) - channel_script_path = self.channel.push_file(script_path, self.channel.script_dir) - submit_options = '' if self.queue is not None: submit_options = '{0} -q {1}'.format(submit_options, self.queue) if self.account is not None: submit_options = '{0} -A {1}'.format(submit_options, self.account) - launch_cmd = "qsub {0} {1}".format(submit_options, channel_script_path) + launch_cmd = "qsub {0} {1}".format(submit_options, script_path) retcode, stdout, stderr = self.execute_wait(launch_cmd) job_id = None