From 3e0968197bba57a2fba67a69b934be9b826852fd Mon Sep 17 00:00:00 2001 From: Will Liu Date: Wed, 18 Oct 2023 14:35:02 -0400 Subject: [PATCH] add tests --- .../helpers/kafka-publisher-helpers.ts | 11 +- .../__tests__/lib/kafka-publisher.test.ts | 161 +++++++++++++++++- .../services/ender/src/lib/kafka-publisher.ts | 17 +- indexer/services/ender/src/lib/types.ts | 8 + 4 files changed, 190 insertions(+), 7 deletions(-) diff --git a/indexer/services/ender/__tests__/helpers/kafka-publisher-helpers.ts b/indexer/services/ender/__tests__/helpers/kafka-publisher-helpers.ts index 20c6e174a2..472b8ef6c5 100644 --- a/indexer/services/ender/__tests__/helpers/kafka-publisher-helpers.ts +++ b/indexer/services/ender/__tests__/helpers/kafka-publisher-helpers.ts @@ -2,7 +2,7 @@ import { TRADES_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-ind import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol-indexer/postgres'; import { TradeMessage } from '@dydxprotocol-indexer/v4-protos'; -import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types'; +import { AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types'; export function contentToTradeMessage( tradeContent: TradeContent, @@ -40,3 +40,12 @@ export function createConsolidatedKafkaEventFromTrade( message: trade, }; } + +export function createConsolidatedKafkaEventFromSubaccount( + subaccount: AnnotatedSubaccountMessage, +): ConsolidatedKafkaEvent { + return { + topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS, + message: subaccount, + }; +} diff --git a/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts b/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts index 973559f068..4a5d79728a 100644 --- a/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts +++ b/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts @@ -5,7 +5,12 @@ import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol import { SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; import _ from 'lodash'; -import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types'; +import { + AnnotatedSubaccountMessage, + ConsolidatedKafkaEvent, + convertToSubaccountMessage, + SingleTradeMessage, +} from '../../src/lib/types'; import { KafkaPublisher } from '../../src/lib/kafka-publisher'; import { @@ -17,6 +22,7 @@ import { import { contentToSingleTradeMessage, contentToTradeMessage, + createConsolidatedKafkaEventFromSubaccount, createConsolidatedKafkaEventFromTrade, } from '../helpers/kafka-publisher-helpers'; @@ -148,6 +154,159 @@ describe('kafka-publisher', () => { }); }); + describe('sortSubaccountEvents', () => { + const subaccount: AnnotatedSubaccountMessage = defaultSubaccountMessage; + const consolidatedSubaccount: + ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(subaccount); + it.each([ + [ + 'blockHeight', + { + ...subaccount, + blockHeight: Big(subaccount.blockHeight).minus(1).toString(), + }, + { + ...subaccount, + blockHeight: Big(subaccount.blockHeight).plus(1).toString(), + }, + ], + [ + 'transactionIndex', + { + ...subaccount, + transactionIndex: subaccount.transactionIndex - 1, + }, + { + ...subaccount, + transactionIndex: subaccount.transactionIndex + 1, + }, + ], + [ + 'eventIndex', + { + ...subaccount, + eventIndex: subaccount.eventIndex - 1, + }, + { + ...subaccount, + eventIndex: subaccount.eventIndex + 1, + }, + ], + ])('successfully subaccounts events by %s', ( + _field: string, + beforeSubaccount: AnnotatedSubaccountMessage, + afterSubaccount: AnnotatedSubaccountMessage, + ) => { + const publisher: KafkaPublisher = new KafkaPublisher(); + const consolidatedBeforeSubaccount: + ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount( + beforeSubaccount, + ); + const consolidatedAfterSubaccount: + ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount( + afterSubaccount, + ); + + publisher.addEvents([ + consolidatedAfterSubaccount, + consolidatedSubaccount, + consolidatedBeforeSubaccount, + ]); + + publisher.sortEvents(KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); + expect(publisher.subaccountMessages).toEqual([beforeSubaccount, subaccount, afterSubaccount]); + }); + }); + + describe('retainLastFillEventsForSubaccountMessages', () => { + it('successfully retains the last fill event per order id and sorts messages', async () => { + const publisher: KafkaPublisher = new KafkaPublisher(); + + // over-written by message 3. + const message1: AnnotatedSubaccountMessage = { + blockHeight: '1', + transactionIndex: 1, + eventIndex: 1, + contents: 'Message 1', + subaccountId: { + owner: 'owner1', + number: 0, + }, + version: '1', + orderId: 'order1', + isFill: true, + }; + + const message2: AnnotatedSubaccountMessage = { + blockHeight: '1', + transactionIndex: 2, + eventIndex: 2, + contents: 'Message 2', + subaccountId: { + owner: 'owner2', + number: 0, + }, + version: '1', + orderId: 'order2', + isFill: true, + }; + + const message3: AnnotatedSubaccountMessage = { + blockHeight: '1', + transactionIndex: 3, + eventIndex: 3, + contents: 'Message 3', + subaccountId: { + owner: 'owner3', + number: 0, + }, + version: '1', + orderId: 'order1', + isFill: true, + }; + + // non-fill subaccount message. + const message4: AnnotatedSubaccountMessage = { + blockHeight: '1', + transactionIndex: 3, + eventIndex: 4, + contents: 'Message 3', + subaccountId: { + owner: 'owner3', + number: 0, + }, + version: '4', + }; + + publisher.addEvents([ + createConsolidatedKafkaEventFromSubaccount(message1), + createConsolidatedKafkaEventFromSubaccount(message2), + createConsolidatedKafkaEventFromSubaccount(message3), + createConsolidatedKafkaEventFromSubaccount(message4), + ]); + + publisher.retainLastFillEventsForSubaccountMessages(); + const expectedMsgs: SubaccountMessage[] = [ + convertToSubaccountMessage(message2), + convertToSubaccountMessage(message3), + convertToSubaccountMessage(message4), + ]; + expect(publisher.subaccountMessages).toEqual(expectedMsgs); + + await publisher.publish(); + + expect(producerSendMock).toHaveBeenCalledTimes(1); + expect(producerSendMock).toHaveBeenCalledWith({ + topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS, + messages: _.map(expectedMsgs, (message: SubaccountMessage) => { + return { + value: Buffer.from(Uint8Array.from(SubaccountMessage.encode(message).finish())), + }; + }), + }); + }); + }); + describe('groupKafkaTradesByClobPairId', () => { it('successfully groups kafka trade messages', () => { const kafkaPublisher: KafkaPublisher = new KafkaPublisher(); diff --git a/indexer/services/ender/src/lib/kafka-publisher.ts b/indexer/services/ender/src/lib/kafka-publisher.ts index 4808893d33..ea174c980f 100644 --- a/indexer/services/ender/src/lib/kafka-publisher.ts +++ b/indexer/services/ender/src/lib/kafka-publisher.ts @@ -10,11 +10,16 @@ import { TradeMessageContents } from '@dydxprotocol-indexer/postgres'; import { CandleMessage, MarketMessage, OffChainUpdateV1, SubaccountMessage, TradeMessage, } from '@dydxprotocol-indexer/v4-protos'; +import Big from 'big.js'; import _ from 'lodash'; import config from '../config'; import { - AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, SingleTradeMessage, VulcanMessage, + AnnotatedSubaccountMessage, + ConsolidatedKafkaEvent, + convertToSubaccountMessage, + SingleTradeMessage, + VulcanMessage, } from './types'; type TopicKafkaMessages = { @@ -99,7 +104,9 @@ export class KafkaPublisher { } }); - this.subaccountMessages = Object.values(lastFillEvents).concat(nonFillEvents); + this.subaccountMessages = Object.values(lastFillEvents) + .concat(nonFillEvents) + .map((annotatedMessage) => convertToSubaccountMessage(annotatedMessage)); this.sortEvents(KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); } @@ -131,9 +138,9 @@ export class KafkaPublisher { if (msgs) { msgs.sort((a: OrderedMessage, b: OrderedMessage) => { - if (a.blockHeight < b.blockHeight) { + if (Big(a.blockHeight).lt(b.blockHeight)) { return -1; - } else if (a.blockHeight > b.blockHeight) { + } else if (Big(a.blockHeight).gt(b.blockHeight)) { return 1; } @@ -143,7 +150,7 @@ export class KafkaPublisher { return 1; } - return a.eventIndex - b.eventIndex; + return a.eventIndex < b.eventIndex ? -1 : 1; }); } } diff --git a/indexer/services/ender/src/lib/types.ts b/indexer/services/ender/src/lib/types.ts index 9d211526fc..6434f3ae9a 100644 --- a/indexer/services/ender/src/lib/types.ts +++ b/indexer/services/ender/src/lib/types.ts @@ -31,6 +31,7 @@ import { UpdatePerpetualEventV1, UpdateClobPairEventV1, } from '@dydxprotocol-indexer/v4-protos'; +import _ from 'lodash'; import Long from 'long'; import { DateTime } from 'luxon'; @@ -193,6 +194,13 @@ export interface AnnotatedSubaccountMessage extends SubaccountMessage { isFill?: boolean, } +export function convertToSubaccountMessage( + annotatedMessage: AnnotatedSubaccountMessage, +): SubaccountMessage { + const subaccountMessage: SubaccountMessage = _.omit(annotatedMessage, ['orderId', 'isFill']); + return subaccountMessage; +} + export interface VulcanMessage { key: Buffer, value: OffChainUpdateV1,