Skip to content

Commit

Permalink
Fixes skupperproject#1416: ensure cutthrough notifications are raised…
Browse files Browse the repository at this point in the history
… properly

The notification calls must be made whenever the cutthrough message
slots are updated. Move these calls to the point in the code where the
slots are updated. Doing so no longer requires users of the message
I/O API to remember to manually call the cutthrough notification API.

Closes skupperproject#1416
  • Loading branch information
kgiusti committed Feb 14, 2024
1 parent 9308e8a commit d73df53
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 16 deletions.
3 changes: 0 additions & 3 deletions src/adaptors/tcp_lite/tcp_lite.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <qpid/dispatch/protocol_adaptor.h>
#include <qpid/dispatch/server.h>
#include <qpid/dispatch/log.h>
#include <qpid/dispatch/cutthrough_utils.h>
#include <qpid/dispatch/platform.h>
#include <qpid/dispatch/connection_counters.h>
#include <proton/proactor.h>
Expand Down Expand Up @@ -640,7 +639,6 @@ static uint64_t produce_read_buffers_XSIDE_IO(tcplite_connection_t *conn, qd_mes
if (!DEQ_IS_EMPTY(qd_buffers)) {
//qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] produce_read_buffers_XSIDE_IO - Producing %ld buffers", conn->conn_id, DEQ_SIZE(qd_buffers));
qd_message_produce_buffers(stream, &qd_buffers);
cutthrough_notify_buffers_produced_inbound(stream);
}
} else {
*blocked = true;
Expand Down Expand Up @@ -677,7 +675,6 @@ static uint64_t consume_write_buffers_XSIDE_IO(tcplite_connection_t *conn, qd_me
}
//qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] consume_write_buffers_XSIDE_IO - Consuming %ld buffers", conn->conn_id, actual);
pn_raw_connection_write_buffers(conn->raw_conn, raw_buffers, actual);
cutthrough_notify_buffers_consumed_outbound(stream);
}
}

Expand Down
21 changes: 21 additions & 0 deletions src/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "qpid/dispatch/iterator.h"
#include "qpid/dispatch/log.h"
#include "qpid/dispatch/threading.h"
#include <qpid/dispatch/cutthrough_utils.h>

#include <proton/object.h>

Expand Down Expand Up @@ -1530,6 +1531,7 @@ bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg)

