Skip to content

Commit

Permalink
Add working_dir as a kwarg to GlobusComputeEngineBase
Browse files Browse the repository at this point in the history
ThreadPoolEngine and ProcessPoolEngine now pass working_dir (default: "tasks_working_dir")
to GlobusComputeEngineBase, which makes the path absolute. When execute_task runs,
the working directory is set to this absolute path, ensuring that all tasks run in the same directory.
  • Loading branch information
yadudoc committed Oct 23, 2024
1 parent 5732e93 commit c34d27b
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
New Functionality
^^^^^^^^^^^^^^^^^

- ``GlobusComputeEngine``, ``ThreadPoolEngine``, and ``ProcessPoolEngine`` can
  now be configured with ``working_dir`` to specify the working directory in
  which tasks run. If a relative path is specified, it is resolved relative to
  the endpoint run directory (usually ``~/.globus_compute/<endpoint_name>``).
  Here's an example config file:

.. code-block:: yaml

   engine:
     type: GlobusComputeEngine
     working_dir: /dev/shm/tasks_working_dir

Bug Fixes
^^^^^^^^^

- Fixed a bug where functions run with ``ThreadPoolEngine`` or ``ProcessPoolEngine``
  would create and switch into a new ``tasks_working_dir`` on each invocation,
  resulting in endlessly nested directories.
5 changes: 4 additions & 1 deletion compute_endpoint/globus_compute_endpoint/engines/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __init__(
*args: object,
endpoint_id: t.Optional[uuid.UUID] = None,
max_retries_on_system_failure: int = 0,
working_dir: t.Union[str, os.PathLike] = "tasks_working_dir",
**kwargs: object,
):
self._shutdown_event = threading.Event()
Expand All @@ -89,7 +90,9 @@ def __init__(
# endpoint interchange happy
self.container_type: t.Optional[str] = None
self.run_dir: t.Optional[str] = None
self.working_dir: t.Union[str, os.PathLike] = "tasks_working_dir"
self.working_dir: t.Union[str, os.PathLike] = working_dir
# Pin a relative working_dir to an absolute path so that a later change of
# the current directory cannot make it resolve somewhere else.  The guard
# must be os.path.isabs(): os.path.abspath() always returns a non-empty
# (truthy) string, so testing its result meant this branch never ran.
# NOTE(review): abspath() resolves against the process's current directory
# at construction time -- confirm that is the intended base for relative
# paths (as opposed to the endpoint run directory).
if not os.path.isabs(working_dir):
    self.working_dir = os.path.abspath(working_dir)
self.run_in_sandbox: bool = False
# This attribute could be set by the subclasses in their
# start method if another component insists on owning the queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(
encrypted: bool = True,
strategy: str | None = None,
job_status_kwargs: t.Optional[JobStatusPollerKwargs] = None,
working_dir: str | os.PathLike = "tasks_working_dir",
working_dir: t.Union[str, os.PathLike] = "tasks_working_dir",
run_in_sandbox: bool = False,
**kwargs,
):
Expand Down Expand Up @@ -116,7 +116,10 @@ def __init__(
self.label = label or type(self).__name__
self._status_report_thread = ReportingThread(target=self.report_status, args=[])
super().__init__(
*args, max_retries_on_system_failure=max_retries_on_system_failure, **kwargs
*args,
max_retries_on_system_failure=max_retries_on_system_failure,
working_dir=working_dir,
**kwargs,
)
self.strategy = strategy

Expand Down Expand Up @@ -146,7 +149,6 @@ def __init__(
self.executor.interchange_launch_cmd = self._get_compute_ix_launch_cmd()
self.executor.launch_cmd = self._get_compute_launch_cmd()

self.working_dir = working_dir
self.run_in_sandbox = run_in_sandbox
if strategy is None:
strategy = "simple"
Expand Down
11 changes: 9 additions & 2 deletions compute_endpoint/globus_compute_endpoint/engines/process_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import multiprocessing
import os
import queue
import typing as t
import uuid
Expand All @@ -22,13 +23,19 @@


class ProcessPoolEngine(GlobusComputeEngineBase):
def __init__(self, *args, label: str = "ProcessPoolEngine", **kwargs):
def __init__(
self,
*args,
working_dir: t.Union[str, os.PathLike] = "tasks_working_dir",
label: str = "ProcessPoolEngine",
**kwargs,
):
self.label = label
self.executor: t.Optional[NativeExecutor] = None
self._executor_args = args
self._executor_kwargs = kwargs
self._status_report_thread = ReportingThread(target=self.report_status, args=[])
super().__init__(*args, **kwargs)
super().__init__(*args, working_dir=working_dir, **kwargs)

def start(
self,
Expand Down
11 changes: 9 additions & 2 deletions compute_endpoint/globus_compute_endpoint/engines/thread_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import multiprocessing
import os
import queue
import typing as t
import uuid
Expand All @@ -22,11 +23,17 @@


class ThreadPoolEngine(GlobusComputeEngineBase):
def __init__(self, *args, label: str = "ThreadPoolEngine", **kwargs):
def __init__(
self,
*args,
working_dir: t.Union[str, os.PathLike] = "tasks_working_dir",
label: str = "ThreadPoolEngine",
**kwargs,
):
self.label = label
self.executor = NativeExecutor(*args, **kwargs)
self._status_report_thread = ReportingThread(target=self.report_status, args=[])
super().__init__(*args, **kwargs)
super().__init__(*args, working_dir=working_dir, **kwargs)

def start(
self,
Expand Down
15 changes: 14 additions & 1 deletion compute_endpoint/tests/unit/test_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from parsl import HighThroughputExecutor
from parsl.executors.high_throughput.interchange import ManagerLost
from parsl.providers import KubernetesProvider
from tests.utils import double, kill_manager
from tests.utils import double, get_cwd, kill_manager

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -73,6 +73,19 @@ def test_engine_submit(engine_type: GlobusComputeEngineBase, engine_runner):
assert future.result(timeout=5) == param * 2


@pytest.mark.parametrize(
    "engine_type", (ProcessPoolEngine, ThreadPoolEngine, GlobusComputeEngine)
)
def test_engine_working_dir(engine_type: GlobusComputeEngineBase, engine_runner):
    """Working dir remains constant across multiple fn invocations.

    Submits ``get_cwd`` twice to the same engine and checks that both tasks
    report the same current working directory.
    """
    engine = engine_runner(engine_type)

    resource_spec = {}
    future1 = engine._submit(get_cwd, resource_spec)
    future2 = engine._submit(get_cwd, resource_spec)
    # Bound both waits so an engine that never completes the task fails the
    # test instead of hanging the whole run.
    assert future1.result(timeout=5) == future2.result(timeout=5)


@pytest.mark.parametrize(
"engine_type", (ProcessPoolEngine, ThreadPoolEngine, GlobusComputeEngine)
)
Expand Down
6 changes: 6 additions & 0 deletions compute_endpoint/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,9 @@ def get_env_vars():
import os

return os.environ


def get_cwd():
    """Return the current working directory of the process running this call."""
    # The import is kept local to the function body, as in the original.
    from os import getcwd

    return getcwd()

0 comments on commit c34d27b

Please sign in to comment.