Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance improvement after output refactoring #89

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 78 additions & 60 deletions wal2json.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,

static bool parse_table_identifier(List *qualified_tables, char separator, List **select_tables);
static bool string_to_SelectTable(char *rawstring, char separator, List **select_tables);
static void appendStringInfoStrings(StringInfo str, int numargs,...);

void
_PG_init(void)
Expand Down Expand Up @@ -391,7 +392,7 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
/* Transaction starts */
OutputPluginPrepareWrite(ctx, true);

appendStringInfo(ctx->out, "{%s", data->nl);
appendStringInfoStrings(ctx->out, 2, "{", data->nl);

if (data->include_xids)
appendStringInfo(ctx->out, "%s\"xid\":%s%u,%s", data->ht, data->sp, txn->xid, data->nl);
Expand All @@ -400,15 +401,15 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
char *lsn_str = DatumGetCString(DirectFunctionCall1(pg_lsn_out, txn->end_lsn));

appendStringInfo(ctx->out, "%s\"nextlsn\":%s\"%s\",%s", data->ht, data->sp, lsn_str, data->nl);
appendStringInfoStrings(ctx->out, 7, data->ht, "\"nextlsn\":", data->sp, "\"", lsn_str, "\"", data->nl);

pfree(lsn_str);
}

if (data->include_timestamp)
appendStringInfo(ctx->out, "%s\"timestamp\":%s\"%s\",%s", data->ht, data->sp, timestamptz_to_str(txn->commit_time), data->nl);
appendStringInfoStrings(ctx->out, 7, data->ht, "\"timestamp\":", data->sp, "\"", timestamptz_to_str(txn->commit_time), "\"", data->nl);

appendStringInfo(ctx->out, "%s\"change\":%s[", data->ht, data->sp);
appendStringInfoStrings(ctx->out, 4, data->ht, "\"change\":", data->sp, "[");

if (data->write_in_chunks)
OutputPluginWrite(ctx, true);
Expand All @@ -434,9 +435,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

/* if we don't write in chunks, we need a newline here */
if (!data->write_in_chunks)
appendStringInfo(ctx->out, "%s", data->nl);
appendStringInfoString(ctx->out, data->nl);

appendStringInfo(ctx->out, "%s]%s}", data->ht, data->nl);
appendStringInfoStrings(ctx->out, 4, data->ht, "]", data->nl, "}");

OutputPluginWrite(ctx, true);
}
Expand Down Expand Up @@ -478,22 +479,22 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
*/
if (replident)
{
appendStringInfo(&colnames, "%s%s%s\"oldkeys\":%s{%s", data->ht, data->ht, data->ht, data->sp, data->nl);
appendStringInfo(&colnames, "%s%s%s%s\"keynames\":%s[", data->ht, data->ht, data->ht, data->ht, data->sp);
appendStringInfo(&coltypes, "%s%s%s%s\"keytypes\":%s[", data->ht, data->ht, data->ht, data->ht, data->sp);
appendStringInfoStrings(&colnames, 7, data->ht, data->ht, data->ht, "\"oldkeys\":", data->sp, "{", data->nl);
appendStringInfoStrings(&colnames, 7, data->ht, data->ht, data->ht, data->ht, "\"keynames\":", data->sp, "[");
appendStringInfoStrings(&coltypes, 7, data->ht, data->ht, data->ht, data->ht, "\"keytypes\":", data->sp, "[");
if (data->include_type_oids)
appendStringInfo(&coltypeoids, "%s%s%s%s\"keytypeoids\":%s[", data->ht, data->ht, data->ht, data->ht, data->sp);
appendStringInfo(&colvalues, "%s%s%s%s\"keyvalues\":%s[", data->ht, data->ht, data->ht, data->ht, data->sp);
appendStringInfoStrings(&coltypeoids, 7, data->ht, data->ht, data->ht, data->ht, "\"keytypeoids\":", data->sp, "[");
appendStringInfoStrings(&colvalues, 7, data->ht, data->ht, data->ht, data->ht, "\"keyvalues\":", data->sp, "[");
}
else
{
appendStringInfo(&colnames, "%s%s%s\"columnnames\":%s[", data->ht, data->ht, data->ht, data->sp);
appendStringInfo(&coltypes, "%s%s%s\"columntypes\":%s[", data->ht, data->ht, data->ht, data->sp);
appendStringInfoStrings(&colnames, 6, data->ht, data->ht, data->ht, "\"columnnames\":", data->sp, "[");
appendStringInfoStrings(&coltypes, 6, data->ht, data->ht, data->ht, "\"columntypes\":", data->sp, "[");
if (data->include_type_oids)
appendStringInfo(&coltypeoids, "%s%s%s\"columntypeoids\":%s[", data->ht, data->ht, data->ht, data->sp);
appendStringInfoStrings(&coltypeoids, 6, data->ht, data->ht, data->ht, "\"columntypeoids\":", data->sp, "[");
if (data->include_not_null)
appendStringInfo(&colnotnulls, "%s%s%s\"columnoptionals\":%s[", data->ht, data->ht, data->ht, data->sp);
appendStringInfo(&colvalues, "%s%s%s\"columnvalues\":%s[", data->ht, data->ht, data->ht, data->sp);
appendStringInfoStrings(&colnotnulls, 6, data->ht, data->ht, data->ht, "\"columnoptionals\":", data->sp, "[");
appendStringInfoStrings(&colvalues, 6, data->ht, data->ht, data->ht, "\"columnvalues\":", data->sp, "[");
}

