Skip to content

Commit

Permalink
Fixes skupperproject#1675: Remove unused link-route code (skupperproj…
Browse files Browse the repository at this point in the history
…ect#1676)

* Fixes skupperproject#1675: Remove unused link-route code

This patch retains the parts of the link routing code that was
leveraged by the core link endpoint feature. This retained code has
been renamed to identify its use by core endpoints.

* fixup: remove old link peer mgmt attribute

* fixup: clarify special case for core endpoint links
  • Loading branch information
kgiusti authored Nov 25, 2024
1 parent b46f834 commit 171c488
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 205 deletions.
16 changes: 8 additions & 8 deletions include/qpid/dispatch/protocol_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -720,14 +720,15 @@ const char *qdr_link_internal_address(const qdr_link_t *link);
bool qdr_link_is_anonymous(const qdr_link_t *link);

/**
* qdr_link_is_routed
* qdr_link_is_core_endpoint
*
* Indicate whether the link is link-routed.
* Indicate whether the link is terminated in the router core. These links are used by the core to send and receive
* messages.
*
* @param link Link object
* @return True if the link is link-routed.
* @return True if the link is terminated in the core
*/
bool qdr_link_is_routed(const qdr_link_t *link);
bool qdr_link_is_core_endpoint(const qdr_link_t *link);

