From a20e85921a6a4307ef94f877d01be4c047b778a4 Mon Sep 17 00:00:00 2001 From: Willem Thiart Date: Thu, 28 Jan 2016 21:00:47 +0200 Subject: [PATCH] Add appendentries message batching Performance improvement. Appendentries will now send many logs per message. --- include/raft_log.h | 4 +++- src/raft_log.c | 43 +++++++++++++++++++++++++++++++++++-------- src/raft_server.c | 23 ++++++++--------------- tests/test_log.c | 16 ++++++++-------- 4 files changed, 54 insertions(+), 32 deletions(-) diff --git a/include/raft_log.h b/include/raft_log.h index d8b435c5..51834993 100644 --- a/include/raft_log.h +++ b/include/raft_log.h @@ -33,7 +33,9 @@ void log_empty(log_t * me_); * @return oldest entry */ void *log_poll(log_t * me_); -raft_entry_t* log_get_from_idx(log_t* me_, int idx); +raft_entry_t* log_get_from_idx(log_t* me_, int idx, int *n_etys); + +raft_entry_t* log_get_at_idx(log_t* me_, int idx); /** * @return youngest entry */ diff --git a/src/raft_log.c b/src/raft_log.c index 6867f77b..decad38f 100644 --- a/src/raft_log.c +++ b/src/raft_log.c @@ -32,8 +32,8 @@ typedef struct /* position of the queue */ int front, back; - /* we compact the log, and thus need to increment the base idx */ - int base_log_idx; + /* we compact the log, and thus need to increment the Base Log Index */ + int base; raft_entry_t* entries; @@ -103,23 +103,50 @@ int log_append_entry(log_t* me_, raft_entry_t* c) return 0; } -raft_entry_t* log_get_from_idx(log_t* me_, int idx) +raft_entry_t* log_get_from_idx(log_t* me_, int idx, int *n_etys) { log_private_t* me = (log_private_t*)me_; int i; assert(0 <= idx - 1); - if (me->base_log_idx + me->count < idx || idx < me->base_log_idx) + if (me->base + me->count < idx || idx < me->base) return NULL; /* idx starts at 1 */ idx -= 1; - i = (me->front + idx - me->base_log_idx) % me->size; + i = (me->front + idx - me->base) % me->size; + + assert(i <= me->back); + + int logs_till_end_of_log = me->back - i; + + /* idx - me->front - me->base; */ + /* i = */ + + *n_etys = logs_till_end_of_log; return &me->entries[i]; } +raft_entry_t* log_get_at_idx(log_t* me_, int idx) +{ + log_private_t* me = (log_private_t*)me_; + int i; + + assert(0 <= idx - 1); + + if (me->base + me->count < idx || idx < me->base) + return NULL; + + /* idx starts at 1 */ + idx -= 1; + + i = (me->front + idx - me->base) % me->size; + return &me->entries[i]; + +} + int log_count(log_t* me_) { return ((log_private_t*)me_)->count; @@ -132,7 +159,7 @@ void log_delete(log_t* me_, int idx) /* idx starts at 1 */ idx -= 1; - idx -= me->base_log_idx; + idx -= me->base; for (end = log_count(me_); idx < end; idx++) { @@ -157,7 +184,7 @@ void *log_poll(log_t * me_) &me->entries[me->front], me->front); me->front++; me->count--; - me->base_log_idx++; + me->base++; return (void*)elem; } @@ -194,5 +221,5 @@ void log_free(log_t * me_) int log_get_current_idx(log_t* me_) { log_private_t* me = (log_private_t*)me_; - return log_count(me_) + me->base_log_idx; + return log_count(me_) + me->base; } diff --git a/src/raft_server.c b/src/raft_server.c index 5577fd1f..6e5bcd0e 100644 --- a/src/raft_server.c +++ b/src/raft_server.c @@ -159,7 +159,7 @@ int raft_periodic(raft_server_t* me_, int msec_since_last_period) raft_entry_t* raft_get_entry_from_idx(raft_server_t* me_, int etyidx) { raft_server_private_t* me = (raft_server_private_t*)me_; - return log_get_from_idx(me->log, etyidx); + return log_get_at_idx(me->log, etyidx); } int raft_recv_appendentries_response(raft_server_t* me_, @@ -625,6 +625,12 @@ int raft_apply_entry(raft_server_t* me_) return 0; } +raft_entry_t* raft_get_entries_from_idx(raft_server_t* me_, int idx, int* n_etys) +{ + raft_server_private_t* me = (raft_server_private_t*)me_; + return log_get_from_idx(me->log, idx, n_etys); +} + int raft_send_appendentries(raft_server_t* me_, raft_node_t* node) { raft_server_private_t* me = (raft_server_private_t*)me_; @@ -645,20 +651,7 @@ int raft_send_appendentries(raft_server_t* me_, raft_node_t* node) int next_idx = raft_node_get_next_idx(node); - msg_entry_t mety; - - raft_entry_t* ety = raft_get_entry_from_idx(me_, next_idx); - if (ety) - { - mety.term = ety->term; - mety.id = ety->id; - mety.type = ety->type; - mety.data.len = ety->data.len; - mety.data.buf = ety->data.buf; - ae.entries = &mety; - // TODO: we want to send more than 1 at a time - ae.n_entries = 1; - } + ae.entries = raft_get_entries_from_idx(me_, next_idx, &ae.n_entries); /* previous log is the log just before the new logs */ if (1 < next_idx) diff --git a/tests/test_log.c b/tests/test_log.c index 6f9bd2fe..3a5c342a 100644 --- a/tests/test_log.c +++ b/tests/test_log.c @@ -47,7 +47,7 @@ void TestLog_get_at_idx(CuTest * tc) CuAssertTrue(tc, 3 == log_count(l)); CuAssertTrue(tc, 3 == log_count(l)); - CuAssertTrue(tc, e2.id == log_get_from_idx(l, 2)->id); + CuAssertTrue(tc, e2.id == log_get_at_idx(l, 2)->id); } void TestLog_get_at_idx_returns_null_where_out_of_bounds(CuTest * tc) @@ -58,7 +58,7 @@ void TestLog_get_at_idx_returns_null_where_out_of_bounds(CuTest * tc) l = log_new(); e1.id = 1; CuAssertTrue(tc, 0 == log_append_entry(l, &e1)); - CuAssertTrue(tc, NULL == log_get_from_idx(l, 2)); + CuAssertTrue(tc, NULL == log_get_at_idx(l, 2)); } static int __log_pop( @@ -99,13 +99,13 @@ void TestLog_delete(CuTest * tc) CuAssertTrue(tc, ((raft_entry_t*)llqueue_poll(queue))->id == e3.id); CuAssertTrue(tc, 2 == log_count(l)); - CuAssertTrue(tc, NULL == log_get_from_idx(l, 3)); + CuAssertTrue(tc, NULL == log_get_at_idx(l, 3)); log_delete(l, 2); CuAssertTrue(tc, 1 == log_count(l)); - CuAssertTrue(tc, NULL == log_get_from_idx(l, 2)); + CuAssertTrue(tc, NULL == log_get_at_idx(l, 2)); log_delete(l, 1); CuAssertTrue(tc, 0 == log_count(l)); - CuAssertTrue(tc, NULL == log_get_from_idx(l, 1)); + CuAssertTrue(tc, NULL == log_get_at_idx(l, 1)); } void TestLog_delete_onwards(CuTest * tc) @@ -125,9 +125,9 @@ void TestLog_delete_onwards(CuTest * tc) /* even 3 gets deleted */ log_delete(l, 2); CuAssertTrue(tc, 1 == log_count(l)); - CuAssertTrue(tc, e1.id == log_get_from_idx(l, 1)->id); - CuAssertTrue(tc, NULL == log_get_from_idx(l, 2)); - CuAssertTrue(tc, NULL == log_get_from_idx(l, 3)); + CuAssertTrue(tc, e1.id == log_get_at_idx(l, 1)->id); + CuAssertTrue(tc, NULL == log_get_at_idx(l, 2)); + CuAssertTrue(tc, NULL == log_get_at_idx(l, 3)); } void TestLog_peektail(CuTest * tc)