From 7aa8077e412c23b211ca175649fe004ea9b46722 Mon Sep 17 00:00:00 2001 From: Ken Giusti Date: Mon, 16 Oct 2023 10:35:23 -0400 Subject: [PATCH] Fixes #1244: close tcp connection on failed outcome (#1245) --- src/adaptors/tcp_lite/tcp_lite.c | 37 +++++++++++++++----------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/src/adaptors/tcp_lite/tcp_lite.c b/src/adaptors/tcp_lite/tcp_lite.c index e60aba60f..de5726c67 100644 --- a/src/adaptors/tcp_lite/tcp_lite.c +++ b/src/adaptors/tcp_lite/tcp_lite.c @@ -1140,9 +1140,8 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn) if (qd_message_receive_complete(conn->outbound_stream) && !qd_message_can_consume_buffers(conn->outbound_stream)) { qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Rx-complete, rings empty: Write-closing the raw connection", conn->common.conn_id); pn_raw_connection_write_close(conn->raw_conn); - qdr_delivery_set_disposition(conn->outbound_delivery, PN_ACCEPTED); qdr_delivery_set_context(conn->outbound_delivery, 0); - qdr_delivery_decref(tcplite_context->core, conn->outbound_delivery, "manage_flow_XSIDE_IO - release outbound"); + qdr_delivery_remote_state_updated(tcplite_context->core, conn->outbound_delivery, PN_ACCEPTED, true, 0, true); // accepted, settled, ref_given conn->outbound_delivery = 0; conn->outbound_stream = 0; } @@ -1275,22 +1274,6 @@ static void connection_run_CSIDE_IO(tcplite_connection_t *conn) static void connection_run_XSIDE_IO(tcplite_connection_t *conn) { ASSERT_RAW_IO; - // - // If the inbound stream has a non-zero disposition, there's been an abnormal event on - // the outbound side of that stream. Close the raw connection and close out the inbound stuff. - // - if (conn->inbound_disposition != 0) { - if (!!conn->inbound_delivery && !!conn->inbound_stream) { - qd_message_set_send_complete(conn->inbound_stream); - qdr_delivery_continue(tcplite_context->core, conn->inbound_delivery, true); - qdr_delivery_decref(tcplite_context->core, conn->inbound_delivery, "TCPLITE Closing inbound delivery on error"); - conn->inbound_delivery = 0; - conn->inbound_stream = 0; - } - close_raw_connection_XSIDE_IO(conn); - return; - } - if (conn->listener_side) { connection_run_LSIDE_IO(conn); } else { @@ -1551,13 +1534,27 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di tcplite_connection_t *conn = (tcplite_connection_t*) common; if (dlv == conn->outbound_delivery) { - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Outbound delivery update - disposition: %s", conn->common.conn_id, pn_disposition_type_name(disp)); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " Outbound delivery update - disposition: %s", DLV_ARGS(dlv), pn_disposition_type_name(disp)); conn->outbound_disposition = disp; need_wake = true; } else if (dlv == conn->inbound_delivery) { - qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Inbound delivery update - disposition: %s", conn->common.conn_id, pn_disposition_type_name(disp)); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " Inbound delivery update - disposition: %s", DLV_ARGS(dlv), pn_disposition_type_name(disp)); conn->inbound_disposition = disp; need_wake = true; + if (qd_delivery_state_is_terminal(disp) && disp != PN_ACCEPTED) { + // The delivery failed - this is unrecoverable. + if (!!conn->raw_conn) { + // set the raw connection condition info so it will appear in the vanflow logs + // when the connection disconnects + pn_condition_t *cond = pn_raw_connection_condition(conn->raw_conn); + if (!!cond) { + (void) pn_condition_set_name(cond, "delivery-failed"); + (void) pn_condition_set_description(cond, "destination unreachable"); + } + pn_raw_connection_close(conn->raw_conn); + // clean stuff up when DISCONNECT event arrives + } + } } if (need_wake) {