From 79a31430dce3f72884738c25db39cfd3a4365ea2 Mon Sep 17 00:00:00 2001 From: dydxwill <119354122+dydxwill@users.noreply.github.com> Date: Wed, 26 Jun 2024 17:01:09 -0400 Subject: [PATCH] [CT-971] Add flag for socks batch processing (#1777) --- .../__tests__/lib/message-forwarder.test.ts | 2 + indexer/services/socks/src/config.ts | 4 + indexer/services/socks/src/index.ts | 2 +- .../socks/src/lib/message-forwarder.ts | 116 ++++++++++++++++-- 4 files changed, 116 insertions(+), 8 deletions(-) diff --git a/indexer/services/socks/__tests__/lib/message-forwarder.test.ts b/indexer/services/socks/__tests__/lib/message-forwarder.test.ts index ccdbd4c263..8524b4b33d 100644 --- a/indexer/services/socks/__tests__/lib/message-forwarder.test.ts +++ b/indexer/services/socks/__tests__/lib/message-forwarder.test.ts @@ -182,6 +182,8 @@ describe('message-forwarder', () => { }; beforeAll(async () => { + config.BATCH_PROCESSING_ENABLED = false; + await dbHelpers.clearData(); await dbHelpers.migrate(); await testMocks.seedData(); await Promise.all([ diff --git a/indexer/services/socks/src/config.ts b/indexer/services/socks/src/config.ts index f0bafab341..a9514594c1 100644 --- a/indexer/services/socks/src/config.ts +++ b/indexer/services/socks/src/config.ts @@ -49,6 +49,10 @@ export const configSchema = { COMLINK_URL: parseString(), AXIOS_TIMEOUT_MS: parseInteger({ default: 5000 }), // 5 seconds INITIAL_GET_TIMEOUT_MS: parseInteger({ default: 20_000 }), // 20 seconds + BATCH_PROCESSING_ENABLED: parseBoolean({ default: true }), + KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS: parseNumber({ + default: 3_000, + }), }; //////////////////////////////////////////////////////////////////////////////// diff --git a/indexer/services/socks/src/index.ts b/indexer/services/socks/src/index.ts index 6e27875939..b05a1e83b0 100644 --- a/indexer/services/socks/src/index.ts +++ b/indexer/services/socks/src/index.ts @@ -61,13 +61,13 @@ async function start(): Promise { }); await connectToKafka(); - await startConsumer(); const subscriptions: Subscriptions = new Subscriptions(); index = new Index(wss, subscriptions); messageForwarder = new MessageForwarder(subscriptions, index); subscriptions.start(messageForwarder.forwardToClient); messageForwarder.start(); + await startConsumer(config.BATCH_PROCESSING_ENABLED); logger.info({ at: 'index#start', diff --git a/indexer/services/socks/src/lib/message-forwarder.ts b/indexer/services/socks/src/lib/message-forwarder.ts index 163260db4b..6f8cc4e34a 100644 --- a/indexer/services/socks/src/lib/message-forwarder.ts +++ b/indexer/services/socks/src/lib/message-forwarder.ts @@ -3,8 +3,12 @@ import { logger, InfoObject, } from '@dydxprotocol-indexer/base'; -import { updateOnMessageFunction } from '@dydxprotocol-indexer/kafka'; -import { KafkaMessage } from 'kafkajs'; +import { updateOnBatchFunction, updateOnMessageFunction } from '@dydxprotocol-indexer/kafka'; +import { + Batch, + EachBatchPayload, + KafkaMessage, +} from 'kafkajs'; import _ from 'lodash'; import config from '../config'; @@ -61,11 +65,25 @@ export class MessageForwarder { throw new Error('MessageForwarder already started'); } - // Kafkajs requires the function passed into `eachMessage` be an async function. - // eslint-disable-next-line @typescript-eslint/require-await - updateOnMessageFunction(async (topic, message): Promise => { - return this.onMessage(topic, message); - }); + if (config.BATCH_PROCESSING_ENABLED) { + logger.info({ + at: 'consumers#connect', + message: 'Batch processing enabled', + }); + updateOnBatchFunction(async (payload: EachBatchPayload): Promise => { + return this.onBatch(payload); + }); + } else { + logger.info({ + at: 'consumers#connect', + message: 'Batch processing disabled. Processing each message individually', + }); + // Kafkajs requires the function passed into `eachMessage` be an async function. + // eslint-disable-next-line @typescript-eslint/require-await + updateOnMessageFunction(async (topic, message): Promise => { + return this.onMessage(topic, message); + }); + } this.started = true; this.batchSending = setInterval( @@ -74,6 +92,89 @@ export class MessageForwarder { ); } + public async onBatch( + payload: EachBatchPayload, + ): Promise { + const batch: Batch = payload.batch; + const topic: string = batch.topic; + const partition: string = batch.partition.toString(); + const metricTags: Record = { topic, partition }; + if (batch.isEmpty()) { + logger.error({ + at: 'on-batch#onBatch', + message: 'Empty batch', + ...metricTags, + }); + return; + } + + const startTime: number = Date.now(); + const firstMessageTimestamp: number = Number(batch.messages[0].timestamp); + const batchTimeInQueue: number = startTime - firstMessageTimestamp; + const batchInfo = { + firstMessageTimestamp: new Date(firstMessageTimestamp).toISOString(), + batchTimeInQueue, + messagesInBatch: batch.messages.length, + firstOffset: batch.firstOffset(), + lastOffset: batch.lastOffset(), + ...metricTags, + }; + + logger.info({ + at: 'on-batch#onBatch', + message: 'Received batch', + ...batchInfo, + }); + stats.timing( + 'socks.batch_time_in_queue', + batchTimeInQueue, + metricTags, + ); + + let lastCommitTime: number = startTime; + for (let i = 0; i < batch.messages.length; i++) { + const message: KafkaMessage = batch.messages[i]; + await this.onMessage(batch.topic, message); + + // Commit every KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS to reduce number of roundtrips, and + // also prevent disconnecting from the broker due to inactivity. + const now: number = Date.now(); + if (now - lastCommitTime > config.KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS) { + logger.info({ + at: 'on-batch#onBatch', + message: 'Committing offsets and sending heart beat', + ...batchInfo, + }); + payload.resolveOffset(message.offset); + await Promise.all([ + payload.heartbeat(), + // commitOffsetsIfNecessary will respect autoCommitThreshold and will not commit if + // fewer messages than the threshold have been processed since the last commit. + payload.commitOffsetsIfNecessary(), + ]); + lastCommitTime = now; + } + } + + const batchProcessingTime: number = Date.now() - startTime; + logger.info({ + at: 'on-batch#onBatch', + message: 'Finished Processing Batch', + batchProcessingTime, + ...batchInfo, + }); + stats.timing( + 'socks.batch_processing_time', + batchProcessingTime, + metricTags, + ); + stats.timing( + 'socks.batch_size', + batch.messages.length, + metricTags, + ); + } + public stop(): void { if (this.stopped) { throw new Error('MessageForwarder already stopped'); @@ -228,6 +329,7 @@ export class MessageForwarder { stats.increment( `${config.SERVICE_NAME}.forward_message_with_subscribers`, 1, + config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, ); } }