From 3833aac5694ac8ad685541d308e751cbadb8ce60 Mon Sep 17 00:00:00 2001 From: Chris Janidlo Date: Thu, 19 Oct 2023 12:26:22 -0500 Subject: [PATCH] Add configurable AMQP port to endpoint --- .../20231020_110321_chris_443_ampqs.rst | 3 +++ .../endpoint/config/config.py | 9 ++++++++ .../endpoint/config/model.py | 1 + .../endpoint/endpoint_manager.py | 1 + .../endpoint/interchange.py | 5 +++- .../rabbit_mq/command_queue_subscriber.py | 6 +++++ .../endpoint/rabbit_mq/result_publisher.py | 6 +++++ .../rabbit_mq/task_queue_subscriber.py | 8 +++++++ .../unit/test_command_queue_subscriber.py | 13 +++++++++++ .../tests/unit/test_result_publisher.py | 12 ++++++++++ .../tests/unit/test_task_queue_subscriber.py | 23 +++++++++++++++++++ 11 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 compute_endpoint/tests/unit/test_task_queue_subscriber.py diff --git a/changelog.d/20231020_110321_chris_443_ampqs.rst b/changelog.d/20231020_110321_chris_443_ampqs.rst index a41934270..283080bf9 100644 --- a/changelog.d/20231020_110321_chris_443_ampqs.rst +++ b/changelog.d/20231020_110321_chris_443_ampqs.rst @@ -5,3 +5,6 @@ New Functionality - The ``Executor`` can now be told which port to use to listen to AMQP results, via either the ``amqp_port`` keyword argument or the ``amqp_port`` property. + +- Endpoints can be configured to talk to RMQ over a different port via the ``amqp_port`` + configuration option. diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/config/config.py b/compute_endpoint/globus_compute_endpoint/endpoint/config/config.py index 288af40e9..22be80565 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/config/config.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/config/config.py @@ -125,6 +125,12 @@ class Config(RepresentationMixin): start command for that single-user endpoint within this timeframe, the single-user endpoint is started back up again. Default: 30 seconds + + amqp_port : int | None + Port to use for AMQP connections. Note that only 5671, 5672, and 443 are + supported by the Compute web services. If None, the port is assigned by the + services (typically 5671). + Default: None """ def __init__( @@ -137,6 +143,7 @@ def __init__( multi_user: bool | None = None, allowed_functions: list[str] | None = None, authentication_policy: str | None = None, + amqp_port: int | None = None, # Tuning info heartbeat_period=30, heartbeat_threshold=120, @@ -192,6 +199,8 @@ def __init__( self.allowed_functions = allowed_functions self.authentication_policy = authentication_policy + self.amqp_port = amqp_port + # Single-user tuning self.heartbeat_period = heartbeat_period self.heartbeat_threshold = heartbeat_threshold diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/config/model.py b/compute_endpoint/globus_compute_endpoint/endpoint/config/model.py index 5441c8e08..cec6b7b5a 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/config/model.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/config/model.py @@ -110,6 +110,7 @@ class ConfigModel(BaseConfigModel): log_dir: t.Optional[str] stdout: t.Optional[str] stderr: t.Optional[str] + amqp_port: t.Optional[int] _validate_engine = _validate_params("engine") diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/endpoint_manager.py b/compute_endpoint/globus_compute_endpoint/endpoint/endpoint_manager.py index 3ac23b256..30ceebdee 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/endpoint_manager.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/endpoint_manager.py @@ -205,6 +205,7 @@ def __init__( self._command = CommandQueueSubscriber( queue_info=cq_info, + port=config.amqp_port, command_queue=self._command_queue, stop_event=self._command_stop_event, thread_name="CQS", diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/interchange.py b/compute_endpoint/globus_compute_endpoint/endpoint/interchange.py index 846bb452c..02df79931 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/interchange.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/interchange.py @@ -170,6 +170,7 @@ def migrate_tasks_to_internal( log.info(f"Starting the TaskQueueSubscriber as {endpoint_uuid}") task_q_proc = TaskQueueSubscriber( queue_info=connection_params, + port=self.config.amqp_port, external_queue=pending_task_queue, quiesce_event=quiesce_event, endpoint_id=endpoint_uuid, @@ -330,7 +331,9 @@ def _main_loop(self): """ log.debug("_main_loop begins") - results_publisher = ResultPublisher(queue_info=self.result_q_info) + results_publisher = ResultPublisher( + queue_info=self.result_q_info, port=self.config.amqp_port + ) results_publisher.start() executor = self.executor diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/command_queue_subscriber.py b/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/command_queue_subscriber.py index 2d325d0fc..e511ecea7 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/command_queue_subscriber.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/command_queue_subscriber.py @@ -24,6 +24,7 @@ def __init__( self, *, queue_info: dict, + port: int | None = None, command_queue: queue.SimpleQueue[tuple[int, BasicProperties, bytes]], stop_event: threading.Event, poll_period_s: float = 0.5, @@ -36,6 +37,8 @@ def __init__( Parameters ---------- :param queue_info: the AMQP connection credentials, as received from upstream + :param port: Port to connect over AMQP(S). Takes precedence over value + sent by web service. :param command_queue: Messages from upstream will be placed in this queue; consumers of this queue must call .to_ack() with the message id when finished processing @@ -57,6 +60,7 @@ def __init__( super().__init__() self.queue_info = queue_info + self.port = port self._command_queue = command_queue self._stop_event = stop_event self._to_ack: queue.SimpleQueue[int] = queue.SimpleQueue() @@ -129,6 +133,8 @@ def run(self): def _connect(self) -> pika.SelectConnection: pika_params = pika.URLParameters(self.queue_info["connection_url"]) + if self.port is not None: + pika_params.port = self.port return pika.SelectConnection( pika_params, on_close_callback=self._on_connection_closed, diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/result_publisher.py b/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/result_publisher.py index d252e485e..5ece4d94b 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/result_publisher.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/result_publisher.py @@ -34,6 +34,7 @@ def __init__( self, *, queue_info: dict, + port: int | None = None, poll_period_s: float = 0.5, connect_attempt_limit: int = 5, channel_close_window_s: int = 10, @@ -42,6 +43,8 @@ def __init__( """ :param queue_info: Pika connection parameters to connect to RabbitMQ; typically as returned from the web-service by the web-service + :param port: Port to connect over AMQP(S). Takes precedence over value + sent by web service. :param poll_period_s: [default: 0.5] how frequently to check for and handle events. For example, if the thread should stop or if there are results to send along. @@ -54,6 +57,7 @@ def __init__( ``channel_close_window_s``) before shutting down the thread. """ self.queue_info = queue_info + self.port = port # how often to check for work; every `poll_period_s`, the `_event_watcher` # method will handle any outstanding work. @@ -152,6 +156,8 @@ def stop(self, block=True, timeout=5) -> None: def _connect(self) -> pika.SelectConnection: pika_params = pika.URLParameters(self.queue_info["connection_url"]) + if self.port is not None: + pika_params.port = self.port return pika.SelectConnection( pika_params, on_open_callback=self._on_connection_open, diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/task_queue_subscriber.py b/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/task_queue_subscriber.py index 21eae950e..d057c2b06 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/task_queue_subscriber.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/task_queue_subscriber.py @@ -34,6 +34,7 @@ def __init__( *, endpoint_id: str, queue_info: dict, + port: int | None = None, external_queue: multiprocessing.Queue, quiesce_event: EventType, ): @@ -45,6 +46,10 @@ def __init__( Dictionary that includes the key "connection_url", as well as exchange and queue declaration information specified by the server. + port: int or None + Port to connect over AMQP(S). Takes precedence over value + sent by web service. + external_queue: multiprocessing.Queue Each incoming message will be pushed to the queue. Please note that upon pushing a message into this queue, it will be @@ -62,6 +67,7 @@ def __init__( self.endpoint_id = endpoint_id self.external_queue = external_queue self.queue_info = queue_info + self.port = port self.quiesce_event = quiesce_event self._channel_closed = multiprocessing.Event() @@ -76,6 +82,8 @@ def __init__( def _connect(self) -> pika.SelectConnection: pika_params = pika.URLParameters(self.queue_info["connection_url"]) + if self.port is not None: + pika_params.port = self.port return pika.SelectConnection( pika_params, on_open_callback=self._on_connection_open, diff --git a/compute_endpoint/tests/unit/test_command_queue_subscriber.py b/compute_endpoint/tests/unit/test_command_queue_subscriber.py index 1b26386d9..012a62c29 100644 --- a/compute_endpoint/tests/unit/test_command_queue_subscriber.py +++ b/compute_endpoint/tests/unit/test_command_queue_subscriber.py @@ -272,3 +272,16 @@ def test_event_watcher_initiates_shutdown(mock_cqs): assert not mcqs._channel.close.called, "Verify test setup" mcqs._event_watcher() assert mcqs._channel.close.called + + +@pytest.mark.parametrize("port", [443, 5671, 5672]) +def test_amqp_port(port, mocker): + qi = {"queue": None, "connection_url": "amqp://some.address:1111"} + rcqs = cqs.CommandQueueSubscriber( + queue_info=qi, command_queue=mocker.Mock(), stop_event=mocker.Mock(), port=port + ) + connection = mocker.patch(f"{_MOCK_BASE}pika.SelectConnection") + + rcqs._connect() + + assert connection.call_args.args[0].port == port diff --git a/compute_endpoint/tests/unit/test_result_publisher.py b/compute_endpoint/tests/unit/test_result_publisher.py index 402be6358..12df24e12 100644 --- a/compute_endpoint/tests/unit/test_result_publisher.py +++ b/compute_endpoint/tests/unit/test_result_publisher.py @@ -3,6 +3,7 @@ from unittest import mock import pika +import pytest from globus_compute_endpoint.endpoint.rabbit_mq import ( RabbitPublisherStatus, ResultPublisher, @@ -218,3 +219,14 @@ def test_publish_enqueues_message(randomstring): res = randomstring() f_enqueued.set_result(res) assert f.result() == res + + +@pytest.mark.parametrize("port", [443, 5671, 5672]) +def test_amqp_port(port, mocker): + qi = {**q_info, "connection_url": "amqp://some.address:1111"} + rp = ResultPublisher(queue_info=qi, port=port) + connection = mocker.patch(f"{_MOCK_BASE}pika.SelectConnection") + + rp._connect() + + assert connection.call_args.args[0].port == port diff --git a/compute_endpoint/tests/unit/test_task_queue_subscriber.py b/compute_endpoint/tests/unit/test_task_queue_subscriber.py new file mode 100644 index 000000000..dba978fa2 --- /dev/null +++ b/compute_endpoint/tests/unit/test_task_queue_subscriber.py @@ -0,0 +1,23 @@ +import pytest +from globus_compute_endpoint.endpoint.rabbit_mq.task_queue_subscriber import ( + TaskQueueSubscriber, +) + +_MOCK_BASE = "globus_compute_endpoint.endpoint.rabbit_mq.task_queue_subscriber." + + +@pytest.mark.parametrize("port", [443, 5671, 5672]) +def test_amqp_port(port, mocker): + qi = {"queue": None, "connection_url": "amqp://some.address:1111"} + tqs = TaskQueueSubscriber( + endpoint_id="", + external_queue=mocker.Mock(), + quiesce_event=mocker.Mock(), + queue_info=qi, + port=port, + ) + connection = mocker.patch(f"{_MOCK_BASE}pika.SelectConnection") + + tqs._connect() + + assert connection.call_args.args[0].port == port