/* Print column information (name, type, value) */
Expand Down Expand Up @@ -578,7 +579,7 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
}

/* Accumulate each column info */
appendStringInfo(&colnames, "%s", comma);
appendStringInfoString(&colnames, comma);
escape_json(&colnames, NameStr(attr->attname));

if (data->include_types)
Expand All @@ -588,24 +589,24 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
char *type_str;

type_str = TextDatumGetCString(DirectFunctionCall2(format_type, attr->atttypid, attr->atttypmod));
appendStringInfo(&coltypes, "%s", comma);
appendStringInfoString(&coltypes, comma);
escape_json(&coltypes, type_str);
pfree(type_str);
}
else
{
Form_pg_type type_form = (Form_pg_type) GETSTRUCT(type_tuple);
appendStringInfo(&coltypes, "%s", comma);
appendStringInfoString(&coltypes, comma);
escape_json(&coltypes, NameStr(type_form->typname));
}

/* oldkeys doesn't print not-null constraints */
if (!replident && data->include_not_null)
{
if (attr->attnotnull)
appendStringInfo(&colnotnulls, "%sfalse", comma);
appendStringInfoStrings(&colnotnulls, 2, comma, "false");
else
appendStringInfo(&colnotnulls, "%strue", comma);
appendStringInfoStrings(&colnotnulls, 2, comma, "true");
}
}

Expand All @@ -616,7 +617,7 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu

