Skip to content

Commit

Permalink
Add Q2 Disable router annotation flag
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Nov 10, 2023
1 parent 20c09f7 commit 720e29e
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 8 deletions.
18 changes: 10 additions & 8 deletions docs/notes/router-annotations.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ next hop router.
* A destination address. This optional field overrides the value of
the *to* field in the message properties section.
* A flag that indicates that the message is classified as
a *streaming message*. This provides a hint to the downstream router
to avoid unnecessary buffering of the message before forwarding it.
* A set of flags that control various aspects of the message
transfer. See the description below for more detail.
* The identifier for the ingress-mesh. This is added by the ingress
router for the message when that router is an edge router in an edge-mesh.
Expand Down Expand Up @@ -103,11 +102,14 @@ The definition of this custom section follows:
<field name="ingress-mesh" type="str8-utf8" mandatory="false"/>
</type>
* The flags field is used for passing boolean flags. The least
significant bit (bit 0; hex 0x01) is used for the streaming flag. All
other bits are reserved and initialized to zero. An interior router
must ignore these unreserved flags and pass them to the next hop
without modifying them.
* The flags field is used for passing boolean flags. Undefined bits
are reserved and must be forwarded to the next hop without
modification. The following bits are defined:
** bit 0 (hex 0x01) is used to indicate the message is streaming. This provides a hint to
the downstream router to avoid unnecessary buffering of the message before forwarding it.
** bit 1 (hex 0x02) is used to indicate the transfer supports resend-release. See the
corresponding document for more details.
** bit 2 (hex 0x04) is used to indicate Q2 backpressure should be disabled for this message.
* Entries on the trace list are restricted to the *str8-utf8*
type. Note that the trace list will always be present however it may
Expand Down
26 changes: 26 additions & 0 deletions include/qpid/dispatch/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,32 @@ void qd_message_set_resend_released_annotation(qd_message_t *msg, bool value);
*/
bool qd_message_is_resend_released(const qd_message_t *msg);

/**
* Mark this message so Q2 will be disabled on this router and all downstream routers.
*
* This annotation can be used to disable Q2 backpressure along the entire path this message travels through the router
* network. It should only be used if there is some other form of flow-control in effect, such as the TCP adaptor
* windowing algorithm.
*
* This function will invoke qd_message_Q2_holdoff_disable() on the message to disable Q2 backpressure on the current
* router. Note that this may invoke the Q2 unblock handler if the message is currently blocked by Q2.
*
* @param msg Pointer to a message.
*/
void qd_message_set_Q2_disabled_annotation(qd_message_t *msg);

/**
* Returns true if the "disable Q2" flag is set in the message annotation section. See
* qd_message_set_Q2_disabled_annotation()
*
* It is expected that the caller will use this function to check if an incoming message needs to have Q2 backpressure
* disabled. If true the caller should invoke qd_message_Q2_holdoff_disable() on the given message prior to forwarding
* it.
*
* @param msg Pointer to a message.
*/
bool qd_message_is_Q2_disabled_annotation(const qd_message_t *msg);

/**
* Prevent the router from doing any transformations to the message annotations
* section of the message.
Expand Down
16 changes: 16 additions & 0 deletions src/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,22 @@ bool qd_message_is_resend_released(const qd_message_t *msg)
}


void qd_message_set_Q2_disabled_annotation(qd_message_t *msg)
{
qd_message_pvt_t *msg_pvt = (qd_message_pvt_t*) msg;
if (!(msg_pvt->ra_flags & MSG_FLAG_DISABLE_Q2)) {
msg_pvt->ra_flags |= MSG_FLAG_DISABLE_Q2;
qd_message_Q2_holdoff_disable(msg);
}
}


bool qd_message_is_Q2_disabled_annotation(const qd_message_t *msg) {
const qd_message_pvt_t *msg_pvt = (const qd_message_pvt_t*) msg;
return !!(msg_pvt->ra_flags & MSG_FLAG_DISABLE_Q2);
}


void qd_message_disable_router_annotations(qd_message_t *msg)
{
qd_message_content_t *content = ((qd_message_pvt_t *)msg)->content;
Expand Down
1 change: 1 addition & 0 deletions src/message_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ ALLOC_DECLARE(qd_message_content_t);
#define MSG_CONTENT(m) (((qd_message_pvt_t*) m)->content)
#define MSG_FLAG_STREAMING 0x01u
#define MSG_FLAG_RESEND_RELEASED 0x02u
#define MSG_FLAG_DISABLE_Q2 0x04u

/** Initialize logging */
void qd_message_initialize(void);
Expand Down
7 changes: 7 additions & 0 deletions src/router_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,13 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
return next_delivery;
}

//
// Before the message is delivered check if Q2 has been disabled by the upstream router.
//
if (qd_message_is_Q2_disabled_annotation(msg)) {
qd_message_Q2_holdoff_disable(msg);
}

if (anonymous_link) {
qd_iterator_t *addr_iter = 0;

Expand Down
18 changes: 18 additions & 0 deletions tests/system_tests_router_annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,24 @@ def test_07_ra_big_interior_interior(self):
expected = ["0/RouterA", "0/RouterB", "0/RouterC"]
self.assertEqual(expected, ra.trace)

def test_08_q2_disabled_flag(self):
"""Verify that the Q2 Disabled flag is forwarded across all routers"""
_name = "test_08_q2_disabled_flag"
ra = RouterAnnotationsSection(flags=0x04) # 0x04 == Q2_DISABLED
msg = InterRouterMessage(router_annotations=ra, body=_name)
msg.address = f"closest/{_name}"
# anonymous sender:
test = MessageAnnotations(msg,
self.EdgeA.addresses[0],
self.EdgeC.addresses[0],
msg.address,
None)
test.run()
self.assertIsNone(test.error)
self.assertEqual(_name, test.recv_msg.body)
ra = test.recv_msg.router_annotations
self.assertEqual(0x04, ra.flags, f"Expected ra_flags=0x04 got {ra.flags}")


class InvalidMessageAnnotations(MessagingHandler):
"""Simple client for sending invalid router annotations"""
Expand Down

0 comments on commit 720e29e

Please sign in to comment.