Skip to content

Commit

Permalink
Add configurable AMQP port to endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-janidlo committed Oct 26, 2023
1 parent 0efa055 commit 3833aac
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 1 deletion.
3 changes: 3 additions & 0 deletions changelog.d/20231020_110321_chris_443_ampqs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ New Functionality

- The ``Executor`` can now be told which port to use to listen to AMQP results, via
either the ``amqp_port`` keyword argument or the ``amqp_port`` property.

- Endpoints can be configured to talk to RMQ over a different port via the ``amqp_port``
configuration option.
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ class Config(RepresentationMixin):
start command for that single-user endpoint within this timeframe, the
single-user endpoint is started back up again.
Default: 30 seconds
amqp_port : int | None
Port to use for AMQP connections. Note that only 5671, 5672, and 443 are
supported by the Compute web services. If None, the port is assigned by the
services (typically 5671).
Default: None
"""

def __init__(
Expand All @@ -137,6 +143,7 @@ def __init__(
multi_user: bool | None = None,
allowed_functions: list[str] | None = None,
authentication_policy: str | None = None,
amqp_port: int | None = None,
# Tuning info
heartbeat_period=30,
heartbeat_threshold=120,
Expand Down Expand Up @@ -192,6 +199,8 @@ def __init__(
self.allowed_functions = allowed_functions
self.authentication_policy = authentication_policy

self.amqp_port = amqp_port

# Single-user tuning
self.heartbeat_period = heartbeat_period
self.heartbeat_threshold = heartbeat_threshold
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class ConfigModel(BaseConfigModel):
log_dir: t.Optional[str]
stdout: t.Optional[str]
stderr: t.Optional[str]
amqp_port: t.Optional[int]

_validate_engine = _validate_params("engine")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ def __init__(

self._command = CommandQueueSubscriber(
queue_info=cq_info,
port=config.amqp_port,
command_queue=self._command_queue,
stop_event=self._command_stop_event,
thread_name="CQS",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def migrate_tasks_to_internal(
log.info(f"Starting the TaskQueueSubscriber as {endpoint_uuid}")
task_q_proc = TaskQueueSubscriber(
queue_info=connection_params,
port=self.config.amqp_port,
external_queue=pending_task_queue,
quiesce_event=quiesce_event,
endpoint_id=endpoint_uuid,
Expand Down Expand Up @@ -330,7 +331,9 @@ def _main_loop(self):
"""
log.debug("_main_loop begins")

results_publisher = ResultPublisher(queue_info=self.result_q_info)
results_publisher = ResultPublisher(
queue_info=self.result_q_info, port=self.config.amqp_port
)
results_publisher.start()

