Add working_dir kwarg to GlobusComputeEngineBase
`ThreadPoolEngine` and `ProcessPoolEngine` now pass the `working_dir` default
`"tasks_working_dir"` to `GlobusComputeEngineBase`, which makes the path
absolute.  When `execute_task` runs, the cwd is changed to `working_dir`,
ensuring that all tasks run in the same directory.

* Added a new method `GlobusComputeEngineBase.set_working_dir()` to
  ensure `working_dir` is absolute.

* Enforced absolute paths for `run_dir` in `execute_task()`.

* Removed an obsolete test.
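
A minimal sketch of the new resolution logic (illustrative paths; this mirrors
`set_working_dir()` in the diff below):

    import os

    working_dir = "tasks_working_dir"             # the relative default
    run_dir = "/home/user/.globus_compute/my_ep"  # endpoint run dir (illustrative)

    if not os.path.isabs(working_dir):
        working_dir = os.path.join(os.path.abspath(run_dir), working_dir)
    # -> "/home/user/.globus_compute/my_ep/tasks_working_dir"
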
yadudoc authored and khk-globus committed Nov 13, 2024
1 parent f75edc6 commit 1cd2f0d
Showing 12 changed files with 182 additions and 68 deletions.
@@ -0,0 +1,20 @@
New Functionality
^^^^^^^^^^^^^^^^^

- ``GlobusComputeEngine``, ``ThreadPoolEngine``, and ``ProcessPoolEngine`` can
  now be configured with ``working_dir`` to specify the working directory for
  tasks.  If a relative path is specified, it is resolved against the endpoint
  run directory (usually ``~/.globus_compute/<endpoint_name>``).  Here's an
  example config file:

  .. code-block:: yaml

     engine:
         type: GlobusComputeEngine
         working_dir: /absolute/path/to/tasks_working_dir
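
  A relative path is also accepted and is resolved against the endpoint run
  directory; for example, the following (illustrative) config places tasks
  under ``~/.globus_compute/<endpoint_name>/my_tasks``:

  .. code-block:: yaml

     engine:
         type: GlobusComputeEngine
         working_dir: my_tasks
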
Bug Fixes
^^^^^^^^^

- Fixed a bug where functions run with ``ThreadPoolEngine`` and ``ProcessPoolEngine``
  created and switched into the ``tasks_working_dir`` on every invocation,
  creating endless nesting.
30 changes: 29 additions & 1 deletion compute_endpoint/globus_compute_endpoint/engines/base.py
@@ -81,8 +81,31 @@ def __init__(
*args: object,
endpoint_id: uuid.UUID | None = None,
max_retries_on_system_failure: int = 0,
working_dir: str | os.PathLike = "tasks_working_dir",
**kwargs: object,
):
"""
Parameters
----------
endpoint_id: uuid | None
ID of the endpoint for which the engine serves as the execution backend
max_retries_on_system_failure: int
Set the number of retries for functions that fail due to system
failures such as node failure/loss.  Since functions can fail
after partial runs, consider additional cleanup logic before
enabling this functionality.  default=0
working_dir: str | os.PathLike
Directory within which functions should execute; defaults to
``~/.globus_compute/<endpoint_name>/tasks_working_dir``.
If a relative path is supplied, the working dir is set relative
to endpoint.run_dir.  If an absolute path is supplied, it is
used as-is.  default="tasks_working_dir"
kwargs
"""
self._shutdown_event = threading.Event()
self.endpoint_id = endpoint_id
self.max_retries_on_system_failure = max_retries_on_system_failure
@@ -91,7 +114,7 @@ def __init__(
# endpoint interchange happy
self.container_type: str | None = None
self.run_dir: str | None = None
self.working_dir: str | os.PathLike = "tasks_working_dir"
self.working_dir: str | os.PathLike = working_dir
self.run_in_sandbox: bool = False
# This attribute could be set by the subclasses in their
# start method if another component insists on owning the queue.
@@ -111,6 +134,11 @@
def get_status_report(self) -> EPStatusReport:
raise NotImplementedError

def set_working_dir(self, run_dir: str | None = None):
if not os.path.isabs(self.working_dir):
run_dir = os.path.abspath(run_dir or os.getcwd())
self.working_dir = os.path.join(run_dir, self.working_dir)

def report_status(self) -> None:
status_report = self.get_status_report()
packed: bytes = messagepack.pack(status_report)
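
For illustration, a minimal sketch of the resulting behavior, assuming
`ThreadPoolEngine` forwards `working_dir` to the base class (as this commit
arranges) and with illustrative paths:

    from globus_compute_endpoint.engines import ThreadPoolEngine

    engine = ThreadPoolEngine(working_dir="tasks_working_dir")  # relative
    engine.set_working_dir(run_dir="/home/user/.globus_compute/my_ep")
    # engine.working_dir == "/home/user/.globus_compute/my_ep/tasks_working_dir"

    engine = ThreadPoolEngine(working_dir="/scratch/tasks")  # absolute
    engine.set_working_dir(run_dir="/home/user/.globus_compute/my_ep")
    # engine.working_dir == "/scratch/tasks"  (used as-is)
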
@@ -50,7 +50,6 @@ def __init__(
encrypted: bool = True,
strategy: str | None = None,
job_status_kwargs: t.Optional[JobStatusPollerKwargs] = None,
working_dir: str | os.PathLike = "tasks_working_dir",
run_in_sandbox: bool = False,
**kwargs,
):
@@ -90,13 +89,6 @@ def __init__(
encrypted: bool
Flag to enable/disable encryption (CurveZMQ). Default is True.
working_dir: str | os.PathLike
Directory within which functions should execute, defaults to
(~/.globus_compute/<endpoint_name>/tasks_working_dir)
If a relative path is supplied, the working dir is set relative
to the endpoint.run_dir. If an absolute path is supplied, it is
used as is.
run_in_sandbox: bool
Functions will run in a sandbox directory under the working_dir
if this option is enabled. Default: False
@@ -146,7 +138,6 @@ def __init__(
self.executor.interchange_launch_cmd = self._get_compute_ix_launch_cmd()
self.executor.launch_cmd = self._get_compute_launch_cmd()

self.working_dir = working_dir
self.run_in_sandbox = run_in_sandbox
if strategy is None:
strategy = "simple"
@@ -249,9 +240,7 @@ def start(
assert endpoint_id, "GCExecutor requires kwarg:endpoint_id at start"
assert run_dir, "GCExecutor requires kwarg:run_dir at start"

if not os.path.isabs(self.working_dir):
# set relative to run_dir
self.working_dir = os.path.join(run_dir, self.working_dir)
self.set_working_dir(run_dir=run_dir)

self.endpoint_id = endpoint_id
self.run_dir = run_dir
21 changes: 13 additions & 8 deletions compute_endpoint/globus_compute_endpoint/engines/helper.py
@@ -58,14 +58,19 @@ def execute_task(

os.environ.pop("GC_TASK_SANDBOX_DIR", None)
os.environ["GC_TASK_UUID"] = str(task_id)
if run_dir:
os.makedirs(run_dir, exist_ok=True)
os.chdir(run_dir)
if run_in_sandbox:
os.makedirs(str(task_id)) # task_id is expected to be unique
os.chdir(str(task_id))
# Set sandbox dir so that apps can use it
os.environ["GC_TASK_SANDBOX_DIR"] = os.getcwd()

if not run_dir or not os.path.isabs(run_dir):
raise RuntimeError(
f"execute_task requires an absolute path for run_dir, got {run_dir=}"
)

os.makedirs(run_dir, exist_ok=True)
os.chdir(run_dir)
if run_in_sandbox:
os.makedirs(str(task_id)) # task_id is expected to be unique
os.chdir(str(task_id))
# Set sandbox dir so that apps can use it
os.environ["GC_TASK_SANDBOX_DIR"] = os.getcwd()

env_details = get_env_details()
try:
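
The net effect of the new guard, sketched (this mirrors the `test_bad_run_dir`
test below; `task_id` and `endpoint_id` are illustrative placeholders, and the
empty task body is never unpacked because the guard raises first):

    import uuid
    from globus_compute_endpoint.engines.helper import execute_task

    task_id, endpoint_id = uuid.uuid4(), uuid.uuid4()
    for bad_run_dir in ("relative/dir", None):
        try:
            execute_task(task_id, b"", endpoint_id, run_dir=bad_run_dir)
        except RuntimeError as exc:
            print(exc)  # execute_task requires an absolute path for run_dir, ...
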
@@ -779,7 +779,11 @@ def submit(

container_loc = self._get_container_location(packed_task)
ser = serializer.serialize(
(execute_task, [task_id, packed_task, self.endpoint_id], {})
(
execute_task,
[task_id, packed_task, self.endpoint_id],
{"run_dir": self.run_dir},
)
)
payload = Task(task_id, container_loc, ser).pack()
assert self.outgoing_q # Placate mypy
@@ -34,13 +34,15 @@ def start(
self,
*args,
endpoint_id: t.Optional[uuid.UUID] = None,
run_dir: t.Optional[str] = None,
results_passthrough: t.Optional[queue.Queue] = None,
**kwargs,
) -> None:
"""
Parameters
----------
endpoint_id: Endpoint UUID
run_dir: endpoint run directory
results_passthrough: Queue to which packed results will be posted
Returns
-------
@@ -59,6 +61,7 @@
if results_passthrough:
self.results_passthrough = results_passthrough
assert self.results_passthrough
self.set_working_dir(run_dir=run_dir)

self._status_report_thread.start()

@@ -32,15 +32,16 @@ def start(
self,
*args,
endpoint_id: t.Optional[uuid.UUID] = None,
run_dir: t.Optional[str] = None,
results_passthrough: t.Optional[queue.Queue] = None,
**kwargs,
) -> None:
"""
Parameters
----------
endpoint_id: Endpoint UUID
run_dir: endpoint run directory
results_passthrough: Queue to which packed results will be posted
run_dir Not used
Returns
-------
"""
@@ -50,6 +51,7 @@
self.results_passthrough = results_passthrough
assert self.results_passthrough

self.set_working_dir(run_dir=run_dir)
# mypy thinks the thread can be None
self._status_report_thread.start()

33 changes: 32 additions & 1 deletion compute_endpoint/tests/unit/test_engines.py
@@ -24,7 +24,7 @@
from parsl import HighThroughputExecutor
from parsl.executors.high_throughput.interchange import ManagerLost
from parsl.providers import KubernetesProvider
from tests.utils import double, kill_manager
from tests.utils import double, get_cwd, kill_manager

logger = logging.getLogger(__name__)

@@ -73,6 +73,37 @@ def test_engine_submit(engine_type: GlobusComputeEngineBase, engine_runner):
assert future.result(timeout=5) == param * 2


@pytest.mark.parametrize(
"engine_type", (ProcessPoolEngine, ThreadPoolEngine, GlobusComputeEngine)
)
def test_engine_working_dir(
engine_type: GlobusComputeEngineBase,
engine_runner,
ez_pack_task,
serde,
task_uuid,
):
"""working dir remains constant across multiple fn invocations
This test requires submitting the task payload so that the execute_task
wrapper is used which switches into the working_dir, which created
working_dir nesting when relative paths were used.
"""
engine = engine_runner(engine_type)

task_args = (str(task_uuid), ez_pack_task(get_cwd), {})

future1 = engine.submit(*task_args)
unpacked1 = messagepack.unpack(future1.result()) # blocks; avoid race condition

future2 = engine.submit(*task_args) # exact same task
unpacked2 = messagepack.unpack(future2.result())

# data is enough for test, but in error case, be kind to dev
cwd1 = serde.deserialize(unpacked1.data)
cwd2 = serde.deserialize(unpacked2.data)
assert cwd1 == cwd2, "working dir should be idempotent"


@pytest.mark.parametrize(
"engine_type", (ProcessPoolEngine, ThreadPoolEngine, GlobusComputeEngine)
)
17 changes: 13 additions & 4 deletions compute_endpoint/tests/unit/test_execute_task.py
@@ -1,6 +1,7 @@
import logging
from unittest import mock

import pytest
from globus_compute_common import messagepack
from globus_compute_endpoint.engines.helper import execute_task

@@ -13,12 +14,18 @@ def divide(x, y):
return x / y


def test_execute_task(endpoint_uuid, serde, task_uuid, ez_pack_task):
@pytest.mark.parametrize("run_dir", ("tmp", None, "$HOME"))
def test_bad_run_dir(endpoint_uuid, task_uuid, run_dir):
with pytest.raises(RuntimeError):
execute_task(task_uuid, b"", endpoint_uuid, run_dir=run_dir)


def test_execute_task(endpoint_uuid, serde, task_uuid, ez_pack_task, tmp_path):
inp, outp = (10, 2), 5

task_bytes = ez_pack_task(divide, *inp)

packed_result = execute_task(task_uuid, task_bytes, endpoint_uuid)
packed_result = execute_task(task_uuid, task_bytes, endpoint_uuid, run_dir=tmp_path)
assert isinstance(packed_result, bytes)

result = messagepack.unpack(packed_result)
@@ -31,11 +38,13 @@ def test_execute_task(endpoint_uuid, serde, task_uuid, ez_pack_task):
assert serde.deserialize(result.data) == outp


def test_execute_task_with_exception(endpoint_uuid, task_uuid, ez_pack_task):
def test_execute_task_with_exception(endpoint_uuid, task_uuid, ez_pack_task, tmp_path):
task_bytes = ez_pack_task(divide, 10, 0)

with mock.patch(f"{_MOCK_BASE}log") as mock_log:
packed_result = execute_task(task_uuid, task_bytes, endpoint_uuid)
packed_result = execute_task(
task_uuid, task_bytes, endpoint_uuid, run_dir=tmp_path
)

assert mock_log.exception.called
a, _k = mock_log.exception.call_args
16 changes: 11 additions & 5 deletions compute_endpoint/tests/unit/test_worker.py
@@ -79,7 +79,7 @@ def test_register_and_kill(test_worker):
assert messages[1][0] == b"WRKR_DIE", messages


def test_execute_hello_world(test_worker):
def test_execute_hello_world(test_worker, tmp_path):
task_id = uuid.uuid1()
task_body = test_worker.serializer.serialize((hello_world, (), {}))
internal_task = Task(task_id, "RAW", task_body)
@@ -125,15 +125,19 @@ def test_execute_failing_function(test_worker):


def test_execute_function_exceeding_result_size_limit(
test_worker, endpoint_uuid, task_uuid, ez_pack_task
test_worker, endpoint_uuid, task_uuid, ez_pack_task, tmp_path
):
return_size = 10

task_bytes = ez_pack_task(large_result, return_size)

with mock.patch("globus_compute_endpoint.engines.helper.log") as mock_log:
s_result = execute_task(
task_uuid, task_bytes, endpoint_uuid, result_size_limit=return_size - 2
task_uuid,
task_bytes,
endpoint_uuid,
result_size_limit=return_size - 2,
run_dir=tmp_path,
)
result = messagepack.unpack(s_result)

@@ -145,12 +149,14 @@
assert mock_log.exception.called


def test_app_timeout(test_worker, endpoint_uuid, task_uuid, ez_pack_task):
def test_app_timeout(test_worker, endpoint_uuid, task_uuid, ez_pack_task, tmp_path):
task_bytes = ez_pack_task(sleeper, 1)

with mock.patch("globus_compute_endpoint.engines.helper.log") as mock_log:
with mock.patch.dict(os.environ, {"GC_TASK_TIMEOUT": "0.01"}):
packed_result = execute_task(task_uuid, task_bytes, endpoint_uuid)
packed_result = execute_task(
task_uuid, task_bytes, endpoint_uuid, run_dir=tmp_path
)

result = messagepack.unpack(packed_result)
assert isinstance(result, messagepack.message_types.Result)