From eb8889e8da5433f7662d95c73e7f8dfc46582ca5 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Thu, 7 Dec 2023 14:19:20 -0500 Subject: [PATCH] Do not merge: bitcoin miner extension --- src/adaptors/tcp/tcp_adaptor.c | 57 ++++++++++++++++++++++++++++++++-- src/router_core/delivery.c | 4 +++ 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index 125a12319..524957859 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -67,8 +67,8 @@ // Effective GigEthernet throughput is 116MB/sec, or ~121635 bytes/msec. For 3 hops routers with ~1msec latency gives a // 6msec round trip time. Ideally the window would be 1/2 full at most before the ACK arrives: // -const uint64_t TCP_MAX_CAPACITY = 121635 * 6 * 2; // 1,459,620 -const uint64_t TCP_ACK_WINDOW = (TCP_MAX_CAPACITY / 4); +uint64_t TCP_MAX_CAPACITY = 121635 * 6 * 2; // 1,459,620 +uint64_t TCP_ACK_WINDOW = (121635 * 6 * 2) / 4; static qd_duration_t half_closed_idle_timeout = 0; // 0 = disabled @@ -140,6 +140,13 @@ struct qdr_tcp_connection_t { uint64_t bytes_since_last_ack; uint64_t window_closed_count; + // KAG BEGIN + uint64_t kag_pn_receives; + uint64_t kag_outstanding; + uint64_t kag_updates; + uint64_t kag_total_unacks; + // KAG END + uint64_t opened_time; uint64_t last_in_time; uint64_t last_out_time; @@ -612,6 +619,23 @@ static void clean_conn_out_buffs(qdr_tcp_connection_t *conn) static void free_qdr_tcp_connection(qdr_tcp_connection_t *tc) { + /// KAG: + qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO, + "[C%" PRIu64 "] WINDOW CONFIGURATION: max-capacity=%" PRIu64 " bytes, ack-window=%" PRIu64, + tc->conn_id, TCP_MAX_CAPACITY, TCP_ACK_WINDOW); + + if (tc->kag_pn_receives) { + qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO, + "[C%" PRIu64 "] WINDOW SUMMARY: total closes=%" PRIu64 ", total rx acks=%" PRIu64 ", average outstanding=%.3Lf bytes", + tc->conn_id, tc->window_closed_count, tc->kag_pn_receives, + (long double) tc->kag_outstanding / (long double) tc->kag_pn_receives); + } else if (tc->kag_updates) { + qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO, + "[C%" PRIu64 "] WINDOW SUMMARY: total acks sent=%" PRIu64 ", average ack threshold=%.3Lf bytes", + tc->conn_id, tc->kag_updates, + (long double) tc->kag_total_unacks / (long double) tc->kag_updates); + } + qdr_tcp_stats_t *tcp_stats = get_tcp_stats(tc); LOCK(&tcp_stats->stats_lock); tcp_stats->connections_closed += 1; @@ -1226,6 +1250,8 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void // TCP stream!) and use section_offset only. // if (conn->out_dlv_stream && (conn->bytes_since_last_ack >= TCP_ACK_WINDOW)) { + conn->kag_updates += 1; // KAG + conn->kag_total_unacks += conn->bytes_since_last_ack; // KAG qd_delivery_state_t *dstate = qd_delivery_state(); dstate->section_number = 0; dstate->section_offset = conn->bytes_out; @@ -1301,6 +1327,7 @@ static qdr_tcp_connection_t *qdr_tcp_connection(qd_tcp_listener_t *listener, qd_ tcp_stats->connections_opened +=1; UNLOCK(&tcp_stats->stats_lock); DEQ_INIT(tc->out_buffs); + return tc; } @@ -2212,12 +2239,14 @@ static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t // resend released will generate a PN_RECEIVED with section_offset == 0, ignore it if (dstate && dstate->section_offset > 0) { + tc->kag_pn_receives += 1; // KAG // note: the PN_RECEIVED is generated by the remote TCP // adaptor, for simplicity we ignore the section_number since // all we really need is a byte offset: // vflow_set_uint64(tc->vflow, VFLOW_ATTRIBUTE_OCTETS_UNACKED, tc->bytes_unacked); tc->bytes_unacked = tc->bytes_in - dstate->section_offset; + tc->kag_outstanding += tc->bytes_unacked; // KAG qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] tc->bytes_in=%" PRIu64 ", dstate->section_offset=%" PRIu64 ", tc->bytes_unacked=%" PRIu64 "", @@ -2365,6 +2394,30 @@ static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context) } } + /// KAG BEGIN DEBUG + if (getenv("TCP_MAX_CAPACITY")) { + const char *tmc = getenv("TCP_MAX_CAPACITY"); + int64_t max_cap; + if (sscanf(tmc, "%" SCNd64, &max_cap) != 1 || max_cap < 100) { + fprintf(stderr, "Invalid TCP_MAX_CAPACITY value: %s\n", tmc); + fflush(stderr); + exit(1); + } + TCP_MAX_CAPACITY = max_cap; + TCP_ACK_WINDOW = TCP_MAX_CAPACITY / 4; + } + if (getenv("TCP_ACK_WINDOW")) { + const char *tmc = getenv("TCP_ACK_WINDOW"); + int64_t ack_win; + if (sscanf(tmc, "%" SCNd64, &ack_win) != 1 || ack_win < 0 || ack_win > TCP_MAX_CAPACITY) { + fprintf(stderr, "Invalid TCP_ACK_WINDOW value: %s\n", tmc); + fflush(stderr); + exit(1); + } + TCP_ACK_WINDOW = ack_win; + } + /// KAG END + tcp_adaptor = adaptor; } diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c index b07dda2c9..08b8d9b5d 100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -1345,6 +1345,10 @@ static bool qdr_delivery_set_remote_delivery_state_CT(qdr_delivery_t *dlv, uint6 { // old state, has not been transferred to peer? if (dlv->remote_state) { + // KAG + fprintf(stderr, "ERROR: OVERWRITING LAST DELIVERY UPDATE!\n"); + fflush(stderr); + exit(-1); qd_delivery_state_free(dlv->remote_state); } dlv->remote_state = remote_state;