executor = self.executor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(
self,
*,
queue_info: dict,
port: int | None = None,
command_queue: queue.SimpleQueue[tuple[int, BasicProperties, bytes]],
stop_event: threading.Event,
poll_period_s: float = 0.5,
Expand All @@ -36,6 +37,8 @@ def __init__(
Parameters
----------
:param queue_info: the AMQP connection credentials, as received from upstream
:param port: Port to connect over AMQP(S). Takes precedence over value
sent by web service.
:param command_queue: Messages from upstream will be placed in this queue;
consumers of this queue must call .to_ack() with the message id when
finished processing
Expand All @@ -57,6 +60,7 @@ def __init__(
super().__init__()

self.queue_info = queue_info
self.port = port
self._command_queue = command_queue
self._stop_event = stop_event
self._to_ack: queue.SimpleQueue[int] = queue.SimpleQueue()
Expand Down Expand Up @@ -129,6 +133,8 @@ def run(self):

def _connect(self) -> pika.SelectConnection:
pika_params = pika.URLParameters(self.queue_info["connection_url"])
if self.port is not None:
pika_params.port = self.port
return pika.SelectConnection(
pika_params,
on_close_callback=self._on_connection_closed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(
self,
*,
queue_info: dict,
port: int | None = None,
poll_period_s: float = 0.5,
connect_attempt_limit: int = 5,
channel_close_window_s: int = 10,
Expand All @@ -42,6 +43,8 @@ def __init__(
"""
:param queue_info: Pika connection parameters to connect to RabbitMQ;
typically as returned from the web-service by the web-service
:param port: Port to connect over AMQP(S). Takes precedence over value
sent by web service.
:param poll_period_s: [default: 0.5] how frequently to check for and
handle events. For example, if the thread should stop or if there
are results to send along.
Expand All @@ -54,6 +57,7 @@ def __init__(
``channel_close_window_s``) before shutting down the thread.
"""
self.queue_info = queue_info
self.port = port

# how often to check for work; every `poll_period_s`, the `_event_watcher`
# method will handle any outstanding work.
Expand Down Expand Up @@ -152,6 +156,8 @@ def stop(self, block=True, timeout=5) -> None:

def _connect(self) -> pika.SelectConnection:
pika_params = pika.URLParameters(self.queue_info["connection_url"])
if self.port is not None:
pika_params.port = self.port
return pika.SelectConnection(
pika_params,
on_open_callback=self._on_connection_open,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(
*,
endpoint_id: str,
queue_info: dict,
port: int | None = None,
external_queue: multiprocessing.Queue,
quiesce_event: EventType,
):
Expand All @@ -45,6 +46,10 @@ def __init__(
Dictionary that includes the key "connection_url", as well as exchange
and queue declaration information specified by the server.
port: int or None
Port to connect over AMQP(S). Takes precedence over value
sent by web service.
external_queue: multiprocessing.Queue
Each incoming message will be pushed to the queue.
Please note that upon pushing a message into this queue, it will be
Expand All @@ -62,6 +67,7 @@ def __init__(
self.endpoint_id = endpoint_id
self.external_queue = external_queue
self.queue_info = queue_info
self.port = port
self.quiesce_event = quiesce_event

self._channel_closed = multiprocessing.Event()
Expand All @@ -76,6 +82,8 @@ def __init__(

def _connect(self) -> pika.SelectConnection:
pika_params = pika.URLParameters(self.queue_info["connection_url"])
if self.port is not None:
pika_params.port = self.port
return pika.SelectConnection(
pika_params,
on_open_callback=self._on_connection_open,
Expand Down
13 changes: 13 additions & 0 deletions compute_endpoint/tests/unit/test_command_queue_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,16 @@ def test_event_watcher_initiates_shutdown(mock_cqs):
assert not mcqs._channel.close.called, "Verify test setup"
mcqs._event_watcher()
assert mcqs._channel.close.called


@pytest.mark.parametrize("port", [443, 5671, 5672])
def test_amqp_port(port, mocker):
qi = {"queue": None, "connection_url": "amqp://some.address:1111"}
rcqs = cqs.CommandQueueSubscriber(
queue_info=qi, command_queue=mocker.Mock(), stop_event=mocker.Mock(), port=port
)
connection = mocker.patch(f"{_MOCK_BASE}pika.SelectConnection")

rcqs._connect()

assert connection.call_args.args[0].port == port
12 changes: 12 additions & 0 deletions compute_endpoint/tests/unit/test_result_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from unittest import mock

import pika
import pytest
from globus_compute_endpoint.endpoint.rabbit_mq import (
RabbitPublisherStatus,
ResultPublisher,
Expand Down Expand Up @@ -218,3 +219,14 @@ def test_publish_enqueues_message(randomstring):
res = randomstring()
f_enqueued.set_result(res)
assert f.result() == res


@pytest.mark.parametrize("port", [443, 5671, 5672])
def test_amqp_port(port, mocker):
qi = {**q_info, "connection_url": "amqp://some.address:1111"}
rp = ResultPublisher(queue_info=qi, port=port)
connection = mocker.patch(f"{_MOCK_BASE}pika.SelectConnection")

rp._connect()

assert connection.call_args.args[0].port == port
23 changes: 23 additions & 0 deletions compute_endpoint/tests/unit/test_task_queue_subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pytest
from globus_compute_endpoint.endpoint.rabbit_mq.task_queue_subscriber import (
TaskQueueSubscriber,
)

_MOCK_BASE = "globus_compute_endpoint.endpoint.rabbit_mq.task_queue_subscriber."


@pytest.mark.parametrize("port", [443, 5671, 5672])
def test_amqp_port(port, mocker):
qi = {"queue": None, "connection_url": "amqp://some.address:1111"}
tqs = TaskQueueSubscriber(
endpoint_id="",
external_queue=mocker.Mock(),
quiesce_event=mocker.Mock(),
queue_info=qi,
port=port,
)
connection = mocker.patch(f"{_MOCK_BASE}pika.SelectConnection")

tqs._connect()

assert connection.call_args.args[0].port == port

0 comments on commit 3833aac

Please sign in to comment.