static void qd_message_receive_cutthrough(qd_message_t *in_msg, pn_delivery_t *delivery, pn_link_t *link, qd_message_content_t *content)
{
bool notify_produced = false;
while (pn_delivery_pending(delivery) > 0 && (sys_atomic_get(&content->uct_consume_slot) - sys_atomic_get(&content->uct_produce_slot)) % UCT_SLOT_COUNT != 1) {
//
// The ring is not full, build a buffer list from the link data and produce one slot.
Expand Down Expand Up @@ -1577,6 +1579,7 @@ static void qd_message_receive_cutthrough(qd_message_t *in_msg, pn_delivery_t *d
//
// Advance the producer slot pointer
//
notify_produced = true;
sys_atomic_set(&content->uct_produce_slot, (use_slot + 1) % UCT_SLOT_COUNT);

if ((sys_atomic_get(&content->uct_consume_slot) - sys_atomic_get(&content->uct_produce_slot)) % UCT_SLOT_COUNT == 1) {
Expand All @@ -1588,6 +1591,10 @@ static void qd_message_receive_cutthrough(qd_message_t *in_msg, pn_delivery_t *d
if (!pn_delivery_partial(delivery) && pn_delivery_pending(delivery) == 0) {
qd_message_set_receive_complete(in_msg);
}

if (notify_produced) {
cutthrough_notify_buffers_produced_inbound(in_msg);
}
}


Expand Down Expand Up @@ -1873,6 +1880,7 @@ uint32_t _compose_router_annotations(qd_message_pvt_t *msg, unsigned int ra_flag
static void qd_message_send_cut_through(qd_message_pvt_t *msg, qd_message_content_t *content, pn_link_t *pnl, pn_session_t *pns, bool *q3_stalled)
{
const size_t q3_upper = QD_BUFFER_SIZE * QD_QLIMIT_Q3_UPPER;
bool notify_consumed = false;

*q3_stalled = !IS_ATOMIC_FLAG_SET(&content->aborted) && (pn_session_outgoing_bytes(pns) >= q3_upper);
while (!*q3_stalled && (sys_atomic_get(&content->uct_consume_slot) - sys_atomic_get(&content->uct_produce_slot)) % UCT_SLOT_COUNT != 0) {
Expand All @@ -1889,6 +1897,7 @@ static void qd_message_send_cut_through(qd_message_pvt_t *msg, qd_message_conten
}

sys_atomic_set(&content->uct_consume_slot, (use_slot + 1) % UCT_SLOT_COUNT);
notify_consumed = true;
*q3_stalled = !IS_ATOMIC_FLAG_SET(&content->aborted) && (pn_session_outgoing_bytes(pns) >= q3_upper);
}

Expand All @@ -1900,6 +1909,10 @@ static void qd_message_send_cut_through(qd_message_pvt_t *msg, qd_message_conten
//
SET_ATOMIC_FLAG(&msg->send_complete);
}

if (notify_consumed) {
cutthrough_notify_buffers_consumed_outbound((qd_message_t *) msg);
}
}


Expand Down Expand Up @@ -3299,6 +3312,7 @@ void qd_message_produce_buffers(qd_message_t *stream, qd_buffer_list_t *buffers)
uint32_t useSlot = sys_atomic_get(&content->uct_produce_slot);
DEQ_MOVE(*buffers, content->uct_slots[useSlot]);
sys_atomic_set(&content->uct_produce_slot, (useSlot + 1) % UCT_SLOT_COUNT);
cutthrough_notify_buffers_produced_inbound(stream);

if ((sys_atomic_get(&content->uct_consume_slot) - sys_atomic_get(&content->uct_produce_slot)) % UCT_SLOT_COUNT == 1) {
SET_ATOMIC_FLAG(&content->uct_producer_stalled);
Expand All @@ -3310,7 +3324,9 @@ int qd_message_consume_buffers(qd_message_t *stream, qd_buffer_list_t *buffers,
{
qd_message_content_t *content = MSG_CONTENT(stream);
int count = 0;
bool notify_consumed = false;
bool empty = sys_atomic_get(&content->uct_consume_slot) == sys_atomic_get(&content->uct_produce_slot);

while (count < limit && !empty) {
uint32_t useSlot = sys_atomic_get(&content->uct_consume_slot);
while (count < limit && !DEQ_IS_EMPTY(content->uct_slots[useSlot])) {
Expand All @@ -3320,11 +3336,16 @@ int qd_message_consume_buffers(qd_message_t *stream, qd_buffer_list_t *buffers,
count++;
}
if (DEQ_IS_EMPTY(content->uct_slots[useSlot])) {
notify_consumed = true;
sys_atomic_set(&content->uct_consume_slot, (useSlot + 1) % UCT_SLOT_COUNT);
}
empty = sys_atomic_get(&content->uct_consume_slot) == sys_atomic_get(&content->uct_produce_slot);
}

if (notify_consumed) {
cutthrough_notify_buffers_consumed_outbound(stream);
}

return count;
}

Expand Down
15 changes: 2 additions & 13 deletions src/router_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <qpid/dispatch.h>
#include <qpid/dispatch/protocol_adaptor.h>
#include <qpid/dispatch/proton_utils.h>
#include <qpid/dispatch/cutthrough_utils.h>
#include <qpid/dispatch/protocols.h>
#include <qpid/dispatch/connection_counters.h>

Expand Down Expand Up @@ -400,11 +399,6 @@ static int AMQP_conn_wake_handler(void *type_context, qd_connection_t *conn, voi
qd_link_q3_block(qlink);
}

//
// Handle any subsequent activation that is needed
//
cutthrough_notify_buffers_consumed_outbound(stream);

//
// If the stream is send complete, we don't need to be activated any more. Cancel the activation on the stream.
//
Expand Down Expand Up @@ -451,11 +445,6 @@ static int AMQP_conn_wake_handler(void *type_context, qd_connection_t *conn, voi
//
qd_message_receive(qdr_node_delivery_pn_from_qdr(delivery));

//
// Handle any subsequent activation that is needed
//
cutthrough_notify_buffers_produced_inbound(stream);

//
// If the stream is receive complete, we don't need to be activated any more. Cancel the activation on the stream.
//
Expand Down Expand Up @@ -668,9 +657,9 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
//

if (delivery) {
// For cutthrough the core thread only gets notified when the delivery first arrives (via a call to
// qdr_link_deliver) and when it is complete. Do not call qdr_delivery_continue otherwise.
if (qd_message_is_unicast_cutthrough(msg)) {
cutthrough_notify_buffers_produced_inbound(msg);

if (receive_complete) {
qdr_delivery_continue(router->router_core, delivery, pn_delivery_settled(pnd));
}
Expand Down

0 comments on commit d73df53

Please sign in to comment.