Skip to content

Commit

Permalink
Fixes #1690 add mesh multicast test (#1691)
Browse files Browse the repository at this point in the history
Fixes #1690
  • Loading branch information
Karen-Schoener authored Dec 6, 2024
1 parent 82cafde commit 08f966c
Showing 1 changed file with 155 additions and 4 deletions.
159 changes: 155 additions & 4 deletions tests/system_tests_multicast.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
from proton import Message
from proton import Delivery
from system_test import AsyncTestSender, AsyncTestReceiver, TestCase
from system_test import Qdrouterd, main_module, TIMEOUT, TestTimeout, unittest
from system_test import Qdrouterd, main_module, TIMEOUT, TestTimeout, \
PollTimeout, unittest
from system_test import ALLOCATOR_TYPE, ROUTER_ADDRESS_TYPE


Expand Down Expand Up @@ -131,6 +132,12 @@ def router(name, mode, extra):

cls.INT_A.wait_router_connected('INT.B')
cls.INT_B.wait_router_connected('INT.A')

# Check to make sure that the routers are connected to their
# respective edge router.
cls.INT_A.is_edge_routers_connected(num_edges=1)
cls.INT_B.is_edge_routers_connected(num_edges=1)

cls.EA1.wait_connectors()
cls.EB1.wait_connectors()

Expand Down Expand Up @@ -269,6 +276,7 @@ def test_50_presettled(self):
body = " MCAST PRESETTLED "
test = MulticastPresettled(self.config, 10, body, SendPresettled())
test.run()
self.assertIsNone(test.error)

def test_51_presettled_mixed_large_msg(self):
# Same as above, but large message bodies (mixed sender settle mode)
Expand Down Expand Up @@ -429,6 +437,125 @@ def test_999_check_for_leaks(self):
self._check_for_leaks()


class MulticastMeshTest(TestCase):
"""
Verify the multicast forwarding logic across a mesh router
configuration
"""
@classmethod
def setUpClass(cls):
"""Start a router"""
super(MulticastMeshTest, cls).setUpClass()