if (isnull)
{
appendStringInfo(&colvalues, "%snull", comma);
appendStringInfoStrings(&colvalues, 2, comma, "null");
}
else
{
Expand Down Expand Up @@ -648,27 +649,27 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
pg_strncasecmp(outputstr, "Infinity", 8) == 0 ||
pg_strncasecmp(outputstr, "-Infinity", 9) == 0)
{
appendStringInfo(&colvalues, "%snull", comma);
appendStringInfoStrings(&colvalues, 2, comma, "null");
elog(DEBUG1, "attribute \"%s\" is special: %s", NameStr(attr->attname), outputstr);
}
else if (strspn(outputstr, "0123456789+-eE.") == strlen(outputstr))
appendStringInfo(&colvalues, "%s%s", comma, outputstr);
appendStringInfoStrings(&colvalues, 2, comma, outputstr);
else
elog(ERROR, "%s is not a number", outputstr);
break;
case BOOLOID:
if (strcmp(outputstr, "t") == 0)
appendStringInfo(&colvalues, "%strue", comma);
appendStringInfoStrings(&colvalues, 2, comma, "true");
else
appendStringInfo(&colvalues, "%sfalse", comma);
appendStringInfoStrings(&colvalues, 2, comma, "false");
break;
case BYTEAOID:
appendStringInfo(&colvalues, "%s", comma);
appendStringInfoString(&colvalues, comma);
/* string is "\x54617069727573", start after "\x" */
escape_json(&colvalues, (outputstr + 2));
break;
default:
appendStringInfo(&colvalues, "%s", comma);
appendStringInfoString(&colvalues, comma);
escape_json(&colvalues, outputstr);
break;
}
Expand All @@ -682,27 +683,27 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
/* Column info ends */
if (replident)
{
appendStringInfo(&colnames, "],%s", data->nl);
appendStringInfoStrings(&colnames, 2, "],", data->nl);
if (data->include_types)
appendStringInfo(&coltypes, "],%s", data->nl);
appendStringInfoStrings(&coltypes, 2, "],", data->nl);
if (data->include_type_oids)
appendStringInfo(&coltypeoids, "],%s", data->nl);
appendStringInfo(&colvalues, "]%s", data->nl);
appendStringInfo(&colvalues, "%s%s%s}%s", data->ht, data->ht, data->ht, data->nl);
appendStringInfoStrings(&coltypeoids, 2, "],", data->nl);
appendStringInfoStrings(&colvalues, 2, "]", data->nl);
appendStringInfoStrings(&colvalues, 5, data->ht, data->ht, data->ht, "}", data->nl);
}
else
{
appendStringInfo(&colnames, "],%s", data->nl);
appendStringInfoStrings(&colnames, 2, "],", data->nl);
if (data->include_types)
appendStringInfo(&coltypes, "],%s", data->nl);
appendStringInfoStrings(&coltypes, 2, "],", data->nl);
if (data->include_type_oids)
appendStringInfo(&coltypeoids, "],%s", data->nl);
appendStringInfoStrings(&coltypeoids, 2, "],", data->nl);
if (data->include_not_null)
appendStringInfo(&colnotnulls, "],%s", data->nl);
appendStringInfoStrings(&colnotnulls, 2, "],", data->nl);
if (hasreplident)
appendStringInfo(&colvalues, "],%s", data->nl);
appendStringInfoStrings(&colvalues, 2, "],", data->nl);
else
appendStringInfo(&colvalues, "]%s", data->nl);
appendStringInfoStrings(&colvalues, 2, "]", data->nl);
}

/* Print data */
Expand Down Expand Up @@ -891,26 +892,26 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

/* if we don't write in chunks, we need a newline here */
if (!data->write_in_chunks)
appendStringInfo(ctx->out, "%s", data->nl);
appendStringInfoString(ctx->out, data->nl);

appendStringInfo(ctx->out, "%s%s", data->ht, data->ht);
appendStringInfoStrings(ctx->out, 2, data->ht, data->ht);

if (data->nr_changes > 1)
appendStringInfoChar(ctx->out, ',');

appendStringInfo(ctx->out, "{%s", data->nl);
appendStringInfoStrings(ctx->out, 2, "{", data->nl);

/* Print change kind */
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
appendStringInfo(ctx->out, "%s%s%s\"kind\":%s\"insert\",%s", data->ht, data->ht, data->ht, data->sp, data->nl);
appendStringInfoStrings(ctx->out, 7, data->ht, data->ht, data->ht, "\"kind\":", data->sp, "\"insert\",", data->nl);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
appendStringInfo(ctx->out, "%s%s%s\"kind\":%s\"update\",%s", data->ht, data->ht, data->ht, data->sp, data->nl);
appendStringInfoStrings(ctx->out, 7, data->ht, data->ht, data->ht, "\"kind\":", data->sp, "\"update\",", data->nl);
break;
case REORDER_BUFFER_CHANGE_DELETE:
appendStringInfo(ctx->out, "%s%s%s\"kind\":%s\"delete\",%s", data->ht, data->ht, data->ht, data->sp, data->nl);
appendStringInfoStrings(ctx->out, 7, data->ht, data->ht, data->ht, "\"kind\":", data->sp, "\"delete\",", data->nl);
break;
default:
Assert(false);
Expand All @@ -919,13 +920,13 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Print table name (possibly) qualified */
if (data->include_schemas)
{
appendStringInfo(ctx->out, "%s%s%s\"schema\":%s", data->ht, data->ht, data->ht, data->sp);
appendStringInfoStrings(ctx->out, 5, data->ht, data->ht, data->ht, "\"schema\":", data->sp);
escape_json(ctx->out, get_namespace_name(class_form->relnamespace));
appendStringInfo(ctx->out, ",%s", data->nl);
appendStringInfoStrings(ctx->out, 2, ",", data->nl);
}
appendStringInfo(ctx->out, "%s%s%s\"table\":%s", data->ht, data->ht, data->ht, data->sp);
appendStringInfoStrings(ctx->out, 5, data->ht, data->ht, data->ht, "\"table\":", data->sp);
escape_json(ctx->out, NameStr(class_form->relname));
appendStringInfo(ctx->out, ",%s", data->nl);
appendStringInfoStrings(ctx->out, 2, ",", data->nl);

