From 08f966c2798d58fd32bff4690830bcb0656df241 Mon Sep 17 00:00:00 2001 From: Karen-Schoener <100308664+Karen-Schoener@users.noreply.github.com> Date: Fri, 6 Dec 2024 16:01:49 -0500 Subject: [PATCH] Fixes #1690 add mesh multicast test (#1691) Fixes #1690 --- tests/system_tests_multicast.py | 159 +++++++++++++++++++++++++++++++- 1 file changed, 155 insertions(+), 4 deletions(-) diff --git a/tests/system_tests_multicast.py b/tests/system_tests_multicast.py index 179325881..65d18dd26 100644 --- a/tests/system_tests_multicast.py +++ b/tests/system_tests_multicast.py @@ -33,7 +33,8 @@ from proton import Message from proton import Delivery from system_test import AsyncTestSender, AsyncTestReceiver, TestCase -from system_test import Qdrouterd, main_module, TIMEOUT, TestTimeout, unittest +from system_test import Qdrouterd, main_module, TIMEOUT, TestTimeout, \ + PollTimeout, unittest from system_test import ALLOCATOR_TYPE, ROUTER_ADDRESS_TYPE @@ -131,6 +132,12 @@ def router(name, mode, extra): cls.INT_A.wait_router_connected('INT.B') cls.INT_B.wait_router_connected('INT.A') + + # Check to make sure that the routers are connected to their + # respective edge router. + cls.INT_A.is_edge_routers_connected(num_edges=1) + cls.INT_B.is_edge_routers_connected(num_edges=1) + cls.EA1.wait_connectors() cls.EB1.wait_connectors() @@ -269,6 +276,7 @@ def test_50_presettled(self): body = " MCAST PRESETTLED " test = MulticastPresettled(self.config, 10, body, SendPresettled()) test.run() + self.assertIsNone(test.error) def test_51_presettled_mixed_large_msg(self): # Same as above, but large message bodies (mixed sender settle mode) @@ -429,6 +437,125 @@ def test_999_check_for_leaks(self): self._check_for_leaks() +class MulticastMeshTest(TestCase): + """ + Verify the multicast forwarding logic across a mesh router + configuration + """ + @classmethod + def setUpClass(cls): + """Start a router""" + super(MulticastMeshTest, cls).setUpClass() + + def router(name, mode, extra): + config = [ + ('router', {'mode': mode, + 'id': name, + 'workerThreads': W_THREADS}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ] + + if extra: + config.extend(extra) + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + return cls.routers[-1] + + # configuration: + # 3 interior routers. + # + # +---------+ +---------+ + # | INT.A |----| INT.B | + # +---------+ +---------+ + # \ / + # \ / + # +---------+ + # | INT.C | + # +---------+ + # + # Each router has 2 multicast consumers + # INT.A has a multicast sender + + cls.routers = [] + + interrouter_port_B_to_A = cls.tester.get_port() + interrouter_port_C_to_A = cls.tester.get_port() + interrouter_port_C_to_B = cls.tester.get_port() + + router('INT.A', 'interior', + [('listener', {'role': 'inter-router', + 'port': interrouter_port_B_to_A}), + ('listener', {'role': 'inter-router', + 'port': interrouter_port_C_to_A})]) + cls.INT_A = cls.routers[0] + cls.INT_A.listener = cls.INT_A.addresses[0] + + router('INT.B', 'interior', + [('connector', {'name': 'connectorToA', + 'role': 'inter-router', + 'port': interrouter_port_B_to_A}), + ('listener', {'role': 'inter-router', + 'port': interrouter_port_C_to_B})]) + cls.INT_B = cls.routers[1] + cls.INT_B.listener = cls.INT_B.addresses[0] + + router('INT.C', 'interior', + [('connector', {'name': 'connectorToA', + 'role': 'inter-router', + 'port': interrouter_port_C_to_A}), + ('connector', {'name': 'connectorToB', + 'role': 'inter-router', + 'port': interrouter_port_C_to_B})]) + cls.INT_C = cls.routers[2] + cls.INT_C.listener = cls.INT_C.addresses[0] + + cls.INT_A.wait_router_connected('INT.B') + cls.INT_A.wait_router_connected('INT.C') + cls.INT_B.wait_router_connected('INT.A') + cls.INT_B.wait_router_connected('INT.C') + cls.INT_C.wait_router_connected('INT.A') + cls.INT_C.wait_router_connected('INT.B') + + # Client topology: + # all routes have 2 receivers + # Interior INT_A has 1 sender + # + cls.config = [ + # Interior router INT_A: + {'router': cls.INT_A, + 'senders': ['S-INT_A-1'], + 'receivers': ['R-INT_A-1', 'R-INT_A-2'], + 'subscribers': 2, + 'remotes': 2, + }, + # Interior router INT_B: + {'router': cls.INT_B, + 'senders': [], + 'receivers': ['R-INT_B-1', 'R-INT_B-2'], + 'subscribers': 2, + 'remotes': 2, + }, + # Interior router INT_C: + {'router': cls.INT_C, + 'senders': [], + 'receivers': ['R-INT_C-1', 'R-INT_C-2'], + 'subscribers': 2, + 'remotes': 2, + } + ] + + def test_150_presettled(self): + # Simply send a bunch of pre-settled multicast messages. + # Also, wait before stopping test in order to wait for any + # any duplicate messages to be received. + body = " MCAST PRESETTLED " + test = MulticastPresettled(self.config, 10, body, SendPresettled(), 10) + test.run() + self.assertIsNone(test.error) + + # # Settlement options for Link attach # @@ -506,6 +633,7 @@ def __init__(self, config, count, body, topic=None, **handler_kwargs): # per receiver self.c_received = {} + self.received_msgs = {} # count per outcome self.n_outcomes = {} @@ -525,7 +653,7 @@ def done(self): self.s_conns = {} def timeout(self): - self.error = "Timeout Expired" + self.error = f"Timeout Expired: c_received={self.c_received}" self.done() @abc.abstractmethod @@ -585,6 +713,20 @@ def on_message(self, event): self.n_received += 1 name = event.link.name self.c_received[name] = 1 + self.c_received.get(name, 0) + self.check_for_duplicate_msg(event) + + def check_for_duplicate_msg(self, event): + name = event.link.name + + if name not in self.received_msgs: + self.received_msgs[name] = {} + + # Check for duplicate messages + if event.message.body in self.received_msgs[name]: + self.error = f"Duplicate message detected for receiver {name}, c_received={self.c_received}" + self.done() + else: + self.received_msgs[name][event.message.body] = True def on_accepted(self, event): self.n_accepted += 1 @@ -636,7 +778,7 @@ class MulticastPresettled(MulticastBase): Verifies that all messages are settled by the sender """ - def __init__(self, config, count, body, settlement_mode): + def __init__(self, config, count, body, settlement_mode, done_wait_interval=0): # use a large prefetch to prevent drops super(MulticastPresettled, self).__init__(config, count, @@ -650,6 +792,7 @@ def __init__(self, config, count, body, settlement_mode): self.sender_settled = 0 self.done_count = 0 self.unsettled_deliveries = dict() + self.done_wait_interval = done_wait_interval def create_receiver(self, container, conn, source, name): return container.create_receiver(conn, source=source, name=name, @@ -673,7 +816,15 @@ def check_if_done(self): # sender to_rcv = self.n_senders * self.msg_count * self.n_receivers if to_rcv == self.n_received and not self.unsettled_deliveries: - self.done() + if self.done_wait_interval: + # start a timer to wait before stopping test in order to + # wait for any any duplicate messages to be received + self.timers.append(self.reactor.schedule(self.done_wait_interval, PollTimeout(self))) + else: + self.done() + + def poll_timeout(self): + self.done() def on_message(self, event): super(MulticastPresettled, self).on_message(event)