Skip to content

Commit

Permalink
Fixes #1675: Remove unused link-route code
Browse files Browse the repository at this point in the history
This patch retains the parts of the link routing code that was
leveraged by the core link endpoint feature. This retained code has
been renamed to identify its use by core endpoints.
  • Loading branch information
kgiusti committed Nov 20, 2024
1 parent 77ecd45 commit d6ae6af
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 175 deletions.
16 changes: 8 additions & 8 deletions include/qpid/dispatch/protocol_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -720,14 +720,15 @@ const char *qdr_link_internal_address(const qdr_link_t *link);
bool qdr_link_is_anonymous(const qdr_link_t *link);

/**
* qdr_link_is_routed
* qdr_link_is_core_endpoint
*
* Indicate whether the link is link-routed.
* Indicate whether the link is terminated in the router core. These links are used by the core to send and receive
* messages.
*
* @param link Link object
* @return True if the link is link-routed.
* @return True if the link is terminated in the core
*/
bool qdr_link_is_routed(const qdr_link_t *link);
bool qdr_link_is_core_endpoint(const qdr_link_t *link);

/**
* qdr_link_strip_annotations_in
Expand Down Expand Up @@ -851,10 +852,9 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
bool settled, qd_bitmask_t *link_exclusion, int ingress_index,
uint64_t remote_disposition,
qd_delivery_state_t *remote_state);
qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled,
const uint8_t *tag, int tag_length,
uint64_t remote_disposition,
qd_delivery_state_t *remote_state);
qdr_delivery_t *qdr_link_deliver_to_core(qdr_link_t *link, qd_message_t *msg, bool settled,
uint64_t remote_disposition,
qd_delivery_state_t *remote_state);

/**
* qdr_link_process_deliveries
Expand Down
32 changes: 7 additions & 25 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -770,35 +770,17 @@ static bool AMQP_rx_handler(qd_router_t *router, qd_link_t *link)
break;
}

// Handle the link-routed case
// Handle deliveries destined to a core endpoint.
//
if (qdr_link_is_routed(rlink)) {
pn_delivery_tag_t dtag = pn_delivery_tag(pnd);

if (dtag.size > QDR_DELIVERY_TAG_MAX) {
qd_log(LOG_ROUTER, QD_LOG_DEBUG, "link route delivery failure: msg tag size exceeded %zd (max=%d)",
dtag.size, QDR_DELIVERY_TAG_MAX);
qd_message_set_discard(msg, true);
pn_link_flow(pn_link, 1);
_reject_delivery(pnd, QD_AMQP_COND_INVALID_FIELD, "delivery tag length exceeded");
if (receive_complete) {
pn_delivery_settle(pnd);
qd_message_free(msg);
}
return next_delivery;
}

if (qdr_link_is_core_endpoint(rlink)) {
log_link_message(conn, pn_link, msg);
delivery = qdr_link_deliver_to_routed_link(rlink,
msg,
pn_delivery_settled(pnd),
(uint8_t*) dtag.start,
dtag.size,
pn_delivery_remote_state(pnd),
qd_delivery_read_remote_state(pnd));
delivery = qdr_link_deliver_to_core(rlink, msg,
pn_delivery_settled(pnd),
pn_delivery_remote_state(pnd),
qd_delivery_read_remote_state(pnd));
qd_link_set_incoming_msg(link, (qd_message_t*) 0); // msg no longer exclusive to qd_link
qdr_node_connect_deliveries(link, delivery, pnd);
qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver_to_routed_link");
qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver_to_core");
return next_delivery;
}

Expand Down
11 changes: 2 additions & 9 deletions src/router_core/agent_link.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,6 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod
case QDR_LINK_OWNING_ADDR:
if(link->owning_addr)
qd_compose_insert_string(body, address_key(link->owning_addr));
else if (link->connected_link && link->connected_link->terminus_addr)
qd_compose_insert_string(body, link->connected_link->terminus_addr);
else if (link->terminus_addr)
qd_compose_insert_string(body, link->terminus_addr);
else
Expand All @@ -159,13 +157,8 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod
qd_compose_insert_uint(body, link->capacity);
break;

case QDR_LINK_PEER:
if (link->connected_link) {
char id[100];
snprintf(id, 100, "%"PRId64, link->connected_link->identity);
qd_compose_insert_string(body, id);
} else
qd_compose_insert_null(body);
case QDR_LINK_PEER: // link-routing no longer supported
qd_compose_insert_null(body);
break;

case QDR_LINK_UNDELIVERED_COUNT:
Expand Down
73 changes: 2 additions & 71 deletions src/router_core/connections.c
Original file line number Diff line number Diff line change
Expand Up @@ -611,9 +611,9 @@ bool qdr_link_is_anonymous(const qdr_link_t *link)
}


bool qdr_link_is_routed(const qdr_link_t *link)
bool qdr_link_is_core_endpoint(const qdr_link_t *link)
{
return link->connected_link != 0 || link->core_endpoint != 0;
return link->core_endpoint != 0;
}


Expand Down Expand Up @@ -1061,23 +1061,6 @@ void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qd
}


static void qdr_link_abort_undelivered_CT(qdr_core_t *core, qdr_link_t *link)
{
assert(link->link_direction == QD_OUTGOING);

qdr_connection_t *conn = link->conn;

sys_mutex_lock(&conn->work_lock);
qdr_delivery_t *dlv = DEQ_HEAD(link->undelivered);
while (dlv) {
if (!qdr_delivery_receive_complete(dlv))
qdr_delivery_set_aborted(dlv);
dlv = DEQ_NEXT(dlv);
}
sys_mutex_unlock(&conn->work_lock);
}


static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, const char *log_text)
{
//
Expand All @@ -1093,14 +1076,6 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
if (link->core_endpoint)
qdrc_endpoint_do_cleanup_CT(core, link->core_endpoint);

//
// If the link has a connected peer, unlink the peer
//
if (link->connected_link) {
link->connected_link->connected_link = 0;
link->connected_link = 0;
}

//
// If this link is involved in inter-router communication, remove its reference
// from the core mask-bit tables
Expand Down Expand Up @@ -2240,22 +2215,6 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
return;
}

//
// Handle attach-routed links
//
if (link->connected_link) {
qdr_terminus_t *remote_terminus = link->link_direction == QD_OUTGOING ? target : source;
if (link->strip_prefix) {
qdr_terminus_strip_address_prefix(remote_terminus, link->strip_prefix);
}
if (link->insert_prefix) {
qdr_terminus_insert_address_prefix(remote_terminus, link->insert_prefix);
}

qdr_link_outbound_second_attach_CT(core, link->connected_link, source, target);
return;
}

if (link->link_direction == QD_INCOMING) {
//
// Handle incoming link cases
Expand Down Expand Up @@ -2362,34 +2321,6 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
}
}

//
// For routed links, propagate the detach
//
if (link->connected_link) {
//
// If the connected link is outgoing and there is a delivery on the connected link's undelivered
// list that is not receive-complete, we must flag that delivery as aborted or it will forever
// block the propagation of the detach.
//
if (link->connected_link->link_direction == QD_OUTGOING)
qdr_link_abort_undelivered_CT(core, link->connected_link);

if (dt != QD_LOST)
qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE, dt == QD_CLOSED);
else {
qdr_link_outbound_detach_CT(core, link->connected_link, 0, QDR_CONDITION_ROUTED_LINK_LOST, !link->terminus_survives_disconnect);
qdr_error_free(error);
}

//
// If the link is completely detached, release its resources
//
if (link->detach_send_done)
qdr_link_cleanup_protected_CT(core, conn, link, "Link detached");

return;
}

//
// For auto links, switch the auto link to failed state and record the error
//
Expand Down
7 changes: 3 additions & 4 deletions src/router_core/delivery.c
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,11 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv) TA_NO_THREAD
}

//
// If this is an incoming link and it is not link-routed or inter-router, issue
// one replacement credit on the link. Note that credit on inter-router links is
// issued immediately even for unsettled deliveries.
// If this is an incoming link and it is not inter-router or inter-edge, issue one replacement credit on the link.
// Note that credit on inter-router links is issued immediately even for unsettled deliveries.
//
if (moved && link->link_direction == QD_INCOMING &&
link->link_type != QD_LINK_ROUTER && !link->edge && !link->connected_link)
link->link_type != QD_LINK_ROUTER && !link->edge)
qdr_link_issue_credit_CT(core, link, 1, false);

return moved;
Expand Down
14 changes: 0 additions & 14 deletions src/router_core/forwarder.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,20 +317,6 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery
out_dlv->link_work = qdr_link_work_getref(work);
sys_mutex_unlock(&out_link->conn->work_lock);

//
// We are dealing here only with link routed deliveries
// If the out_link has a connected link and if the out_link is an inter-router link, increment the global deliveries_transit
// If the out_link is a route container link, add to the global deliveries_egress
//
if (out_link->connected_link) {
if (out_link->conn->role == QDR_ROLE_INTER_ROUTER) {
core->deliveries_transit++;
}
else {
core->deliveries_egress++;
}
}

//
// Activate the outgoing connection for later processing.
//
Expand Down
3 changes: 0 additions & 3 deletions src/router_core/router_core_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ struct qdr_action_t {
qdr_delivery_t *delivery;
qd_delivery_state_t *dstate;
uint64_t disposition;
uint8_t tag[32];
int tag_length;
bool settled;
bool presettled; // true if remote settles while msg is in flight
bool more; // true if there are more frames arriving, false otherwise
Expand Down Expand Up @@ -450,7 +448,6 @@ struct qdr_link_t {
int detach_count; ///< 0, 1, or 2 depending on the state of the lifecycle
uint32_t open_moved_streams; ///< Number of still-open streaming deliveries that were moved from this link
qdr_address_t *owning_addr; ///< [ref] Address record that owns this link
qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link
qdrc_endpoint_t *core_endpoint; ///< [ref] Set if this link terminates on an in-core endpoint
qdr_link_ref_t *ref[QDR_LINK_LIST_CLASSES]; ///< Pointers to containing reference objects
qdr_auto_link_t *auto_link; ///< [ref] Auto_link that owns this link
Expand Down
49 changes: 8 additions & 41 deletions src/router_core/transfer.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,11 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
}


qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled,
const uint8_t *tag, int tag_length,
uint64_t remote_disposition,
qd_delivery_state_t* remote_state)
qdr_delivery_t *qdr_link_deliver_to_core(qdr_link_t *link, qd_message_t *msg, bool settled,
uint64_t remote_disposition,
qd_delivery_state_t* remote_state)
{
qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver_to_core");
qdr_delivery_t *dlv = new_qdr_delivery_t();

ZERO(dlv);
Expand All @@ -126,19 +125,16 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
dlv->conn_id = link->conn_id;
sys_mutex_init(&dlv->dispo_lock);

qd_message_disable_router_annotations(msg); // routed links do not use router annotations
qd_message_disable_router_annotations(msg); // deliveries to the core do not use router annotations

qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, DLV_FMT " Delivery created qdr_link_deliver_to_routed_link",
qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, DLV_FMT " Delivery created qdr_link_deliver_to_core",
DLV_ARGS(dlv));

qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - newly created delivery, add to action list");
qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - protect returned value");
qdr_delivery_incref(dlv, "qdr_link_deliver_to_core - newly created delivery, add to action list");
qdr_delivery_incref(dlv, "qdr_link_deliver_to_core - protect returned value");

action->args.delivery.delivery = dlv;
action->args.delivery.more = !qd_message_receive_complete(msg);
action->args.delivery.tag_length = tag_length;
assert(tag_length <= QDR_DELIVERY_TAG_MAX);
memcpy(action->args.delivery.tag, tag, tag_length);
qdr_action_enqueue(link->core, action);
return dlv;
}
Expand Down Expand Up @@ -759,35 +755,6 @@ void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
return;
}

if (link->connected_link) {
//
// If this is an attach-routed link, put the delivery directly onto the peer link
//
qdr_delivery_t *peer = qdr_forward_new_delivery_CT(core, dlv, link->connected_link, dlv->msg);

//
// Copy the delivery tag. For link-routing, the delivery tag must be preserved.
//
peer->tag_length = action->args.delivery.tag_length;
memcpy(peer->tag, action->args.delivery.tag, peer->tag_length);

qdr_forward_deliver_CT(core, link->connected_link, peer);

if (!dlv->settled) {
DEQ_INSERT_TAIL(link->unsettled, dlv);
dlv->where = QDR_DELIVERY_IN_UNSETTLED;
qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG,
DLV_FMT " Delivery transfer: qdr_link_deliver_CT: action-list -> unsettled-list", DLV_ARGS(dlv));
} else {
//
// If the delivery is settled, decrement the ref_count on the delivery.
// This count was the owned-by-action count.
//
qdr_delivery_decref_CT(core, dlv, "qdr_link_deliver_CT - removed from action");
}
return;
}

//
// NOTE: The link->undelivered list does not need to be protected by the
// connection's work lock for incoming links. This protection is only
Expand Down

0 comments on commit d6ae6af

Please sign in to comment.