diff --git a/indexer/packages/v4-protos/src/index.ts b/indexer/packages/v4-protos/src/index.ts index 6d05ae5fc4..8b081abbf9 100644 --- a/indexer/packages/v4-protos/src/index.ts +++ b/indexer/packages/v4-protos/src/index.ts @@ -16,3 +16,4 @@ export * from './codegen/google/protobuf/timestamp'; export * from './codegen/dydxprotocol/indexer/protocol/v1/clob'; export * from './codegen/dydxprotocol/indexer/protocol/v1/subaccount'; export * from './codegen/dydxprotocol/indexer/shared/removal_reason'; +export * from './utils'; diff --git a/indexer/packages/v4-protos/src/utils.ts b/indexer/packages/v4-protos/src/utils.ts new file mode 100644 index 0000000000..dfbd71f77d --- /dev/null +++ b/indexer/packages/v4-protos/src/utils.ts @@ -0,0 +1,12 @@ +import { Timestamp } from './codegen/google/protobuf/timestamp'; + +export const MILLIS_IN_NANOS: number = 1_000_000; +export const SECONDS_IN_MILLIS: number = 1_000; +export function protoTimestampToDate( + protoTime: Timestamp, +): Date { + const timeInMillis: number = Number(protoTime.seconds) * SECONDS_IN_MILLIS + + Math.floor(protoTime.nanos / MILLIS_IN_NANOS); + + return new Date(timeInMillis); +} diff --git a/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts b/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts index 5a587b2e78..d8005231c9 100644 --- a/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts +++ b/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts @@ -51,6 +51,7 @@ import { PerpetualMarketCreateEventV1, PerpetualMarketCreateEventV2, DeleveragingEventV1, + protoTimestampToDate, } from '@dydxprotocol-indexer/v4-protos'; import { PerpetualMarketType, @@ -64,7 +65,6 @@ import { generatePerpetualMarketMessage, generatePerpetualPositionsContents, } from '../../src/helpers/kafka-helper'; -import { protoTimestampToDate } from '../../src/lib/helper'; import { DydxIndexerSubtypes, VulcanMessage } from '../../src/lib/types'; // TX Hash is SHA256, so is of length 64 hexadecimal without the '0x'. diff --git a/indexer/services/ender/src/lib/helper.ts b/indexer/services/ender/src/lib/helper.ts index dc7fe89538..6d58002a60 100644 --- a/indexer/services/ender/src/lib/helper.ts +++ b/indexer/services/ender/src/lib/helper.ts @@ -8,7 +8,6 @@ import { import { IndexerTendermintEvent, IndexerTendermintEvent_BlockEvent, - Timestamp, OrderFillEventV1, MarketEventV1, SubaccountUpdateEventV1, @@ -32,10 +31,6 @@ import Big from 'big.js'; import _ from 'lodash'; import { DateTime } from 'luxon'; -import { - MILLIS_IN_NANOS, - SECONDS_IN_MILLIS, -} from '../constants'; import { AnnotatedSubaccountMessage, DydxIndexerSubtypes, @@ -73,15 +68,6 @@ export function convertToSubaccountMessage( return subaccountMessage; } -export function protoTimestampToDate( - protoTime: Timestamp, -): Date { - const timeInMillis: number = Number(protoTime.seconds) * SECONDS_IN_MILLIS + - Math.floor(protoTime.nanos / MILLIS_IN_NANOS); - - return new Date(timeInMillis); -} - export function dateToDateTime( protoTime: Date, ): DateTime { diff --git a/indexer/services/socks/src/lib/message-forwarder.ts b/indexer/services/socks/src/lib/message-forwarder.ts index 61a201b69e..ae17acc99e 100644 --- a/indexer/services/socks/src/lib/message-forwarder.ts +++ b/indexer/services/socks/src/lib/message-forwarder.ts @@ -87,9 +87,10 @@ export class MessageForwarder { } public onMessage(topic: string, message: KafkaMessage): void { + const start: number = Date.now(); stats.timing( `${config.SERVICE_NAME}.message_time_in_queue`, - Date.now() - Number(message.timestamp), + start - Number(message.timestamp), config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, { topic, @@ -184,10 +185,10 @@ export class MessageForwarder { if (subscriptions.length > 0) { if (message.channel !== Channel.V4_ORDERBOOK || - ( - // Don't log orderbook messages unless enabled - message.channel === Channel.V4_ORDERBOOK && config.ENABLE_ORDERBOOK_LOGS - ) + ( + // Don't log orderbook messages unless enabled + message.channel === Channel.V4_ORDERBOOK && config.ENABLE_ORDERBOOK_LOGS + ) ) { logger.debug({ at: 'message-forwarder#forwardMessage', @@ -200,7 +201,7 @@ export class MessageForwarder { // Buffer messages if the subscription is for batched messages if (this.subscriptions.batchedSubscriptions[message.channel] && - this.subscriptions.batchedSubscriptions[message.channel][message.id]) { + this.subscriptions.batchedSubscriptions[message.channel][message.id]) { const bufferKey: string = this.getMessageBufferKey( message.channel, message.id, diff --git a/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts b/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts index d26015e30e..ceab04794c 100644 --- a/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts +++ b/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts @@ -62,6 +62,7 @@ import { expectCanceledOrderStatus, expectOpenOrderIds, handleInitialOrderPlace import { expectOffchainUpdateMessage, expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers'; import { OrderbookSide } from '../../src/lib/types'; import { getOrderIdHash, isLongTermOrder, isStatefulOrder } from '@dydxprotocol-indexer/v4-proto-parser'; +import { defaultKafkaHeaders } from '../helpers/constants'; import config from '../../src/config'; jest.mock('@dydxprotocol-indexer/base', () => ({ @@ -196,6 +197,12 @@ describe('order-place-handler', () => { const replacementMessageIoc: KafkaMessage = createKafkaMessage( Buffer.from(Uint8Array.from(OffChainUpdateV1.encode(replacementUpdateIoc).finish())), ); + [replacementMessage, replacementMessageGoodTilBlockTime, replacementMessageConditional, + replacementMessageFok, replacementMessageIoc].forEach((message) => { + // eslint-disable-next-line no-param-reassign + message.headers = defaultKafkaHeaders; + }); + const dbDefaultOrder: OrderFromDatabase = { ...testConstants.defaultOrder, id: testConstants.defaultOrderId, @@ -1225,7 +1232,11 @@ function expectWebsocketMessagesSent( version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, }); - expectWebsocketSubaccountMessage(producerSendSpy.mock.calls[callIndex][0], subaccountMessage); + expectWebsocketSubaccountMessage( + producerSendSpy.mock.calls[callIndex][0], + subaccountMessage, + defaultKafkaHeaders, + ); callIndex += 1; } diff --git a/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts b/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts index b0b00075f2..e5c242f510 100644 --- a/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts +++ b/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts @@ -5,6 +5,9 @@ import { STATS_FUNCTION_NAME, wrapBackgroundTask, } from '@dydxprotocol-indexer/base'; +import { + defaultTime, +} from '../helpers/constants'; import { synchronizeWrapBackgroundTask } from '@dydxprotocol-indexer/dev'; import { ORDERBOOKS_WEBSOCKET_MESSAGE_VERSION, @@ -52,14 +55,16 @@ import { OrderRemoveV1_OrderRemovalStatus, RedisOrder, SubaccountMessage, + protoTimestampToDate, } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; -import { ProducerRecord } from 'kafkajs'; +import { IHeaders, ProducerRecord } from 'kafkajs'; import { DateTime } from 'luxon'; import { OrderRemoveHandler } from '../../src/handlers/order-remove-handler'; import { OrderbookSide } from '../../src/lib/types'; import { redisClient } from '../../src/helpers/redis/redis-controller'; + import { expectCanceledOrderStatus, expectOpenOrderIds, @@ -133,6 +138,10 @@ describe('OrderRemoveHandler', () => { timeInForce: TimeInForce.IOC, }; + const defaultKafkaHeaders: IHeaders = { + message_received_timestamp: String(protoTimestampToDate(defaultTime)), + }; + it.each([ [ { @@ -179,7 +188,10 @@ describe('OrderRemoveHandler', () => { const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(orderRemoveJson); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await expect(orderRemoveHandler.handleUpdate(offChainUpdate)).rejects.toThrow( + await expect(orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + )).rejects.toThrow( new ParseMessageError(errorMessage), ); expect(logger.error).toHaveBeenCalledWith(expect.objectContaining({ @@ -194,7 +206,10 @@ describe('OrderRemoveHandler', () => { const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(defaultOrderRemove); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); expect(logger.info).toHaveBeenCalledWith(expect.objectContaining({ at: 'orderRemoveHandler#handleOrderRemoval', @@ -215,7 +230,10 @@ describe('OrderRemoveHandler', () => { const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(defaultOrderRemove); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); const ticker: string = testConstants.defaultPerpetualMarket.ticker; expect(logger.error).toHaveBeenCalledWith({ @@ -320,7 +338,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); // orderbook level reduced by defaultQuantums const remainingOrderbookLevel: string = Big( @@ -470,7 +491,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); await Promise.all([ expectOrderStatus(expectedOrderUuid, OrderStatus.BEST_EFFORT_CANCELED), @@ -604,7 +628,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); await Promise.all([ expectOrderStatus(expectedOrderUuid, OrderStatus.CANCELED), @@ -742,7 +769,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); await Promise.all([ expectOrderStatus(expectedOrderUuid, OrderStatus.CANCELED), @@ -895,7 +925,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); await Promise.all([ expectOrderStatus(expectedOrderUuid, removedOrder.status), @@ -978,7 +1011,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); await Promise.all([ expectOrderStatus(expectedOrderUuid, OrderStatus.FILLED), @@ -1014,7 +1050,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); expect(producerSendSpy).not.toHaveBeenCalled(); expect(logger.error).toHaveBeenCalledWith(expect.objectContaining({ @@ -1062,7 +1101,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); // Subaccounts message is sent first followed by orderbooks message const subaccountContents: SubaccountMessageContents = { @@ -1162,7 +1204,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); await Promise.all([ // orderbook should not be affected, so it will be set to defaultQuantums @@ -1279,7 +1324,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); // orderbook level reduced by defaultQuantums const remainingOrderbookLevel: string = Big( @@ -1423,7 +1471,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); await Promise.all([ expectOrderbookLevelCache( @@ -1545,7 +1596,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); // orderbook level reduced by defaultQuantums const remainingOrderbookLevel: string = Big( @@ -1676,7 +1730,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); // orderbook level should not be reduced const remainingOrderbookLevel: string = Big( @@ -1738,7 +1795,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); expect(producerSendSpy).not.toHaveBeenCalled(); expect(logger.error).toHaveBeenCalledWith(expect.objectContaining({ @@ -1805,7 +1865,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); expect(producerSendSpy).not.toHaveBeenCalled(); expect(stats.increment).toHaveBeenCalledWith('vulcan.indexer_expired_order_not_found', 1); @@ -1878,7 +1941,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); expect(producerSendSpy).not.toHaveBeenCalled(); expect(logger.error).toHaveBeenCalledWith(expect.objectContaining({ @@ -1940,7 +2006,10 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate(offChainUpdate); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); expect(producerSendSpy).not.toHaveBeenCalled(); expect( @@ -2044,7 +2113,11 @@ describe('OrderRemoveHandler', () => { if (expectedSubaccountMessage !== undefined) { const subaccountProducerRecord: ProducerRecord = producerSendSpy.mock.calls[0][0]; - expectWebsocketSubaccountMessage(subaccountProducerRecord, expectedSubaccountMessage); + expectWebsocketSubaccountMessage( + subaccountProducerRecord, + expectedSubaccountMessage, + defaultKafkaHeaders, + ); } if (expectedOrderbookMessage !== undefined) { diff --git a/indexer/services/vulcan/__tests__/helpers/constants.ts b/indexer/services/vulcan/__tests__/helpers/constants.ts new file mode 100644 index 0000000000..5b31462e29 --- /dev/null +++ b/indexer/services/vulcan/__tests__/helpers/constants.ts @@ -0,0 +1,19 @@ +import { + MILLIS_IN_NANOS, + SECONDS_IN_MILLIS, + Timestamp, + protoTimestampToDate, +} from '@dydxprotocol-indexer/v4-protos'; +import { IHeaders } from 'kafkajs'; +import Long from 'long'; +import { DateTime } from 'luxon'; + +const defaultDateTime: DateTime = DateTime.utc(2022, 6, 1, 12, 1, 1, 2); +export const defaultTime: Timestamp = { + seconds: Long.fromValue(Math.floor(defaultDateTime.toSeconds()), true), + nanos: (defaultDateTime.toMillis() % SECONDS_IN_MILLIS) * MILLIS_IN_NANOS, +}; + +export const defaultKafkaHeaders: IHeaders = { + message_received_timestamp: String(protoTimestampToDate(defaultTime)), +}; diff --git a/indexer/services/vulcan/__tests__/helpers/helpers.ts b/indexer/services/vulcan/__tests__/helpers/helpers.ts index 370949bdf0..f648a0989d 100644 --- a/indexer/services/vulcan/__tests__/helpers/helpers.ts +++ b/indexer/services/vulcan/__tests__/helpers/helpers.ts @@ -13,6 +13,7 @@ import { KafkaMessage } from 'kafkajs'; import { redisClient } from '../../src/helpers/redis/redis-controller'; import { onMessage } from '../../src/lib/on-message'; import { DydxRecordHeaderKeys } from '../../src/lib/types'; +import { defaultKafkaHeaders } from './constants'; export async function handleInitialOrderPlace( orderPlace: redisTestConstants.OffChainUpdateOrderPlaceUpdateMessage, @@ -23,6 +24,7 @@ export async function handleInitialOrderPlace( const message: KafkaMessage = createKafkaMessage( Buffer.from(Uint8Array.from(OffChainUpdateV1.encode(update).finish())), ); + message.headers = defaultKafkaHeaders; await onMessage(message); } @@ -36,6 +38,7 @@ export async function handleOrderUpdate( const message: KafkaMessage = createKafkaMessage( Buffer.from(Uint8Array.from(OffChainUpdateV1.encode(update).finish())), ); + message.headers = defaultKafkaHeaders; await onMessage(message); } diff --git a/indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts b/indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts index 79a2eee145..f97ff27ace 100644 --- a/indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts +++ b/indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts @@ -1,18 +1,21 @@ import { KafkaTopics } from '@dydxprotocol-indexer/kafka'; import { OffChainUpdateV1, OrderbookMessage, SubaccountMessage } from '@dydxprotocol-indexer/v4-protos'; -import { ProducerRecord } from 'kafkajs'; +import { IHeaders, ProducerRecord } from 'kafkajs'; export function expectWebsocketSubaccountMessage( subaccountProducerRecord: ProducerRecord, expectedSubaccountMessage: SubaccountMessage, + expectedHeaders: IHeaders, ): void { expect(subaccountProducerRecord.topic).toEqual(KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); const subaccountMessageValueBinary: Uint8Array = new Uint8Array( subaccountProducerRecord.messages[0].value as Buffer, ); + const headers: IHeaders | undefined = subaccountProducerRecord.messages[0].headers; const subaccountMessage: SubaccountMessage = SubaccountMessage.decode( subaccountMessageValueBinary, ); + expect(headers).toEqual(expectedHeaders); expect(subaccountMessage).toEqual(expectedSubaccountMessage); } diff --git a/indexer/services/vulcan/__tests__/lib/on-message.test.ts b/indexer/services/vulcan/__tests__/lib/on-message.test.ts index 27167a7143..254d781dc2 100644 --- a/indexer/services/vulcan/__tests__/lib/on-message.test.ts +++ b/indexer/services/vulcan/__tests__/lib/on-message.test.ts @@ -64,7 +64,7 @@ describe('onMessage', () => { await onMessage(message); expect(handler).toHaveBeenCalledTimes(1); - expect(handleUpdateMock).toHaveBeenCalledWith(update); + expect(handleUpdateMock).toHaveBeenCalledWith(update, message.headers ?? {}); expect(handleUpdateMock).toHaveBeenCalledTimes(1); expect(stats.increment).toHaveBeenCalledWith('vulcan.received_kafka_message', 1); diff --git a/indexer/services/vulcan/src/handlers/handler.ts b/indexer/services/vulcan/src/handlers/handler.ts index 7c409d00a2..5c1a859b4b 100644 --- a/indexer/services/vulcan/src/handlers/handler.ts +++ b/indexer/services/vulcan/src/handlers/handler.ts @@ -4,6 +4,7 @@ import { } from '@dydxprotocol-indexer/kafka'; import { OrderbookMessageContents, PerpetualMarketFromDatabase, protocolTranslations } from '@dydxprotocol-indexer/postgres'; import { OffChainUpdateV1, OrderbookMessage, RedisOrder } from '@dydxprotocol-indexer/v4-protos'; +import { IHeaders } from 'kafkajs'; import { OrderbookSide } from 'src/lib/types'; import { orderSideToOrderbookSide } from './helpers'; @@ -15,11 +16,11 @@ export abstract class Handler { this.txHash = txHash; } - protected abstract handle(update: OffChainUpdateV1): Promise; + protected abstract handle(update: OffChainUpdateV1, headers: IHeaders): Promise; // TODO(DEC-1251): Add stats for message handling. - public async handleUpdate(update: OffChainUpdateV1): Promise { - return this.handle(update); + public async handleUpdate(update: OffChainUpdateV1, headers: IHeaders): Promise { + return this.handle(update, headers); } protected logAndThrowParseMessageError( diff --git a/indexer/services/vulcan/src/handlers/order-place-handler.ts b/indexer/services/vulcan/src/handlers/order-place-handler.ts index 41449dc2fd..77223d2bbe 100644 --- a/indexer/services/vulcan/src/handlers/order-place-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-place-handler.ts @@ -32,7 +32,7 @@ import { RedisOrder, } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; -import { Message } from 'kafkajs'; +import { IHeaders, Message } from 'kafkajs'; import config from '../config'; import { redisClient } from '../helpers/redis/redis-controller'; @@ -52,7 +52,7 @@ import { Handler } from './handler'; * being greater than or equal to the expiry of the order in the OrderPlace message, return */ export class OrderPlaceHandler extends Handler { - protected async handle(update: OffChainUpdateV1): Promise { + protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise { logger.info({ at: 'OrderPlaceHandler#handle', message: 'Received OffChainUpdate with OrderPlace.', @@ -142,7 +142,7 @@ export class OrderPlaceHandler extends Handler { }); throw new Error(`Stateful order not found in database: ${orderUuid}`); } - await this.sendCachedOrderUpdate(orderUuid); + await this.sendCachedOrderUpdate(orderUuid, headers); } const subaccountMessage: Message = { value: createSubaccountWebsocketMessage( @@ -151,6 +151,7 @@ export class OrderPlaceHandler extends Handler { perpetualMarket, placementStatus, ), + headers, }; sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); } @@ -164,6 +165,7 @@ export class OrderPlaceHandler extends Handler { perpetualMarket, updatedQuantums, ), + headers, }; sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS); } @@ -334,6 +336,7 @@ export class OrderPlaceHandler extends Handler { */ protected async sendCachedOrderUpdate( orderId: string, + headers: IHeaders, ): Promise { const cachedOrderUpdate: OrderUpdateV1 | undefined = await StatefulOrderUpdatesCache .removeStatefulOrderUpdate( @@ -351,6 +354,7 @@ export class OrderPlaceHandler extends Handler { value: Buffer.from( Uint8Array.from(OffChainUpdateV1.encode({ orderUpdate: cachedOrderUpdate }).finish()), ), + headers, }; sendMessageWrapper(orderUpdateMessage, KafkaTopics.TO_VULCAN); } diff --git a/indexer/services/vulcan/src/handlers/order-remove-handler.ts b/indexer/services/vulcan/src/handlers/order-remove-handler.ts index 428c572780..e185f89e49 100644 --- a/indexer/services/vulcan/src/handlers/order-remove-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-remove-handler.ts @@ -38,7 +38,7 @@ import { SubaccountMessage, } from '@dydxprotocol-indexer/v4-protos'; import { Big } from 'big.js'; -import { Message } from 'kafkajs'; +import { IHeaders, Message } from 'kafkajs'; import config from '../config'; import { redisClient } from '../helpers/redis/redis-controller'; @@ -68,7 +68,7 @@ import { getStateRemainingQuantums } from './helpers'; */ export class OrderRemoveHandler extends Handler { // eslint-disable-next-line @typescript-eslint/require-await - protected async handle(update: OffChainUpdateV1): Promise { + protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise { logger.info({ at: 'OrderRemoveHandler#handle', message: 'Received OffChainUpdate with OrderRemove.', @@ -131,11 +131,11 @@ export class OrderRemoveHandler extends Handler { } if (this.isStatefulOrderCancelation(orderRemove)) { - await this.handleStatefulOrderCancelation(orderRemove, removeOrderResult); + await this.handleStatefulOrderCancelation(orderRemove, removeOrderResult, headers); return; } - await this.handleOrderRemoval(orderRemove, removeOrderResult); + await this.handleOrderRemoval(orderRemove, removeOrderResult, headers); } protected validateOrderRemove(orderRemove: OrderRemoveV1): void { @@ -193,6 +193,7 @@ export class OrderRemoveHandler extends Handler { protected async handleStatefulOrderCancelation( orderRemove: OrderRemoveV1, removeOrderResult: RemoveOrderResult, + headers: IHeaders, ): Promise { const order: OrderFromDatabase | undefined = await runFuncWithTimingStat( OrderTable.findById( @@ -230,6 +231,7 @@ export class OrderRemoveHandler extends Handler { orderRemove, perpetualMarket.ticker, ), + headers, }; sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); @@ -241,7 +243,7 @@ export class OrderRemoveHandler extends Handler { removeOrderResult.removed && removeOrderResult.restingOnBook === true && !requiresImmediateExecution(removeOrderResult.removedOrder!.order!.timeInForce)) { - await this.updateOrderbook(removeOrderResult, perpetualMarket); + await this.updateOrderbook(removeOrderResult, perpetualMarket, headers); } } @@ -259,6 +261,7 @@ export class OrderRemoveHandler extends Handler { protected async handleOrderRemoval( orderRemove: OrderRemoveV1, removeOrderResult: RemoveOrderResult, + headers: IHeaders, ): Promise { if (!removeOrderResult.removed) { logger.info({ @@ -306,6 +309,7 @@ export class OrderRemoveHandler extends Handler { orderRemove, perpetualMarket, ), + headers, }; if (this.shouldSendSubaccountMessage(orderRemove, removeOrderResult, stateRemainingQuantums)) { @@ -322,7 +326,7 @@ export class OrderRemoveHandler extends Handler { !remainingQuantums.eq('0') && removeOrderResult.restingOnBook !== false && !requiresImmediateExecution(removeOrderResult.removedOrder!.order!.timeInForce)) { - await this.updateOrderbook(removeOrderResult, perpetualMarket); + await this.updateOrderbook(removeOrderResult, perpetualMarket, headers); } // TODO: consolidate remove handler logic into a single lua script. await this.addOrderToCanceledOrdersCache( @@ -339,6 +343,7 @@ export class OrderRemoveHandler extends Handler { protected async updateOrderbook( removeOrderResult: RemoveOrderResult, perpetualMarket: PerpetualMarketFromDatabase, + headers: IHeaders, ): Promise { const updatedQuantums: number = await runFuncWithTimingStat( this.updatePriceLevelsCache( @@ -352,6 +357,7 @@ export class OrderRemoveHandler extends Handler { perpetualMarket, updatedQuantums, ), + headers, }; sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS); } diff --git a/indexer/services/vulcan/src/handlers/order-update-handler.ts b/indexer/services/vulcan/src/handlers/order-update-handler.ts index 950b7ee73f..4fe9ea05f3 100644 --- a/indexer/services/vulcan/src/handlers/order-update-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-update-handler.ts @@ -24,7 +24,7 @@ import { RedisOrder, } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; -import { Message } from 'kafkajs'; +import { IHeaders, Message } from 'kafkajs'; import config from '../config'; import { redisClient } from '../helpers/redis/redis-controller'; @@ -51,7 +51,7 @@ import { Handler } from './handler'; * price level is capped to the size of the order in quantums */ export class OrderUpdateHandler extends Handler { - protected async handle(update: OffChainUpdateV1): Promise { + protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise { logger.info({ at: 'OrderUpdateHandler#handle', message: 'Received OffChainUpdate with OrderUpdate.', @@ -171,6 +171,7 @@ export class OrderUpdateHandler extends Handler { perpetualMarket, updatedQuantums, ), + headers, }; sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS); } diff --git a/indexer/services/vulcan/src/lib/on-message.ts b/indexer/services/vulcan/src/lib/on-message.ts index 10c49ecf39..0684867448 100644 --- a/indexer/services/vulcan/src/lib/on-message.ts +++ b/indexer/services/vulcan/src/lib/on-message.ts @@ -62,6 +62,19 @@ export async function onMessage(message: KafkaMessage): Promise { }, ); + const originalMessageTimestamp = message.headers?.message_received_timestamp; + if (originalMessageTimestamp !== undefined) { + stats.timing( + `${config.SERVICE_NAME}.message_time_since_received`, + start - Number(originalMessageTimestamp), + STATS_NO_SAMPLING, + { + topic: KafkaTopics.TO_VULCAN, + event_type: String(message.headers?.event_type), + }, + ); + } + const messageValue: Buffer = message.value; const offset: string = message.offset; let update: OffChainUpdateV1; @@ -86,7 +99,40 @@ export async function onMessage(message: KafkaMessage): Promise { const handler: Handler = new (getHandler(update))!( getTransactionHashFromHeaders(message.headers), ); - await handler.handleUpdate(update); + + // If headers don't exist, create them. + const headers = message.headers ?? {}; + // If the message received timestamp doesn't exist + // (i.e when a short term order is directly sent to vulcan via full node) + // set the message_received_timestamp to the message timestamp and the event type + // to be a short term order event type. + if (!headers.message_received_timestamp) { + headers.message_received_timestamp = message.timestamp; + } + if (!headers.event_type) { + if (update.orderPlace) { + headers.event_type = 'ShortTermOrderPlacement'; + } else if (update.orderRemove) { + headers.event_type = 'ShortTermOrderRemoval'; + } else if (update.orderUpdate) { + headers.event_type = 'ShortTermOrderUpdate'; + } + } + + await handler.handleUpdate(update, headers); + + const postProcessingTime: number = Date.now(); + if (originalMessageTimestamp !== undefined) { + stats.timing( + `${config.SERVICE_NAME}.message_time_since_received_post_processing`, + postProcessingTime - Number(originalMessageTimestamp), + STATS_NO_SAMPLING, + { + topic: KafkaTopics.TO_VULCAN, + event_type: String(message.headers?.event_type), + }, + ); + } success = true; } catch (error) {