Skip to content

Commit

Permalink
Add appendentries message batching
Browse files Browse the repository at this point in the history
Performance improvement.
Appendentries will now send many logs per message.
  • Loading branch information
willemt committed Jan 28, 2016
1 parent b8e1fd6 commit a20e859
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 32 deletions.
4 changes: 3 additions & 1 deletion include/raft_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ void log_empty(log_t * me_);
* @return oldest entry */
void *log_poll(log_t * me_);

raft_entry_t* log_get_from_idx(log_t* me_, int idx);
raft_entry_t* log_get_from_idx(log_t* me_, int idx, int *n_etys);

raft_entry_t* log_get_at_idx(log_t* me_, int idx);

/**
* @return youngest entry */
Expand Down
43 changes: 35 additions & 8 deletions src/raft_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ typedef struct
/* position of the queue */
int front, back;

/* we compact the log, and thus need to increment the base idx */
int base_log_idx;
/* we compact the log, and thus need to increment the Base Log Index */
int base;

raft_entry_t* entries;

Expand Down Expand Up @@ -103,23 +103,50 @@ int log_append_entry(log_t* me_, raft_entry_t* c)
return 0;
}

raft_entry_t* log_get_from_idx(log_t* me_, int idx)
raft_entry_t* log_get_from_idx(log_t* me_, int idx, int *n_etys)
{
log_private_t* me = (log_private_t*)me_;
int i;

assert(0 <= idx - 1);

if (me->base_log_idx + me->count < idx || idx < me->base_log_idx)
if (me->base + me->count < idx || idx < me->base)
return NULL;

/* idx starts at 1 */
idx -= 1;

i = (me->front + idx - me->base_log_idx) % me->size;
i = (me->front + idx - me->base) % me->size;

assert(i <= me->back);

int logs_till_end_of_log = me->back - i;

/* idx - me->front - me->base; */
/* i = */

*n_etys = logs_till_end_of_log;
return &me->entries[i];
}

raft_entry_t* log_get_at_idx(log_t* me_, int idx)
{
log_private_t* me = (log_private_t*)me_;
int i;

assert(0 <= idx - 1);

if (me->base + me->count < idx || idx < me->base)
return NULL;

/* idx starts at 1 */
idx -= 1;

i = (me->front + idx - me->base) % me->size;
return &me->entries[i];

}

int log_count(log_t* me_)
{
return ((log_private_t*)me_)->count;
Expand All @@ -132,7 +159,7 @@ void log_delete(log_t* me_, int idx)

/* idx starts at 1 */
idx -= 1;
idx -= me->base_log_idx;
idx -= me->base;

for (end = log_count(me_); idx < end; idx++)
{
Expand All @@ -157,7 +184,7 @@ void *log_poll(log_t * me_)
&me->entries[me->front], me->front);
me->front++;
me->count--;
me->base_log_idx++;
me->base++;
return (void*)elem;
}

Expand Down Expand Up @@ -194,5 +221,5 @@ void log_free(log_t * me_)
int log_get_current_idx(log_t* me_)
{
log_private_t* me = (log_private_t*)me_;
return log_count(me_) + me->base_log_idx;
return log_count(me_) + me->base;
}
23 changes: 8 additions & 15 deletions src/raft_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ int raft_periodic(raft_server_t* me_, int msec_since_last_period)
raft_entry_t* raft_get_entry_from_idx(raft_server_t* me_, int etyidx)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
return log_get_from_idx(me->log, etyidx);
return log_get_at_idx(me->log, etyidx);
}

int raft_recv_appendentries_response(raft_server_t* me_,
Expand Down Expand Up @@ -625,6 +625,12 @@ int raft_apply_entry(raft_server_t* me_)
return 0;
}

raft_entry_t* raft_get_entries_from_idx(raft_server_t* me_, int idx, int* n_etys)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
return log_get_from_idx(me->log, idx, n_etys);
}

int raft_send_appendentries(raft_server_t* me_, raft_node_t* node)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
Expand All @@ -645,20 +651,7 @@ int raft_send_appendentries(raft_server_t* me_, raft_node_t* node)

int next_idx = raft_node_get_next_idx(node);

msg_entry_t mety;

raft_entry_t* ety = raft_get_entry_from_idx(me_, next_idx);
if (ety)
{
mety.term = ety->term;
mety.id = ety->id;
mety.type = ety->type;
mety.data.len = ety->data.len;
mety.data.buf = ety->data.buf;
ae.entries = &mety;
// TODO: we want to send more than 1 at a time
ae.n_entries = 1;
}
ae.entries = raft_get_entries_from_idx(me_, next_idx, &ae.n_entries);

/* previous log is the log just before the new logs */
if (1 < next_idx)
Expand Down
16 changes: 8 additions & 8 deletions tests/test_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void TestLog_get_at_idx(CuTest * tc)
CuAssertTrue(tc, 3 == log_count(l));

CuAssertTrue(tc, 3 == log_count(l));
CuAssertTrue(tc, e2.id == log_get_from_idx(l, 2)->id);
CuAssertTrue(tc, e2.id == log_get_at_idx(l, 2)->id);
}

void TestLog_get_at_idx_returns_null_where_out_of_bounds(CuTest * tc)
Expand All @@ -58,7 +58,7 @@ void TestLog_get_at_idx_returns_null_where_out_of_bounds(CuTest * tc)
l = log_new();
e1.id = 1;
CuAssertTrue(tc, 0 == log_append_entry(l, &e1));
CuAssertTrue(tc, NULL == log_get_from_idx(l, 2));
CuAssertTrue(tc, NULL == log_get_at_idx(l, 2));
}

static int __log_pop(
Expand Down Expand Up @@ -99,13 +99,13 @@ void TestLog_delete(CuTest * tc)
CuAssertTrue(tc, ((raft_entry_t*)llqueue_poll(queue))->id == e3.id);

CuAssertTrue(tc, 2 == log_count(l));
CuAssertTrue(tc, NULL == log_get_from_idx(l, 3));
CuAssertTrue(tc, NULL == log_get_at_idx(l, 3));
log_delete(l, 2);
CuAssertTrue(tc, 1 == log_count(l));
CuAssertTrue(tc, NULL == log_get_from_idx(l, 2));
CuAssertTrue(tc, NULL == log_get_at_idx(l, 2));
log_delete(l, 1);
CuAssertTrue(tc, 0 == log_count(l));
CuAssertTrue(tc, NULL == log_get_from_idx(l, 1));
CuAssertTrue(tc, NULL == log_get_at_idx(l, 1));
}

void TestLog_delete_onwards(CuTest * tc)
Expand All @@ -125,9 +125,9 @@ void TestLog_delete_onwards(CuTest * tc)
/* even 3 gets deleted */
log_delete(l, 2);
CuAssertTrue(tc, 1 == log_count(l));
CuAssertTrue(tc, e1.id == log_get_from_idx(l, 1)->id);
CuAssertTrue(tc, NULL == log_get_from_idx(l, 2));
CuAssertTrue(tc, NULL == log_get_from_idx(l, 3));
CuAssertTrue(tc, e1.id == log_get_at_idx(l, 1)->id);
CuAssertTrue(tc, NULL == log_get_at_idx(l, 2));
CuAssertTrue(tc, NULL == log_get_at_idx(l, 3));
}

void TestLog_peektail(CuTest * tc)
Expand Down

0 comments on commit a20e859

Please sign in to comment.