switch (change->action)
{
Expand Down Expand Up @@ -991,7 +992,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Assert(false);
}

appendStringInfo(ctx->out, "%s%s}", data->ht, data->ht);
appendStringInfoStrings(ctx->out, 3, data->ht, data->ht, "}");

MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
Expand Down Expand Up @@ -1032,38 +1033,38 @@ pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

/* if we don't write in chunks, we need a newline here */
if (!data->write_in_chunks && transactional)
appendStringInfo(ctx->out, "%s", data->nl);
appendStringInfoString(ctx->out, data->nl);

/* build a complete JSON object for non-transactional message */
if (!transactional)
appendStringInfo(ctx->out, "{%s%s\"change\":%s[%s", data->nl, data->ht, data->sp, data->nl);
appendStringInfoStrings(ctx->out, 7, "{", data->nl, data->ht, "\"change\":", data->sp, "[", data->nl);

appendStringInfo(ctx->out, "%s%s", data->ht, data->ht);
appendStringInfoStrings(ctx->out, 2, data->ht, data->ht);

if (data->nr_changes > 1)
appendStringInfoChar(ctx->out, ',');

appendStringInfo(ctx->out, "{%s%s%s%s\"kind\":%s\"message\",%s", data->nl, data->ht, data->ht, data->ht, data->sp, data->nl);
appendStringInfoStrings(ctx->out, 9, "{", data->nl, data->ht, data->ht, data->ht, "\"kind\":", data->sp, "\"message\",", data->nl);

if (transactional)
appendStringInfo(ctx->out, "%s%s%s\"transactional\":%strue,%s", data->ht, data->ht, data->ht, data->sp, data->nl);
appendStringInfoStrings(ctx->out, 7, data->ht, data->ht, data->ht, "\"transactional\":", data->sp, "true,", data->nl);
else
appendStringInfo(ctx->out, "%s%s%s\"transactional\":%sfalse,%s", data->ht, data->ht, data->ht, data->sp, data->nl);
appendStringInfoStrings(ctx->out, 7, data->ht, data->ht, data->ht, "\"transactional\":", data->sp, "false,", data->nl);

appendStringInfo(ctx->out, "%s%s%s\"prefix\":%s", data->ht, data->ht, data->ht, data->sp);
appendStringInfoStrings(ctx->out, 5, data->ht, data->ht, data->ht, "\"prefix\":", data->sp);
escape_json(ctx->out, prefix);
appendStringInfo(ctx->out, ",%s%s%s%s\"content\":%s", data->nl, data->ht, data->ht, data->ht, data->sp);
appendStringInfoStrings(ctx->out, 7, ",", data->nl, data->ht, data->ht, data->ht, "\"content\":", data->sp);

content_str = (char *) palloc0((content_size + 1) * sizeof(char));
strncpy(content_str, content, content_size);
escape_json(ctx->out, content_str);
pfree(content_str);

appendStringInfo(ctx->out, "%s%s%s}", data->nl, data->ht, data->ht);
appendStringInfoStrings(ctx->out, 4, data->nl, data->ht, data->ht, "}");

/* build a complete JSON object for non-transactional message */
if (!transactional)
appendStringInfo(ctx->out, "%s%s]%s}", data->nl, data->ht, data->nl);
appendStringInfoStrings(ctx->out, 5, data->nl, data->ht, "]", data->nl, "}");

MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
Expand Down Expand Up @@ -1218,3 +1219,20 @@ string_to_SelectTable(char *rawstring, char separator, List **select_tables)

return true;
}

static void
appendStringInfoStrings(StringInfo str, int numargs,...)
{
va_list args;
int i;

va_start(args, numargs);
for (i = 0; i < numargs; i++)
{
char* s = va_arg(args, char*);
if (s[0] != '\0')
appendStringInfoString(str, s);
}
va_end(args);
}