[CT-971] Add flag for socks batch processing (#1777)
dydxwill authored Jun 26, 2024
1 parent f320246 commit 79a3143
Showing 4 changed files with 116 additions and 8 deletions.
@@ -182,6 +182,8 @@ describe('message-forwarder', () => {
};

beforeAll(async () => {
config.BATCH_PROCESSING_ENABLED = false;
await dbHelpers.clearData();
await dbHelpers.migrate();
await testMocks.seedData();
await Promise.all([
4 changes: 4 additions & 0 deletions indexer/services/socks/src/config.ts
@@ -49,6 +49,10 @@ export const configSchema = {
COMLINK_URL: parseString(),
AXIOS_TIMEOUT_MS: parseInteger({ default: 5000 }), // 5 seconds
INITIAL_GET_TIMEOUT_MS: parseInteger({ default: 20_000 }), // 20 seconds
BATCH_PROCESSING_ENABLED: parseBoolean({ default: true }),
KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS: parseNumber({
default: 3_000,
}),
};

////////////////////////////////////////////////////////////////////////////////
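For reference, a minimal sketch (not part of this commit) of overriding the two new config values in a test, mirroring the beforeAll() change in the message-forwarder test above; the import path is illustrative:

import config from '../config';

// Fall back to per-message processing, and commit/heartbeat at most every 5 seconds.
config.BATCH_PROCESSING_ENABLED = false;
config.KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS = 5_000;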
2 changes: 1 addition & 1 deletion indexer/services/socks/src/index.ts
@@ -61,13 +61,13 @@ async function start(): Promise<void> {
});

await connectToKafka();
await startConsumer();

const subscriptions: Subscriptions = new Subscriptions();
index = new Index(wss, subscriptions);
messageForwarder = new MessageForwarder(subscriptions, index);
subscriptions.start(messageForwarder.forwardToClient);
messageForwarder.start();
await startConsumer(config.BATCH_PROCESSING_ENABLED);

logger.info({
at: 'index#start',
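startConsumer() now receives the flag, but the consumer wiring itself lives in @dydxprotocol-indexer/kafka and is not part of this diff. A hedged sketch of what that wiring might look like, assuming kafkajs's standard consumer.run() options; the consumer, onBatch, and onMessage parameters are illustrative only:

import { Consumer, EachBatchPayload, KafkaMessage } from 'kafkajs';

// Illustrative only: pick batch or per-message consumption based on the new flag.
export async function startConsumer(
  consumer: Consumer,
  batchProcessing: boolean,
  onBatch: (payload: EachBatchPayload) => Promise<void>,
  onMessage: (topic: string, message: KafkaMessage) => Promise<void>,
): Promise<void> {
  if (batchProcessing) {
    await consumer.run({
      // Offsets are resolved manually inside the batch handler (payload.resolveOffset),
      // so kafkajs's automatic per-batch resolution is disabled in this sketch.
      eachBatchAutoResolve: false,
      eachBatch: onBatch,
    });
  } else {
    await consumer.run({
      eachMessage: async ({ topic, message }) => onMessage(topic, message),
    });
  }
}

Note that the call is also moved after messageForwarder.start(), so the batch or message handler is registered before the consumer starts polling.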
116 changes: 109 additions & 7 deletions indexer/services/socks/src/lib/message-forwarder.ts
@@ -3,8 +3,12 @@ import {
logger,
InfoObject,
} from '@dydxprotocol-indexer/base';
import { updateOnMessageFunction } from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';
import { updateOnBatchFunction, updateOnMessageFunction } from '@dydxprotocol-indexer/kafka';
import {
Batch,
EachBatchPayload,
KafkaMessage,
} from 'kafkajs';
import _ from 'lodash';

import config from '../config';
@@ -61,11 +61,25 @@ export class MessageForwarder {
throw new Error('MessageForwarder already started');
}

// Kafkajs requires the function passed into `eachMessage` to be an async function.
// eslint-disable-next-line @typescript-eslint/require-await
updateOnMessageFunction(async (topic, message): Promise<void> => {
return this.onMessage(topic, message);
});
if (config.BATCH_PROCESSING_ENABLED) {
logger.info({
at: 'consumers#connect',
message: 'Batch processing enabled',
});
updateOnBatchFunction(async (payload: EachBatchPayload): Promise<void> => {
return this.onBatch(payload);
});
} else {
logger.info({
at: 'consumers#connect',
message: 'Batch processing disabled. Processing each message individually',
});
// Kafkajs requires the function passed into `eachMessage` to be an async function.
// eslint-disable-next-line @typescript-eslint/require-await
updateOnMessageFunction(async (topic, message): Promise<void> => {
return this.onMessage(topic, message);
});
}

this.started = true;
this.batchSending = setInterval(
@@ -74,6 +92,89 @@
);
}

public async onBatch(
payload: EachBatchPayload,
): Promise<void> {
const batch: Batch = payload.batch;
const topic: string = batch.topic;
const partition: string = batch.partition.toString();
const metricTags: Record<string, string> = { topic, partition };
if (batch.isEmpty()) {
logger.error({
at: 'on-batch#onBatch',
message: 'Empty batch',
...metricTags,
});
return;
}

const startTime: number = Date.now();
const firstMessageTimestamp: number = Number(batch.messages[0].timestamp);
const batchTimeInQueue: number = startTime - firstMessageTimestamp;
const batchInfo = {
firstMessageTimestamp: new Date(firstMessageTimestamp).toISOString(),
batchTimeInQueue,
messagesInBatch: batch.messages.length,
firstOffset: batch.firstOffset(),
lastOffset: batch.lastOffset(),
...metricTags,
};

logger.info({
at: 'on-batch#onBatch',
message: 'Received batch',
...batchInfo,
});
stats.timing(
'socks.batch_time_in_queue',
batchTimeInQueue,
metricTags,
);

let lastCommitTime: number = startTime;
for (let i = 0; i < batch.messages.length; i++) {
const message: KafkaMessage = batch.messages[i];
await this.onMessage(batch.topic, message);

// Commit every KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS to reduce number of roundtrips, and
// also prevent disconnecting from the broker due to inactivity.
const now: number = Date.now();
if (now - lastCommitTime > config.KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS) {
logger.info({
at: 'on-batch#onBatch',
message: 'Committing offsets and sending heartbeat',
...batchInfo,
});
payload.resolveOffset(message.offset);
await Promise.all([
payload.heartbeat(),
// commitOffsetsIfNecessary will respect autoCommitThreshold and will not commit if
// fewer messages than the threshold have been processed since the last commit.
payload.commitOffsetsIfNecessary(),
]);
lastCommitTime = now;
}
}

const batchProcessingTime: number = Date.now() - startTime;
logger.info({
at: 'on-batch#onBatch',
message: 'Finished Processing Batch',
batchProcessingTime,
...batchInfo,
});
stats.timing(
'socks.batch_processing_time',
batchProcessingTime,
metricTags,
);
stats.timing(
'socks.batch_size',
batch.messages.length,
metricTags,
);
}

public stop(): void {
if (this.stopped) {
throw new Error('MessageForwarder already stopped');
@@ -228,6 +329,7 @@
stats.increment(
`${config.SERVICE_NAME}.forward_message_with_subscribers`,
1,
config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE,
);
}
}
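The extra argument added to stats.increment above is a sample rate. Assuming stats is a StatsD client along the lines of hot-shots (not confirmed by this diff), the positional arguments are stat name, value, sample rate, and optional tags, so a hypothetical rate of 0.1 would emit the counter for roughly 10% of calls:

// Illustrative only: 0.1 stands in for config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE.
stats.increment(
  `${config.SERVICE_NAME}.forward_message_with_subscribers`,
  1,
  0.1,
);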

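updateOnBatchFunction() and updateOnMessageFunction() also come from @dydxprotocol-indexer/kafka and are not shown here; the usage in MessageForwarder.start() suggests a module-level handler that the shared consumer delegates to. A hypothetical sketch of that registration pattern, with every name other than updateOnBatchFunction assumed:

import { EachBatchPayload } from 'kafkajs';

type OnBatchFunction = (payload: EachBatchPayload) => Promise<void>;

// Module-level handler the shared consumer invokes for every batch; starts as a no-op.
let onBatchFunction: OnBatchFunction = async () => {};

// Called by MessageForwarder.start() to install its onBatch handler.
export function updateOnBatchFunction(onBatch: OnBatchFunction): void {
  onBatchFunction = onBatch;
}

// The consumer's eachBatch option then delegates to whatever handler is registered.
export function eachBatch(payload: EachBatchPayload): Promise<void> {
  return onBatchFunction(payload);
}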