def router(name, mode, extra):
config = [
('router', {'mode': mode,
'id': name,
'workerThreads': W_THREADS}),
('listener', {'role': 'normal',
'port': cls.tester.get_port()}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
]

if extra:
config.extend(extra)
config = Qdrouterd.Config(config)
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
return cls.routers[-1]

# configuration:
# 3 interior routers.
#
# +---------+ +---------+
# | INT.A |----| INT.B |
# +---------+ +---------+
# \ /
# \ /
# +---------+
# | INT.C |
# +---------+
#
# Each router has 2 multicast consumers
# INT.A has a multicast sender

cls.routers = []

interrouter_port_B_to_A = cls.tester.get_port()
interrouter_port_C_to_A = cls.tester.get_port()
interrouter_port_C_to_B = cls.tester.get_port()

router('INT.A', 'interior',
[('listener', {'role': 'inter-router',
'port': interrouter_port_B_to_A}),
('listener', {'role': 'inter-router',
'port': interrouter_port_C_to_A})])
cls.INT_A = cls.routers[0]
cls.INT_A.listener = cls.INT_A.addresses[0]

router('INT.B', 'interior',
[('connector', {'name': 'connectorToA',
'role': 'inter-router',
'port': interrouter_port_B_to_A}),
('listener', {'role': 'inter-router',
'port': interrouter_port_C_to_B})])
cls.INT_B = cls.routers[1]
cls.INT_B.listener = cls.INT_B.addresses[0]

router('INT.C', 'interior',
[('connector', {'name': 'connectorToA',
'role': 'inter-router',
'port': interrouter_port_C_to_A}),
('connector', {'name': 'connectorToB',
'role': 'inter-router',
'port': interrouter_port_C_to_B})])
cls.INT_C = cls.routers[2]
cls.INT_C.listener = cls.INT_C.addresses[0]

cls.INT_A.wait_router_connected('INT.B')
cls.INT_A.wait_router_connected('INT.C')
cls.INT_B.wait_router_connected('INT.A')
cls.INT_B.wait_router_connected('INT.C')
cls.INT_C.wait_router_connected('INT.A')
cls.INT_C.wait_router_connected('INT.B')

# Client topology:
# all routes have 2 receivers
# Interior INT_A has 1 sender
#
cls.config = [
# Interior router INT_A:
{'router': cls.INT_A,
'senders': ['S-INT_A-1'],
'receivers': ['R-INT_A-1', 'R-INT_A-2'],
'subscribers': 2,
'remotes': 2,
},
# Interior router INT_B:
{'router': cls.INT_B,
'senders': [],
'receivers': ['R-INT_B-1', 'R-INT_B-2'],
'subscribers': 2,
'remotes': 2,
},
# Interior router INT_C:
{'router': cls.INT_C,
'senders': [],
'receivers': ['R-INT_C-1', 'R-INT_C-2'],
'subscribers': 2,
'remotes': 2,
}
]

def test_150_presettled(self):
# Simply send a bunch of pre-settled multicast messages.
# Also, wait before stopping test in order to wait for any
# any duplicate messages to be received.
body = " MCAST PRESETTLED "
test = MulticastPresettled(self.config, 10, body, SendPresettled(), 10)
test.run()
self.assertIsNone(test.error)


#
# Settlement options for Link attach
#
Expand Down Expand Up @@ -506,6 +633,7 @@ def __init__(self, config, count, body, topic=None, **handler_kwargs):

# per receiver
self.c_received = {}
self.received_msgs = {}

# count per outcome
self.n_outcomes = {}
Expand All @@ -525,7 +653,7 @@ def done(self):
self.s_conns = {}

def timeout(self):
self.error = "Timeout Expired"
self.error = f"Timeout Expired: c_received={self.c_received}"
self.done()

@abc.abstractmethod
Expand Down Expand Up @@ -585,6 +713,20 @@ def on_message(self, event):
self.n_received += 1
name = event.link.name
self.c_received[name] = 1 + self.c_received.get(name, 0)
self.check_for_duplicate_msg(event)

def check_for_duplicate_msg(self, event):
name = event.link.name

if name not in self.received_msgs:
self.received_msgs[name] = {}

# Check for duplicate messages
if event.message.body in self.received_msgs[name]:
self.error = f"Duplicate message detected for receiver {name}, c_received={self.c_received}"
self.done()
else:
self.received_msgs[name][event.message.body] = True

def on_accepted(self, event):
self.n_accepted += 1
Expand Down Expand Up @@ -636,7 +778,7 @@ class MulticastPresettled(MulticastBase):
Verifies that all messages are settled by the sender
"""

def __init__(self, config, count, body, settlement_mode):
def __init__(self, config, count, body, settlement_mode, done_wait_interval=0):
# use a large prefetch to prevent drops
super(MulticastPresettled, self).__init__(config,
count,
Expand All @@ -650,6 +792,7 @@ def __init__(self, config, count, body, settlement_mode):
self.sender_settled = 0
self.done_count = 0
self.unsettled_deliveries = dict()
self.done_wait_interval = done_wait_interval

def create_receiver(self, container, conn, source, name):
return container.create_receiver(conn, source=source, name=name,
Expand All @@ -673,7 +816,15 @@ def check_if_done(self):
# sender
to_rcv = self.n_senders * self.msg_count * self.n_receivers
if to_rcv == self.n_received and not self.unsettled_deliveries:
self.done()
if self.done_wait_interval:
# start a timer to wait before stopping test in order to
# wait for any any duplicate messages to be received
self.timers.append(self.reactor.schedule(self.done_wait_interval, PollTimeout(self)))
else:
self.done()

def poll_timeout(self):
self.done()

def on_message(self, event):
super(MulticastPresettled, self).on_message(event)
Expand Down

0 comments on commit 08f966c

Please sign in to comment.