/**
* qdr_link_strip_annotations_in
Expand Down Expand Up @@ -851,10 +852,9 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
bool settled, qd_bitmask_t *link_exclusion, int ingress_index,
uint64_t remote_disposition,
qd_delivery_state_t *remote_state);
qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled,
const uint8_t *tag, int tag_length,
uint64_t remote_disposition,
qd_delivery_state_t *remote_state);
qdr_delivery_t *qdr_link_deliver_to_core(qdr_link_t *link, qd_message_t *msg, bool settled,
uint64_t remote_disposition,
qd_delivery_state_t *remote_state);

/**
* qdr_link_process_deliveries
Expand Down
4 changes: 0 additions & 4 deletions python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -1422,10 +1422,6 @@
"type": "integer",
"description": "The capacity, in deliveries, for the link. The number of undelivered plus unsettled deliveries shall not exceed the capacity. This is enforced by link flow control."
},
"peer": {
"type": "string",
"description": "Identifier of the paired link if this is an attach-routed link."
},
"undeliveredCount": {
"type": "integer",
"graph": true,
Expand Down
35 changes: 10 additions & 25 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -770,35 +770,20 @@ static bool AMQP_rx_handler(qd_router_t *router, qd_link_t *link)
break;
}

// Handle the link-routed case
// Intercept deliveries destined to a core endpoint and hand them directly to the router core. Incoming links to a
// core endpoint address are "attach routed". This means the incoming link is bound directly to the core endpoint
// consumer when the first attach arrives. Afterwards deliveries arriving on this link are passed directly to the
// core endpoint rather than going through the delivery forwarding logic.
//
if (qdr_link_is_routed(rlink)) {
pn_delivery_tag_t dtag = pn_delivery_tag(pnd);

if (dtag.size > QDR_DELIVERY_TAG_MAX) {
qd_log(LOG_ROUTER, QD_LOG_DEBUG, "link route delivery failure: msg tag size exceeded %zd (max=%d)",
dtag.size, QDR_DELIVERY_TAG_MAX);
qd_message_set_discard(msg, true);
pn_link_flow(pn_link, 1);
_reject_delivery(pnd, QD_AMQP_COND_INVALID_FIELD, "delivery tag length exceeded");
if (receive_complete) {
pn_delivery_settle(pnd);
qd_message_free(msg);
}
return next_delivery;
}

if (qdr_link_is_core_endpoint(rlink)) {
log_link_message(conn, pn_link, msg);
delivery = qdr_link_deliver_to_routed_link(rlink,
msg,
pn_delivery_settled(pnd),
(uint8_t*) dtag.start,
dtag.size,
pn_delivery_remote_state(pnd),
qd_delivery_read_remote_state(pnd));
delivery = qdr_link_deliver_to_core(rlink, msg,
pn_delivery_settled(pnd),
pn_delivery_remote_state(pnd),
qd_delivery_read_remote_state(pnd));
qd_link_set_incoming_msg(link, (qd_message_t*) 0); // msg no longer exclusive to qd_link
qdr_node_connect_deliveries(link, delivery, pnd);
qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver_to_routed_link");
qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver_to_core");
return next_delivery;
}

Expand Down
55 changes: 21 additions & 34 deletions src/router_core/agent_link.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,27 @@
#define QDR_LINK_LINK_DIR 5
#define QDR_LINK_OWNING_ADDR 6
#define QDR_LINK_CAPACITY 7
#define QDR_LINK_PEER 8
#define QDR_LINK_UNDELIVERED_COUNT 9
#define QDR_LINK_UNSETTLED_COUNT 10
#define QDR_LINK_DELIVERY_COUNT 11
#define QDR_LINK_CONNECTION_ID 12
#define QDR_LINK_ADMIN_STATE 13
#define QDR_LINK_OPER_STATE 14
#define QDR_LINK_PRESETTLED_COUNT 15
#define QDR_LINK_DROPPED_PRESETTLED_COUNT 16
#define QDR_LINK_ACCEPTED_COUNT 17
#define QDR_LINK_REJECTED_COUNT 18
#define QDR_LINK_RELEASED_COUNT 19
#define QDR_LINK_MODIFIED_COUNT 20
#define QDR_LINK_DELAYED_1SEC 21
#define QDR_LINK_DELAYED_10SEC 22
#define QDR_LINK_DELIVERIES_STUCK 23
#define QDR_LINK_OPEN_MOVED_STREAMS 24
#define QDR_LINK_INGRESS_HISTOGRAM 25
#define QDR_LINK_PRIORITY 26
#define QDR_LINK_SETTLE_RATE 27
#define QDR_LINK_CREDIT_AVAILABLE 28
#define QDR_LINK_ZERO_CREDIT_SECONDS 29
#define QDR_LINK_UNDELIVERED_COUNT 8
#define QDR_LINK_UNSETTLED_COUNT 9
#define QDR_LINK_DELIVERY_COUNT 10
#define QDR_LINK_CONNECTION_ID 11
#define QDR_LINK_ADMIN_STATE 12
#define QDR_LINK_OPER_STATE 13
#define QDR_LINK_PRESETTLED_COUNT 14
#define QDR_LINK_DROPPED_PRESETTLED_COUNT 15
#define QDR_LINK_ACCEPTED_COUNT 16
#define QDR_LINK_REJECTED_COUNT 17
#define QDR_LINK_RELEASED_COUNT 18
#define QDR_LINK_MODIFIED_COUNT 19
#define QDR_LINK_DELAYED_1SEC 20
#define QDR_LINK_DELAYED_10SEC 21
#define QDR_LINK_DELIVERIES_STUCK 22
#define QDR_LINK_OPEN_MOVED_STREAMS 23
#define QDR_LINK_INGRESS_HISTOGRAM 24
#define QDR_LINK_PRIORITY 25
#define QDR_LINK_SETTLE_RATE 26
#define QDR_LINK_CREDIT_AVAILABLE 27
#define QDR_LINK_ZERO_CREDIT_SECONDS 28

const char *qdr_link_columns[] =
{"name",
Expand All @@ -62,7 +61,6 @@ const char *qdr_link_columns[] =
"linkDir",
"owningAddr",
"capacity",
"peer",
"undeliveredCount",
"unsettledCount",
"deliveryCount",
Expand Down Expand Up @@ -147,8 +145,6 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod
case QDR_LINK_OWNING_ADDR:
if(link->owning_addr)
qd_compose_insert_string(body, address_key(link->owning_addr));
else if (link->connected_link && link->connected_link->terminus_addr)
qd_compose_insert_string(body, link->connected_link->terminus_addr);
else if (link->terminus_addr)
qd_compose_insert_string(body, link->terminus_addr);
else
Expand All @@ -159,15 +155,6 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod
qd_compose_insert_uint(body, link->capacity);
break;

case QDR_LINK_PEER:
if (link->connected_link) {
char id[100];
snprintf(id, 100, "%"PRId64, link->connected_link->identity);
qd_compose_insert_string(body, id);
} else
qd_compose_insert_null(body);
break;

case QDR_LINK_UNDELIVERED_COUNT:
qd_compose_insert_ulong(body, DEQ_SIZE(link->undelivered));
break;
Expand Down
2 changes: 1 addition & 1 deletion src/router_core/agent_link.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void qdra_link_update_CT(qdr_core_t *core,
qdr_query_t *query,
qd_parsed_field_t *in_body);

#define QDR_LINK_COLUMN_COUNT 30
#define QDR_LINK_COLUMN_COUNT 29

extern const char *qdr_link_columns[QDR_LINK_COLUMN_COUNT + 1];

Expand Down
73 changes: 2 additions & 71 deletions src/router_core/connections.c
Original file line number Diff line number Diff line change
Expand Up @@ -611,9 +611,9 @@ bool qdr_link_is_anonymous(const qdr_link_t *link)
}


bool qdr_link_is_routed(const qdr_link_t *link)
bool qdr_link_is_core_endpoint(const qdr_link_t *link)
{
return link->connected_link != 0 || link->core_endpoint != 0;
return link->core_endpoint != 0;
}


Expand Down Expand Up @@ -1061,23 +1061,6 @@ void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qd
}


static void qdr_link_abort_undelivered_CT(qdr_core_t *core, qdr_link_t *link)
{
assert(link->link_direction == QD_OUTGOING);

qdr_connection_t *conn = link->conn;

sys_mutex_lock(&conn->work_lock);
qdr_delivery_t *dlv = DEQ_HEAD(link->undelivered);
while (dlv) {
if (!qdr_delivery_receive_complete(dlv))
qdr_delivery_set_aborted(dlv);
dlv = DEQ_NEXT(dlv);
}
sys_mutex_unlock(&conn->work_lock);
}


static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, const char *log_text)
{
//
Expand All @@ -1093,14 +1076,6 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
if (link->core_endpoint)
qdrc_endpoint_do_cleanup_CT(core, link->core_endpoint);

//
// If the link has a connected peer, unlink the peer
//
if (link->connected_link) {
link->connected_link->connected_link = 0;
link->connected_link = 0;
}

//
// If this link is involved in inter-router communication, remove its reference
// from the core mask-bit tables
Expand Down Expand Up @@ -2240,22 +2215,6 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
return;
}

//
// Handle attach-routed links
//
if (link->connected_link) {
qdr_terminus_t *remote_terminus = link->link_direction == QD_OUTGOING ? target : source;
if (link->strip_prefix) {
qdr_terminus_strip_address_prefix(remote_terminus, link->strip_prefix);
}
if (link->insert_prefix) {
qdr_terminus_insert_address_prefix(remote_terminus, link->insert_prefix);
}

qdr_link_outbound_second_attach_CT(core, link->connected_link, source, target);
return;
}

if (link->link_direction == QD_INCOMING) {
//
// Handle incoming link cases
Expand Down Expand Up @@ -2362,34 +2321,6 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
}
}

//
// For routed links, propagate the detach
//
if (link->connected_link) {
//
// If the connected link is outgoing and there is a delivery on the connected link's undelivered
// list that is not receive-complete, we must flag that delivery as aborted or it will forever
// block the propagation of the detach.
//
if (link->connected_link->link_direction == QD_OUTGOING)
qdr_link_abort_undelivered_CT(core, link->connected_link);

if (dt != QD_LOST)
qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE, dt == QD_CLOSED);
else {
qdr_link_outbound_detach_CT(core, link->connected_link, 0, QDR_CONDITION_ROUTED_LINK_LOST, !link->terminus_survives_disconnect);
qdr_error_free(error);
}

//
// If the link is completely detached, release its resources
//
if (link->detach_send_done)
qdr_link_cleanup_protected_CT(core, conn, link, "Link detached");

return;
}

//
// For auto links, switch the auto link to failed state and record the error
//
Expand Down
7 changes: 3 additions & 4 deletions src/router_core/delivery.c
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,11 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv) TA_NO_THREAD
}

//
// If this is an incoming link and it is not link-routed or inter-router, issue
// one replacement credit on the link. Note that credit on inter-router links is
// issued immediately even for unsettled deliveries.
// If this is an incoming link and it is not inter-router or inter-edge, issue one replacement credit on the link.
// Note that credit on inter-router links is issued immediately even for unsettled deliveries.
//
if (moved && link->link_direction == QD_INCOMING &&
link->link_type != QD_LINK_ROUTER && !link->edge && !link->connected_link)
link->link_type != QD_LINK_ROUTER && !link->edge)
qdr_link_issue_credit_CT(core, link, 1, false);

return moved;
Expand Down
14 changes: 0 additions & 14 deletions src/router_core/forwarder.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,20 +317,6 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery
out_dlv->link_work = qdr_link_work_getref(work);
sys_mutex_unlock(&out_link->conn->work_lock);

//
// We are dealing here only with link routed deliveries
// If the out_link has a connected link and if the out_link is an inter-router link, increment the global deliveries_transit
// If the out_link is a route container link, add to the global deliveries_egress
//
if (out_link->connected_link) {
if (out_link->conn->role == QDR_ROLE_INTER_ROUTER) {
core->deliveries_transit++;
}
else {
core->deliveries_egress++;
}
}

//
// Activate the outgoing connection for later processing.
//
Expand Down
3 changes: 0 additions & 3 deletions src/router_core/router_core_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ struct qdr_action_t {
qdr_delivery_t *delivery;
qd_delivery_state_t *dstate;
uint64_t disposition;
uint8_t tag[32];
int tag_length;
bool settled;
bool presettled; // true if remote settles while msg is in flight
bool more; // true if there are more frames arriving, false otherwise
Expand Down Expand Up @@ -450,7 +448,6 @@ struct qdr_link_t {
int detach_count; ///< 0, 1, or 2 depending on the state of the lifecycle
uint32_t open_moved_streams; ///< Number of still-open streaming deliveries that were moved from this link
qdr_address_t *owning_addr; ///< [ref] Address record that owns this link
qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link
qdrc_endpoint_t *core_endpoint; ///< [ref] Set if this link terminates on an in-core endpoint
qdr_link_ref_t *ref[QDR_LINK_LIST_CLASSES]; ///< Pointers to containing reference objects
qdr_auto_link_t *auto_link; ///< [ref] Auto_link that owns this link
Expand Down
Loading

0 comments on commit 171c488

Please sign in to comment.