diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/interchange.py b/compute_endpoint/globus_compute_endpoint/endpoint/interchange.py index 846bb452c..2ddb34630 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/interchange.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/interchange.py @@ -285,9 +285,11 @@ def _parent_watcher(ppid: int): except pika.exceptions.ProbableAuthenticationError as e: log.error(f"Unable to connect to AMQP service: {e}") self._kill_event.set() - except Exception: - log.error("Unhandled exception in main kernel.") - log.debug("Unhandled exception in main kernel.", exc_info=True) + except Exception as e: + log.error( + f"Unhandled exception in main kernel: ({type(e).__name__}) {e}" + ) + log.debug("Unhandled exception in main kernel.", exc_info=e) finally: if not self._kill_event.is_set(): self._reconnect_fail_counter += 1 diff --git a/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint.py b/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint.py index 67f385913..78e4de063 100644 --- a/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint.py +++ b/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint.py @@ -1,9 +1,9 @@ import json -import logging import os import pathlib +import random import uuid -from unittest.mock import MagicMock, Mock, patch +from unittest import mock import globus_compute_sdk.sdk.client import globus_compute_sdk.sdk.login_manager @@ -22,11 +22,26 @@ from globus_compute_sdk.sdk.web_client import WebClient _MOCK_BASE = "globus_compute_endpoint.endpoint.endpoint." +_SVC_ADDY = "http://api.funcx.fqdn" # something clearly not correct @pytest.fixture(autouse=True) def patch_compute_client(mocker): - return mocker.patch(f"{_MOCK_BASE}Client") + responses.add( + responses.GET, + _SVC_ADDY + "/v2/version", + json={"api": "1.2.0", "min_ep_version": "2.0.0", "min_sdk_version": "1.0.0a6"}, + status=200, + ) + + gcc = globus_compute_sdk.Client( + funcx_service_address=_SVC_ADDY, + do_version_check=False, + login_manager=mock.Mock(), + ) + gcc.web_client = WebClient(base_url=_SVC_ADDY) + + yield mocker.patch(f"{_MOCK_BASE}Client", return_value=gcc) def test_non_configured_endpoint(mocker): @@ -43,27 +58,9 @@ def test_non_configured_endpoint(mocker): "😎 Great display/.name", ], ) -def test_start_endpoint_display_name(mocker, fs, patch_compute_client, display_name): - svc_addy = "http://api.funcx" - gcc = globus_compute_sdk.Client( - funcx_service_address=svc_addy, - do_version_check=False, - login_manager=mocker.Mock(), - ) - gcc.web_client = WebClient(base_url=svc_addy) - patch_compute_client.return_value = gcc - - responses.add( - responses.GET, - svc_addy + "/v2/version", - json={"api": "1.0.5", "min_ep_version": "1.0.5", "min_sdk_version": "0.0.2a0"}, - status=200, - ) - responses.add( - responses.POST, - svc_addy + "/v2/endpoints", - json={}, - status=404, # we are verifying the POST, not the response +def test_start_endpoint_display_name(mocker, fs, display_name): + responses.add( # 404 == we are verifying the POST, not the response + responses.POST, _SVC_ADDY + "/v2/endpoints", json={}, status=404 ) ep = endpoint.Endpoint() @@ -74,8 +71,8 @@ def test_start_endpoint_display_name(mocker, fs, patch_compute_client, display_n with pytest.raises(SystemExit) as pyt_exc: ep.start_endpoint(ep_dir, str(uuid.uuid4()), ep_conf, False, True, reg_info={}) + assert int(str(pyt_exc.value)) == os.EX_UNAVAILABLE, "Verify exit due to test 404" - assert int(str(pyt_exc.value)) == os.EX_UNAVAILABLE req = pyt_exc.value.__cause__._underlying_response.request req_json = json.loads(req.body) if display_name is not None: @@ -84,29 +81,9 @@ def test_start_endpoint_display_name(mocker, fs, patch_compute_client, display_n assert "display_name" not in req_json -def test_start_endpoint_allowlist_passthrough(mocker, fs, patch_compute_client): - gcc_addy = "http://api.funcx" - gcc = globus_compute_sdk.Client( - funcx_service_address=gcc_addy, - do_version_check=False, - login_manager=mocker.Mock(), - ) - gcwc = WebClient(base_url=gcc_addy) - gcwc.post = MagicMock() - gcc.web_client = gcwc - patch_compute_client.return_value = gcc - - responses.add( - responses.GET, - gcc_addy + "/v2/version", - json={"api": "1.0.5", "min_ep_version": "1.0.5", "min_sdk_version": "0.0.2a0"}, - status=200, - ) - responses.add( - responses.POST, - gcc_addy + "/v2/endpoints", - json={}, - status=200, +def test_start_endpoint_allowlist_passthrough(mocker, fs): + responses.add( # 404 == we are verifying the POST, not the response + responses.POST, _SVC_ADDY + "/v2/endpoints", json={}, status=404 ) ep = endpoint.Endpoint() @@ -115,37 +92,19 @@ def test_start_endpoint_allowlist_passthrough(mocker, fs, patch_compute_client): ep_dir.mkdir(parents=True, exist_ok=True) ep_conf.allowed_functions = [str(uuid.uuid4()), str(uuid.uuid4())] - with pytest.raises(SystemExit): + with pytest.raises(SystemExit) as pyt_exc: ep.start_endpoint(ep_dir, str(uuid.uuid4()), ep_conf, False, True, reg_info={}) + assert int(str(pyt_exc.value)) == os.EX_UNAVAILABLE, "Verify exit due to test 404" - called_data = gcc.web_client.post.call_args[1]["data"] - assert len(called_data["allowed_functions"]) == 2 - assert called_data["allowed_functions"][1] == ep_conf.allowed_functions[1] + req = pyt_exc.value.__cause__._underlying_response.request + req_json = json.loads(req.body) + assert len(req_json["allowed_functions"]) == 2 + assert req_json["allowed_functions"][1] == ep_conf.allowed_functions[1] -def test_start_endpoint_auth_policy_passthrough(mocker, fs, patch_compute_client): - gcc_addy = "https://compute.api.globus.org" - gcc = globus_compute_sdk.Client( - funcx_service_address=gcc_addy, - do_version_check=False, - login_manager=mocker.Mock(), - ) - gcwc = WebClient(base_url=gcc_addy) - gcwc.post = MagicMock() - gcc.web_client = gcwc - patch_compute_client.return_value = gcc - - responses.add( - responses.GET, - gcc_addy + "/v2/version", - json={"api": "1.0.5", "min_ep_version": "1.0.5", "min_sdk_version": "0.0.2a0"}, - status=200, - ) - responses.add( - responses.POST, - gcc_addy + "/v2/endpoints", - json={}, - status=200, +def test_start_endpoint_auth_policy_passthrough(mocker, fs): + responses.add( # 404 == we are verifying the POST, not the response + responses.POST, _SVC_ADDY + "/v2/endpoints", json={}, status=404 ) ep_dir = pathlib.Path("/some/path/some_endpoint_name") @@ -155,17 +114,20 @@ def test_start_endpoint_auth_policy_passthrough(mocker, fs, patch_compute_client ep_conf = Config() ep_conf.authentication_policy = str(uuid.uuid4()) - with pytest.raises(SystemExit): + with pytest.raises(SystemExit) as pyt_exc: ep.start_endpoint(ep_dir, str(uuid.uuid4()), ep_conf, False, True, reg_info={}) + assert int(str(pyt_exc.value)) == os.EX_UNAVAILABLE, "Verify exit due to test 404" + + req = pyt_exc.value.__cause__._underlying_response.request + req_json = json.loads(req.body) - called_data = gcc.web_client.post.call_args[1]["data"] - assert called_data["authentication_policy"] == ep_conf.authentication_policy + assert req_json["authentication_policy"] == ep_conf.authentication_policy def test_endpoint_logout(monkeypatch): # not forced, and no running endpoints - logout_true = Mock(return_value=True) - logout_false = Mock(return_value=False) + logout_true = mock.Mock(return_value=True) + logout_false = mock.Mock(return_value=False) monkeypatch.setattr( globus_compute_sdk.sdk.login_manager.LoginManager, "logout", logout_true ) @@ -203,19 +165,23 @@ def test_endpoint_logout(monkeypatch): assert success -@patch(f"{_MOCK_BASE}Endpoint.get_endpoint_id", return_value="abc-uuid") -@patch("globus_compute_endpoint.cli.get_config") -@patch(f"{_MOCK_BASE}Client.stop_endpoint") +@mock.patch(f"{_MOCK_BASE}Endpoint.get_endpoint_id", return_value="abc-uuid") +@mock.patch("globus_compute_endpoint.cli.get_config") +@mock.patch(f"{_MOCK_BASE}Client.stop_endpoint") def test_stop_remote_endpoint(mock_get_id, mock_get_gcc, mock_stop_endpoint): ep_dir = pathlib.Path("some_ep_dir") / "abc-endpoint" _do_stop_endpoint(ep_dir=ep_dir, remote=False) assert not mock_stop_endpoint.called + + path = f"/v2/endpoints/{mock_stop_endpoint.return_value}/lock" + responses.add(responses.POST, _SVC_ADDY + path, json={}, status=200) + _do_stop_endpoint(ep_dir=ep_dir, remote=True) assert mock_stop_endpoint.called -@patch(f"{_MOCK_BASE}Endpoint.get_endpoint_id", return_value="abc-uuid") +@mock.patch(f"{_MOCK_BASE}Endpoint.get_endpoint_id", return_value="abc-uuid") @pytest.mark.parametrize( "cur_config", [ @@ -315,41 +281,46 @@ def test_endpoint_update_funcx(mock_get_id, mocker, fs, cur_config, randomstring raise AssertionError(f"Unexpected exception: ({type(e).__name__}) {e}") -def test_endpoint_setup_execution(mocker, tmp_path, randomstring, caplog): +def test_endpoint_setup_execution(mocker, tmp_path, randomstring): mocker.patch(f"{_MOCK_BASE}Endpoint.check_pidfile", return_value={"exists": False}) tmp_file_content = randomstring() tmp_file = tmp_path / "random.txt" tmp_file.write_text(tmp_file_content) - command = f"""\ -cat {tmp_file} -exit 1 # exit early to avoid the rest of endpoint setup - """ + exit_code = random.randint(1, 255) # == avoid rest of endpoint setup + command = f"cat {tmp_file}\nexit {exit_code}" endpoint_dir = None endpoint_uuid = None endpoint_config = Config(endpoint_setup=command) - log_to_console = None - no_color = None - reg_info = None + log_to_console = False + no_color = True + reg_info = {} ep = endpoint.Endpoint() - with caplog.at_level(logging.INFO), pytest.raises(SystemExit) as e: - ep.start_endpoint( - endpoint_dir, - endpoint_uuid, - endpoint_config, - log_to_console, - no_color, - reg_info, - ) + with mock.patch(f"{_MOCK_BASE}log") as mock_log: + with pytest.raises(SystemExit) as e: + ep.start_endpoint( + endpoint_dir, + endpoint_uuid, + endpoint_config, + log_to_console, + no_color, + reg_info, + ) assert e.value.code == os.EX_CONFIG - assert tmp_file_content in caplog.text + a, _k = mock_log.error.call_args + assert "endpoint_setup failed" in a[0] + assert f"exit code {exit_code}" in a[0] + + info_txt = "\n".join(a[0] for a, _k in mock_log.info.call_args_list) + assert tmp_file_content in info_txt -def test_endpoint_teardown_execution(mocker, tmp_path, randomstring, caplog): + +def test_endpoint_teardown_execution(mocker, tmp_path, randomstring): mocker.patch( f"{_MOCK_BASE}Endpoint.check_pidfile", return_value={"exists": True, "active": True}, @@ -359,19 +330,21 @@ def test_endpoint_teardown_execution(mocker, tmp_path, randomstring, caplog): tmp_file = tmp_path / "random.txt" tmp_file.write_text(tmp_file_content) - command = f"""\ -cat {tmp_file} -exit 1 # exit early to avoid the rest of endpoint teardown - """ + exit_code = random.randint(1, 255) # == avoid rest of endpoint setup + command = f"cat {tmp_file}\nexit {exit_code}" endpoint_dir = tmp_path endpoint_config = Config(endpoint_teardown=command) - with caplog.at_level(logging.INFO), pytest.raises(SystemExit) as e: - endpoint.Endpoint.stop_endpoint( - endpoint_dir, - endpoint_config, - ) + with mock.patch(f"{_MOCK_BASE}log") as mock_log: + with pytest.raises(SystemExit) as e: + endpoint.Endpoint.stop_endpoint(endpoint_dir, endpoint_config) assert e.value.code == os.EX_CONFIG - assert tmp_file_content in caplog.text + + a, _k = mock_log.error.call_args + assert "endpoint_teardown failed" in a[0] + assert f"exit code {exit_code}" in a[0] + + info_txt = "\n".join(a[0] for a, _k in mock_log.info.call_args_list) + assert tmp_file_content in info_txt diff --git a/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint_manager.py b/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint_manager.py index 0eb2a05c0..c7bb49f09 100644 --- a/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint_manager.py +++ b/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint_manager.py @@ -5,7 +5,7 @@ import shutil import uuid from importlib.machinery import SourceFileLoader -from unittest.mock import ANY +from unittest.mock import ANY, patch import globus_compute_endpoint.endpoint import pytest @@ -16,6 +16,7 @@ logger = logging.getLogger("mock_funcx") +_MOCK_BASE = "globus_compute_endpoint.endpoint.endpoint." DEF_CONFIG_DIR = ( pathlib.Path(globus_compute_endpoint.endpoint.config.__file__).resolve().parent ) @@ -104,7 +105,7 @@ def test_configure_multi_user(self, mu): "This test needs to be re-written after endpoint_register is updated" ) def test_start(self, mocker): - mock_client = mocker.patch("globus_compute_endpoint.endpoint.endpoint.Client") + mock_client = mocker.patch(f"{_MOCK_BASE}Client") reg_info = { "endpoint_id": "abcde12345", "address": "localhost", @@ -130,17 +131,13 @@ def test_start(self, mocker): mock_daemon = mocker.patch.object(Endpoint, "daemon_launch", return_value=None) - mock_uuid = mocker.patch("globus_compute_endpoint.endpoint.endpoint.uuid.uuid4") + mock_uuid = mocker.patch(f"{_MOCK_BASE}uuid.uuid4") mock_uuid.return_value = 123456 - mock_pidfile = mocker.patch( - "globus_compute_endpoint.endpoint.endpoint.daemon.pidfile.PIDLockFile" - ) + mock_pidfile = mocker.patch(f"{_MOCK_BASE}daemon.pidfile.PIDLockFile") mock_pidfile.return_value = None - mock_results_ack_handler = mocker.patch( - "globus_compute_endpoint.endpoint.endpoint.ResultsAckHandler" - ) + mock_results_ack_handler = mocker.patch(f"{_MOCK_BASE}ResultsAckHandler") manager = Endpoint(funcx_dir=os.getcwd()) config_dir = os.path.join(manager.funcx_dir, "mock_endpoint") @@ -189,11 +186,9 @@ def test_start_registration_error(self, mocker): being asserted against because this zmq setup happens before registration occurs. """ - mocker.patch("globus_compute_endpoint.endpoint.endpoint.Client") + mocker.patch(f"{_MOCK_BASE}Client") - mock_register_endpoint = mocker.patch( - "globus_compute_endpoint.endpoint.endpoint.register_endpoint" - ) + mock_register_endpoint = mocker.patch(f"{_MOCK_BASE}register_endpoint") mock_register_endpoint.side_effect = GlobusAPIError( _fake_http_response(status=400, method="POST") ) @@ -206,15 +201,13 @@ def test_start_registration_error(self, mocker): return_value=(b"12345abcde", b"12345abcde"), ) - mock_uuid = mocker.patch("globus_compute_endpoint.endpoint.endpoint.uuid.uuid4") + mock_uuid = mocker.patch(f"{_MOCK_BASE}uuid.uuid4") mock_uuid.return_value = 123456 - mock_pidfile = mocker.patch( - "globus_compute_endpoint.endpoint.endpoint.daemon.pidfile.PIDLockFile" - ) + mock_pidfile = mocker.patch(f"{_MOCK_BASE}daemon.pidfile.PIDLockFile") mock_pidfile.return_value = None - mocker.patch("globus_compute_endpoint.endpoint.endpoint.ResultsAckHandler") + mocker.patch(f"{_MOCK_BASE}ResultsAckHandler") manager = Endpoint(funcx_dir=os.getcwd()) config_dir = os.path.join(manager.funcx_dir, "mock_endpoint") @@ -247,11 +240,9 @@ def test_start_registration_5xx_error(self, mocker): own. mock_zmq_create and mock_zmq_load are being asserted against because this zmq setup happens before registration occurs. """ - mocker.patch("globus_compute_endpoint.endpoint.endpoint.Client") + mocker.patch(f"{_MOCK_BASE}Client") - mock_register_endpoint = mocker.patch( - "globus_compute_endpoint.endpoint.endpoint.register_endpoint" - ) + mock_register_endpoint = mocker.patch(f"{_MOCK_BASE}register_endpoint") mock_register_endpoint.side_effect = GlobusAPIError( _fake_http_response(status=500, method="POST") ) @@ -274,17 +265,13 @@ def test_start_registration_5xx_error(self, mocker): mock_daemon = mocker.patch.object(Endpoint, "daemon_launch", return_value=None) - mock_uuid = mocker.patch("globus_compute_endpoint.endpoint.endpoint.uuid.uuid4") + mock_uuid = mocker.patch(f"{_MOCK_BASE}uuid.uuid4") mock_uuid.return_value = 123456 - mock_pidfile = mocker.patch( - "globus_compute_endpoint.endpoint.endpoint.daemon.pidfile.PIDLockFile" - ) + mock_pidfile = mocker.patch(f"{_MOCK_BASE}daemon.pidfile.PIDLockFile") mock_pidfile.return_value = None - mock_results_ack_handler = mocker.patch( - "globus_compute_endpoint.endpoint.endpoint.ResultsAckHandler" - ) + mock_results_ack_handler = mocker.patch(f"{_MOCK_BASE}ResultsAckHandler") manager = Endpoint(funcx_dir=os.getcwd()) config_dir = os.path.join(manager.funcx_dir, "mock_endpoint") @@ -328,7 +315,7 @@ def test_start_registration_5xx_error(self, mocker): ) def test_start_without_executors(self, mocker): - mock_client = mocker.patch("globus_compute_endpoint.endpoint.endpoint.Client") + mock_client = mocker.patch(f"{_MOCK_BASE}Client") mock_client.return_value.register_endpoint.return_value = { "endpoint_id": "abcde12345", "address": "localhost", @@ -365,9 +352,7 @@ class mock_executors: @pytest.mark.skip("This test doesn't make much sense") def test_daemon_launch(self, mocker): - mock_interchange = mocker.patch( - "globus_compute_endpoint.endpoint.endpoint.EndpointInterchange" - ) + mock_interchange = mocker.patch(f"{_MOCK_BASE}EndpointInterchange") mock_interchange.return_value.start.return_value = None mock_interchange.return_value.stop.return_value = None @@ -456,7 +441,7 @@ def test_with_funcx_config(self, mocker): ) def test_get_or_create_endpoint_uuid_no_json_no_uuid(self, mocker): - mock_uuid = mocker.patch("globus_compute_endpoint.endpoint.endpoint.uuid.uuid4") + mock_uuid = mocker.patch(f"{_MOCK_BASE}uuid.uuid4") mock_uuid.return_value = 123456 config_dir = pathlib.Path("/some/path/mock_endpoint") @@ -481,15 +466,15 @@ def test_get_or_create_endpoint_uuid_given_json(self): assert "abcde12345" == manager.get_or_create_endpoint_uuid(config_dir, "234567") - @pytest.mark.parametrize("dir_exists", [True, False]) - @pytest.mark.parametrize("web_svc_ok", [True, False]) - @pytest.mark.parametrize("force", [True, False]) + @pytest.mark.parametrize("dir_exists", (True, False)) + @pytest.mark.parametrize("web_svc_ok", (True, False)) + @pytest.mark.parametrize("force", (True, False)) def test_delete_endpoint(self, mocker, dir_exists, web_svc_ok, force): manager = Endpoint() config_dir = pathlib.Path("/some/path/mock_endpoint") ep_uuid_str = str(uuid.uuid4()) - mock_client = mocker.patch("globus_compute_endpoint.endpoint.endpoint.Client") + mock_client = mocker.patch(f"{_MOCK_BASE}Client") mock_stop_endpoint = mocker.patch.object(Endpoint, "stop_endpoint") mock_rmtree = mocker.patch.object(shutil, "rmtree") mocker.patch.object(Endpoint, "get_endpoint_id", return_value=ep_uuid_str) @@ -508,11 +493,13 @@ def test_delete_endpoint(self, mocker, dir_exists, web_svc_ok, force): mock_client.return_value.delete_endpoint.side_effect = exc if not force: - with pytest.raises(SystemExit): + with pytest.raises(SystemExit), patch(f"{_MOCK_BASE}log") as mock_log: manager.delete_endpoint(config_dir, None, force) + a, _k = mock_log.critical.call_args + assert "without deleting the endpoint" in a[0], "expected notice" - mock_stop_endpoint.assert_not_called() - mock_rmtree.assert_not_called() + assert not mock_stop_endpoint.called + assert not mock_rmtree.called return manager.delete_endpoint(config_dir, None, force) diff --git a/compute_endpoint/tests/integration/endpoint/executors/high_throughput/test_manager.py b/compute_endpoint/tests/integration/endpoint/executors/high_throughput/test_manager.py index c8c37140f..0cd7b0f38 100644 --- a/compute_endpoint/tests/integration/endpoint/executors/high_throughput/test_manager.py +++ b/compute_endpoint/tests/integration/endpoint/executors/high_throughput/test_manager.py @@ -2,11 +2,14 @@ import pickle import queue import shutil +import subprocess import pytest from globus_compute_endpoint.engines.high_throughput.manager import Manager from globus_compute_endpoint.engines.high_throughput.messages import Task +_MOCK_BASE = "globus_compute_endpoint.engines.high_throughput.manager." + class TestManager: @pytest.fixture(autouse=True) @@ -17,9 +20,7 @@ def test_setup_teardown(self): def test_remove_worker_init(self, mocker): # zmq is being mocked here because it was making tests hang - mocker.patch( - "globus_compute_endpoint.engines.high_throughput.manager.zmq.Context" # noqa: E501 - ) + mocker.patch(f"{_MOCK_BASE}zmq.Context") # noqa: E501 manager = Manager(logdir="./", uid="mock_uid") manager.worker_map.to_die_count["RAW"] = 0 @@ -33,19 +34,13 @@ def test_remove_worker_init(self, mocker): def test_poll_funcx_task_socket(self, mocker): # zmq is being mocked here because it was making tests hang - mocker.patch( - "globus_compute_endpoint.engines.high_throughput.manager.zmq.Context" # noqa: E501 - ) - - mock_worker_map = mocker.patch( - "globus_compute_endpoint.engines.high_throughput.manager.WorkerMap" - ) + mocker.patch(f"{_MOCK_BASE}zmq.Context") # noqa: E501 + mock_worker_map = mocker.patch(f"{_MOCK_BASE}WorkerMap") manager = Manager(logdir="./", uid="mock_uid") manager.task_queues["RAW"] = queue.Queue() - manager.logdir = "./" manager.worker_type = "RAW" - manager.worker_procs["0"] = "proc" + manager.worker_procs["0"] = mocker.Mock(spec=subprocess.Popen) manager.funcx_task_socket.recv_multipart.return_value = ( b"0", diff --git a/compute_endpoint/tests/integration/test_rabbit_mq/test_task_q.py b/compute_endpoint/tests/integration/test_rabbit_mq/test_task_q.py index 0f2d1379c..2dad8d4c9 100644 --- a/compute_endpoint/tests/integration/test_rabbit_mq/test_task_q.py +++ b/compute_endpoint/tests/integration/test_rabbit_mq/test_task_q.py @@ -1,5 +1,4 @@ import json -import logging import multiprocessing import threading import time @@ -8,6 +7,8 @@ import pytest from globus_compute_endpoint.endpoint.rabbit_mq import TaskQueueSubscriber +_MOCK_BASE = "globus_compute_endpoint.endpoint.rabbit_mq.task_queue_subscriber." + def test_synch(start_task_q_publisher, start_task_q_subscriber, count=10): """Open publisher, and publish to task_q, then open subscriber a fetch""" @@ -20,9 +21,6 @@ def test_synch(start_task_q_publisher, start_task_q_subscriber, count=10): messages.append(b_message) task_q_pub.publish(b_message) - logging.warning(f"Published {count} messages, closing task_q_pub") - logging.warning("Starting task_q_subscriber") - tasks_out = multiprocessing.Queue() start_task_q_subscriber(queue=tasks_out) for i in range(count): @@ -46,14 +44,12 @@ def test_subscriber_recovery(start_task_q_publisher, start_task_q_subscriber): tasks_out = multiprocessing.Queue() quiesce_event = multiprocessing.Event() proc = start_task_q_subscriber(queue=tasks_out, quiesce_event=quiesce_event) - logging.warning("Proc started") for i in range(10): _, message = tasks_out.get() assert messages[i] == message # Terminate the connection proc.stop() - logging.warning("Disconnected") # Launch 10 more messages messages = [] @@ -68,27 +64,25 @@ def test_subscriber_recovery(start_task_q_publisher, start_task_q_subscriber): # Listen for the messages on a new connection quiesce_event.clear() - proc = start_task_q_subscriber(queue=tasks_out, quiesce_event=quiesce_event) + start_task_q_subscriber(queue=tasks_out, quiesce_event=quiesce_event) - logging.warning("Replacement proc started") for i in range(10): - logging.warning("getting message") _, message = tasks_out.get() - logging.warning(f"Got message: {message}") assert messages[i] == message -def test_exclusive_subscriber(start_task_q_publisher, start_task_q_subscriber): +def test_exclusive_subscriber(mocker, start_task_q_publisher, start_task_q_subscriber): """2 subscribers connect, only first one should get any messages""" task_q_pub = start_task_q_publisher() # Start two subscribers on the same rabbit queue tasks_out_1, tasks_out_2 = multiprocessing.Queue(), multiprocessing.Queue() start_task_q_subscriber(queue=tasks_out_1) - time.sleep(1) + time.sleep(0.1) + + mocker.patch(f"{_MOCK_BASE}logger") start_task_q_subscriber(queue=tasks_out_2) - logging.warning("TEST: Launching messages") # Launch 10 messages messages = [] for i in range(10): @@ -99,12 +93,10 @@ def test_exclusive_subscriber(start_task_q_publisher, start_task_q_subscriber): b_message = json.dumps(message).encode("utf-8") task_q_pub.publish(b_message) messages.append(b_message) - logging.warning("TEST: Launching messages") # Confirm that the first subscriber received all the messages for i in range(10): _, message = tasks_out_1.get(timeout=1) - logging.warning(f"Got message: {message}") assert messages[i] == message # Check that the second subscriber did not receive any messages @@ -129,10 +121,6 @@ def test_perf_combined_pub_sub_latency(start_task_q_publisher, start_task_q_subs assert b_message == x avg_latency = sum(latency) / len(latency) - logging.warning( - f"Message latencies in milliseconds, min:{1000*min(latency):.2f}, " - f"max:{1000*max(latency):.2f}, avg:{1000*avg_latency:.2f}" - ) # average latency is expected to be below 5ms # if it exceeds this, it means something is wrong assert avg_latency < 0.005 @@ -164,10 +152,6 @@ def test_perf_combined_pub_sub_throughput( sent_per_second = num_messages / send_time messages_per_second = num_messages / total_time - logging.warning( - f"task throughput for {num_messages} messages at {message_size}B = " - f"{messages_per_second:.2f} messages/s" - ) # each size should record at least 500 messages per second even in an # untuned context with other processes running # slower than that indicates that a serious performance regression has @@ -203,9 +187,8 @@ def _stop_connection_now(tqs: TaskQueueSubscriber): def test_terminate(start_task_q_subscriber): task_q = start_task_q_subscriber() - time.sleep(1) + time.sleep(0.1) task_q.stop() - logging.warning("Calling terminate") with pytest.raises(ValueError): # Expected to raise ValueError since the process should # be terminated at this point from the close() call diff --git a/compute_endpoint/tests/unit/test_bad_endpoint_config.py b/compute_endpoint/tests/unit/test_bad_endpoint_config.py index 81a4b62fe..4789e9b42 100644 --- a/compute_endpoint/tests/unit/test_bad_endpoint_config.py +++ b/compute_endpoint/tests/unit/test_bad_endpoint_config.py @@ -1,18 +1,21 @@ +from unittest import mock + import pytest from globus_compute_endpoint.engines import HighThroughputEngine -invalid_addresses = ["localhost", "login1.theta.alcf.anl.gov", "*"] +_MOCK_BASE = "globus_compute_endpoint.engines.high_throughput.engine." -@pytest.mark.parametrize("address", invalid_addresses) +@pytest.mark.parametrize("address", ("localhost", "login1.theta.alcf.anl.gov", "*")) def test_invalid_address(address): - with pytest.raises(ValueError): - HighThroughputEngine(address=address) - - -valid_addresses = ["192.168.64.12", "fe80::e643:4bff:fe61:8f72", "129.114.44.12"] + with mock.patch(f"{_MOCK_BASE}log") as mock_log: + with pytest.raises(ValueError): + HighThroughputEngine(address=address) + assert mock_log.critical.called -@pytest.mark.parametrize("address", valid_addresses) +@pytest.mark.parametrize( + "address", ("192.168.64.12", "fe80::e643:4bff:fe61:8f72", "129.114.44.12") +) def test_valid_address(address): HighThroughputEngine(address=address) diff --git a/compute_endpoint/tests/unit/test_command_queue_subscriber.py b/compute_endpoint/tests/unit/test_command_queue_subscriber.py index 1b26386d9..2869ae3b8 100644 --- a/compute_endpoint/tests/unit/test_command_queue_subscriber.py +++ b/compute_endpoint/tests/unit/test_command_queue_subscriber.py @@ -204,18 +204,35 @@ def test_cqs_connection_closed_stops_loop(mock_cqs): assert mcqs._connection.ioloop.stop.called -def test_cqs_channel_closed_retries_then_shuts_down(mock_cqs): - exc = Exception("some pika reason") +def test_cqs_channel_closed_retries_then_shuts_down(mock_cqs, randomstring): + exc_text = f"some pika reason {randomstring()}" + exc = Exception(exc_text) *_, mcqs = mock_cqs - for i in range(1, mcqs.channel_close_window_limit): - mcqs._connection.ioloop.call_later.reset_mock() + with mock.patch(f"{_MOCK_BASE}log") as mock_log: + for i in range(1, mcqs.channel_close_window_limit): + mcqs._connection.ioloop.call_later.reset_mock() + mcqs._on_channel_closed(mcqs._channel, exc) + assert len(mcqs._channel_closes) == i + a, _k = mock_log.warning.call_args + assert " Channel closed " in a[0] + assert exc_text in a[0], "Expect exception text in logs" + assert mock_log.debug.call_count == i + assert mock_log.warning.call_count == i + assert not mock_log.error.called + + mcqs._on_channel_closed(mcqs._connection, exc) + assert mock_log.error.called + a, _k = mock_log.error.call_args + assert " Unable to sustain channel " in a[0], "Expect error log after attempts" + assert exc_text in a[0], "Expect exception text in logs" + + # and finally, no error if called "too many" times mcqs._on_channel_closed(mcqs._connection, exc) - assert len(mcqs._channel_closes) == i - mcqs._on_channel_closed(mcqs._connection, exc) + assert mock_log.error.call_count == 2 - # and finally, no error if called "too many" times - mcqs._on_channel_closed(mcqs._connection, exc) + assert mock_log.debug.call_count == i, "After attempts, should be only errors" + assert mock_log.warning.call_count == i, "After attempts, should be only errors" def test_cqs_stable_connection_resets_fail_counter(mocker, mock_cqs): diff --git a/compute_endpoint/tests/unit/test_endpointinterchange.py b/compute_endpoint/tests/unit/test_endpointinterchange.py index b042a09ba..f0b69e9eb 100644 --- a/compute_endpoint/tests/unit/test_endpointinterchange.py +++ b/compute_endpoint/tests/unit/test_endpointinterchange.py @@ -1,4 +1,5 @@ import random +from unittest import mock import pytest from globus_compute_endpoint.endpoint.config import Config @@ -7,10 +8,11 @@ _mock_base = "globus_compute_endpoint.endpoint.interchange." -def test_main_exception_always_quiesces(mocker, fs, reset_signals): +def test_main_exception_always_quiesces(mocker, fs, randomstring, reset_signals): num_iterations = random.randint(1, 10) + exc_text = f"Woot {randomstring()}" - # [False, False] * num because _kill_event and _quiesce_event are the same mock; + # [False, ...] * num because _kill_event and _quiesce_event are the same mock; # .is_set() is called twice per loop is_set_returns = [False, False, False] * num_iterations + [True] false_true_g = iter(is_set_returns) @@ -26,9 +28,13 @@ def false_true(): ) ei._task_puller_proc = mocker.MagicMock() ei._start_threads_and_main = mocker.MagicMock() - ei._start_threads_and_main.side_effect = Exception("Woot") + ei._start_threads_and_main.side_effect = Exception(exc_text) ei._kill_event.is_set = false_true - ei.start() + with mock.patch(f"{_mock_base}log") as mock_log: + ei.start() + assert mock_log.error.call_count == num_iterations + a, _k = mock_log.error.call_args + assert exc_text in a[0], "Expect exception text shared in logs" assert ei._quiesce_event.set.called assert ei._quiesce_event.set.call_count == num_iterations diff --git a/compute_endpoint/tests/unit/test_endpointmanager_unit.py b/compute_endpoint/tests/unit/test_endpointmanager_unit.py index ba4278fa0..88c96e6eb 100644 --- a/compute_endpoint/tests/unit/test_endpointmanager_unit.py +++ b/compute_endpoint/tests/unit/test_endpointmanager_unit.py @@ -500,8 +500,11 @@ def test_restarts_running_endpoint_with_cached_args(epmanager, mocker): em._cached_cmd_start_args[child_pid] = args_tup em.cmd_start_endpoint = mocker.Mock() - em.wait_for_children() + with mock.patch(f"{_MOCK_BASE}log") as mock_log: + em.wait_for_children() + a, _k = mock_log.info.call_args + assert "using cached arguments to start" in a[0] assert em.cmd_start_endpoint.call_args.args == args_tup diff --git a/compute_endpoint/tests/unit/test_execute_task.py b/compute_endpoint/tests/unit/test_execute_task.py index 3e641c119..fe27155fb 100644 --- a/compute_endpoint/tests/unit/test_execute_task.py +++ b/compute_endpoint/tests/unit/test_execute_task.py @@ -1,5 +1,6 @@ import logging import uuid +from unittest import mock from globus_compute_common import messagepack from globus_compute_endpoint.engines.helper import execute_task @@ -8,6 +9,8 @@ logger = logging.getLogger(__name__) +_MOCK_BASE = "globus_compute_endpoint.engines.helper." + def divide(x, y): return x / y @@ -56,7 +59,13 @@ def test_execute_task_with_exception(): ) ) - packed_result = execute_task(task_id, task_message) + with mock.patch(f"{_MOCK_BASE}log") as mock_log: + packed_result = execute_task(task_id, task_message) + + assert mock_log.exception.called + a, _k = mock_log.exception.call_args + assert "while executing user function" in a[0] + assert isinstance(packed_result, bytes) result = messagepack.unpack(packed_result) assert isinstance(result, messagepack.message_types.Result) diff --git a/compute_endpoint/tests/unit/test_worker.py b/compute_endpoint/tests/unit/test_worker.py index d868762c3..8c8c3cbf0 100644 --- a/compute_endpoint/tests/unit/test_worker.py +++ b/compute_endpoint/tests/unit/test_worker.py @@ -9,6 +9,8 @@ from globus_compute_endpoint.engines.high_throughput.messages import Task from globus_compute_endpoint.engines.high_throughput.worker import Worker +_MOCK_BASE = "globus_compute_endpoint.engines.high_throughput.worker." + def hello_world(): return "hello world" @@ -39,9 +41,7 @@ def reset_signals_auto(reset_signals): @pytest.fixture def test_worker(): - with mock.patch( - "globus_compute_endpoint.engines.high_throughput.worker.zmq.Context" - ) as mock_context: + with mock.patch(f"{_MOCK_BASE}zmq.Context") as mock_context: # the worker will receive tasks and send messages on this mock socket mock_socket = mock.Mock() mock_context.return_value.socket.return_value = mock_socket @@ -97,7 +97,8 @@ def test_execute_failing_function(test_worker): task_body = test_worker.serializer.serialize((failing_function, (), {})) task_message = Task(task_id, "RAW", task_body).pack() - result = test_worker._worker_execute_task(str(task_id), task_message) + with mock.patch(f"{_MOCK_BASE}log") as mock_log: + result = test_worker._worker_execute_task(str(task_id), task_message) assert isinstance(result, dict) assert "data" in result @@ -105,6 +106,9 @@ def test_execute_failing_function(test_worker): assert isinstance(result, messagepack.message_types.Result) assert result.task_id == task_id + a, _k = mock_log.exception.call_args + assert "Failed to execute task" in a[0] + # error string contains the KeyError which failed assert "KeyError" in result.data assert result.is_error is True @@ -129,7 +133,8 @@ def test_execute_function_exceeding_result_size_limit(test_worker): messagepack.message_types.Task(task_id=task_id, task_buffer=payload) ) - s_result = execute_task(task_id, task_body, result_size_limit=return_size - 2) + with mock.patch("globus_compute_endpoint.engines.helper.log") as mock_log: + s_result = execute_task(task_id, task_body, result_size_limit=return_size - 2) result = messagepack.unpack(s_result) assert isinstance(result, messagepack.message_types.Result) @@ -137,6 +142,7 @@ def test_execute_function_exceeding_result_size_limit(test_worker): assert result.task_id == task_id assert result.error_details assert result.error_details.code == "MaxResultSizeExceeded" + assert mock_log.exception.called def sleeper(t): @@ -156,10 +162,12 @@ def test_app_timeout(test_worker): messagepack.message_types.Task(task_id=task_id, task_buffer=task_body) ) - with mock.patch.dict(os.environ, {"GC_TASK_TIMEOUT": "0.01"}): - packed_result = execute_task(task_id, task_body) + with mock.patch("globus_compute_endpoint.engines.helper.log") as mock_log: + with mock.patch.dict(os.environ, {"GC_TASK_TIMEOUT": "0.01"}): + packed_result = execute_task(task_id, task_body) result = messagepack.unpack(packed_result) assert isinstance(result, messagepack.message_types.Result) assert result.task_id == task_id assert "AppTimeout" in result.data + assert mock_log.exception.called diff --git a/compute_endpoint/tox.ini b/compute_endpoint/tox.ini index ada32e1e1..cb6bc61c9 100644 --- a/compute_endpoint/tox.ini +++ b/compute_endpoint/tox.ini @@ -10,16 +10,14 @@ extras = test usedevelop = true commands = coverage erase - coverage run -m pytest --durations 5 {posargs} + coverage run -m pytest --durations 5 --log-cli-level=ERROR {posargs} coverage report [testenv:mypy] deps = - mypy==1.5.1 - types-requests - types-click + mypy==1.6.1 ../compute_sdk/ -commands = mypy -p globus_compute_endpoint {posargs} +commands = mypy --install-types --non-interactive -p globus_compute_endpoint {posargs} [testenv:publish-release]