Skip to content

Commit

Permalink
Do not merge: bitcoin miner extension
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Dec 7, 2023
1 parent d705fff commit eb8889e
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
57 changes: 55 additions & 2 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@
// Effective GigEthernet throughput is 116MB/sec, or ~121635 bytes/msec. For 3 hops routers with ~1msec latency gives a
// 6msec round trip time. Ideally the window would be 1/2 full at most before the ACK arrives:
//
const uint64_t TCP_MAX_CAPACITY = 121635 * 6 * 2; // 1,459,620
const uint64_t TCP_ACK_WINDOW = (TCP_MAX_CAPACITY / 4);
uint64_t TCP_MAX_CAPACITY = 121635 * 6 * 2; // 1,459,620
uint64_t TCP_ACK_WINDOW = (121635 * 6 * 2) / 4;

static qd_duration_t half_closed_idle_timeout = 0; // 0 = disabled

Expand Down Expand Up @@ -140,6 +140,13 @@ struct qdr_tcp_connection_t {
uint64_t bytes_since_last_ack;
uint64_t window_closed_count;

// KAG BEGIN
uint64_t kag_pn_receives;
uint64_t kag_outstanding;
uint64_t kag_updates;
uint64_t kag_total_unacks;
// KAG END

uint64_t opened_time;
uint64_t last_in_time;
uint64_t last_out_time;
Expand Down Expand Up @@ -612,6 +619,23 @@ static void clean_conn_out_buffs(qdr_tcp_connection_t *conn)

static void free_qdr_tcp_connection(qdr_tcp_connection_t *tc)
{
/// KAG:
qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO,
"[C%" PRIu64 "] WINDOW CONFIGURATION: max-capacity=%" PRIu64 " bytes, ack-window=%" PRIu64,
tc->conn_id, TCP_MAX_CAPACITY, TCP_ACK_WINDOW);

if (tc->kag_pn_receives) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO,
"[C%" PRIu64 "] WINDOW SUMMARY: total closes=%" PRIu64 ", total rx acks=%" PRIu64 ", average outstanding=%.3Lf bytes",
tc->conn_id, tc->window_closed_count, tc->kag_pn_receives,
(long double) tc->kag_outstanding / (long double) tc->kag_pn_receives);
} else if (tc->kag_updates) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO,
"[C%" PRIu64 "] WINDOW SUMMARY: total acks sent=%" PRIu64 ", average ack threshold=%.3Lf bytes",
tc->conn_id, tc->kag_updates,
(long double) tc->kag_total_unacks / (long double) tc->kag_updates);
}

qdr_tcp_stats_t *tcp_stats = get_tcp_stats(tc);
LOCK(&tcp_stats->stats_lock);
tcp_stats->connections_closed += 1;
Expand Down Expand Up @@ -1226,6 +1250,8 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
// TCP stream!) and use section_offset only.
//
if (conn->out_dlv_stream && (conn->bytes_since_last_ack >= TCP_ACK_WINDOW)) {
conn->kag_updates += 1; // KAG
conn->kag_total_unacks += conn->bytes_since_last_ack; // KAG
qd_delivery_state_t *dstate = qd_delivery_state();
dstate->section_number = 0;
dstate->section_offset = conn->bytes_out;
Expand Down Expand Up @@ -1301,6 +1327,7 @@ static qdr_tcp_connection_t *qdr_tcp_connection(qd_tcp_listener_t *listener, qd_
tcp_stats->connections_opened +=1;
UNLOCK(&tcp_stats->stats_lock);
DEQ_INIT(tc->out_buffs);

return tc;
}

Expand Down Expand Up @@ -2212,12 +2239,14 @@ static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t

// resend released will generate a PN_RECEIVED with section_offset == 0, ignore it
if (dstate && dstate->section_offset > 0) {
tc->kag_pn_receives += 1; // KAG
// note: the PN_RECEIVED is generated by the remote TCP
// adaptor, for simplicity we ignore the section_number since
// all we really need is a byte offset:
//
vflow_set_uint64(tc->vflow, VFLOW_ATTRIBUTE_OCTETS_UNACKED, tc->bytes_unacked);
tc->bytes_unacked = tc->bytes_in - dstate->section_offset;
tc->kag_outstanding += tc->bytes_unacked; // KAG
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
"[C%" PRIu64 "] tc->bytes_in=%" PRIu64 ", dstate->section_offset=%" PRIu64
", tc->bytes_unacked=%" PRIu64 "",
Expand Down Expand Up @@ -2365,6 +2394,30 @@ static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context)
}
}

/// KAG BEGIN DEBUG
if (getenv("TCP_MAX_CAPACITY")) {
const char *tmc = getenv("TCP_MAX_CAPACITY");
int64_t max_cap;
if (sscanf(tmc, "%" SCNd64, &max_cap) != 1 || max_cap < 100) {
fprintf(stderr, "Invalid TCP_MAX_CAPACITY value: %s\n", tmc);
fflush(stderr);
exit(1);
}
TCP_MAX_CAPACITY = max_cap;
TCP_ACK_WINDOW = TCP_MAX_CAPACITY / 4;
}
if (getenv("TCP_ACK_WINDOW")) {
const char *tmc = getenv("TCP_ACK_WINDOW");
int64_t ack_win;
if (sscanf(tmc, "%" SCNd64, &ack_win) != 1 || ack_win < 0 || ack_win > TCP_MAX_CAPACITY) {
fprintf(stderr, "Invalid TCP_ACK_WINDOW value: %s\n", tmc);
fflush(stderr);
exit(1);
}
TCP_ACK_WINDOW = ack_win;
}
/// KAG END

tcp_adaptor = adaptor;
}

Expand Down
4 changes: 4 additions & 0 deletions src/router_core/delivery.c
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,10 @@ static bool qdr_delivery_set_remote_delivery_state_CT(qdr_delivery_t *dlv, uint6
{
// old state, has not been transferred to peer?
if (dlv->remote_state) {
// KAG
fprintf(stderr, "ERROR: OVERWRITING LAST DELIVERY UPDATE!\n");
fflush(stderr);
exit(-1);
qd_delivery_state_free(dlv->remote_state);
}
dlv->remote_state = remote_state;
Expand Down

0 comments on commit eb8889e

Please sign in to comment.