diff --git a/indexer/services/ender/__tests__/helpers/kafka-publisher-helpers.ts b/indexer/services/ender/__tests__/helpers/kafka-publisher-helpers.ts index 20c6e174a2..472b8ef6c5 100644 --- a/indexer/services/ender/__tests__/helpers/kafka-publisher-helpers.ts +++ b/indexer/services/ender/__tests__/helpers/kafka-publisher-helpers.ts @@ -2,7 +2,7 @@ import { TRADES_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-ind import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol-indexer/postgres'; import { TradeMessage } from '@dydxprotocol-indexer/v4-protos'; -import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types'; +import { AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types'; export function contentToTradeMessage( tradeContent: TradeContent, @@ -40,3 +40,12 @@ export function createConsolidatedKafkaEventFromTrade( message: trade, }; } + +export function createConsolidatedKafkaEventFromSubaccount( + subaccount: AnnotatedSubaccountMessage, +): ConsolidatedKafkaEvent { + return { + topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS, + message: subaccount, + }; +} diff --git a/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts b/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts index 86e88f824f..add0830794 100644 --- a/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts +++ b/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts @@ -1,20 +1,51 @@ import { - producer, - TRADES_WEBSOCKET_MESSAGE_VERSION, - KafkaTopics, - ProducerMessage, + KafkaTopics, producer, ProducerMessage, TRADES_WEBSOCKET_MESSAGE_VERSION, } from '@dydxprotocol-indexer/kafka'; -import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol-indexer/postgres'; -import { SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos'; +import { + FillFromDatabase, + FillTable, + FillType, + Liquidity, + OrderFromDatabase, + OrderSide, + OrderStatus, + SubaccountMessageContents, + SubaccountTable, + testConstants, + TradeContent, + TradeMessageContents, + TransferFromDatabase, +} from '@dydxprotocol-indexer/postgres'; +import { IndexerSubaccountId, SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; import _ from 'lodash'; -import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types'; +import { + AnnotatedSubaccountMessage, + ConsolidatedKafkaEvent, + SingleTradeMessage, +} from '../../src/lib/types'; import { KafkaPublisher } from '../../src/lib/kafka-publisher'; import { - defaultSubaccountMessage, defaultTradeContent, defaultTradeMessage, defaultTradeKafkaEvent, + defaultSubaccountMessage, + defaultTradeContent, + defaultTradeKafkaEvent, + defaultTradeMessage, + defaultWalletAddress, } from '../helpers/constants'; -import { contentToSingleTradeMessage, contentToTradeMessage, createConsolidatedKafkaEventFromTrade } from '../helpers/kafka-publisher-helpers'; +import { + contentToSingleTradeMessage, + contentToTradeMessage, + createConsolidatedKafkaEventFromSubaccount, + createConsolidatedKafkaEventFromTrade, +} from '../helpers/kafka-publisher-helpers'; +import { + generateFillSubaccountMessage, + generateOrderSubaccountMessage, + generateTransferContents, +} from '../../src/helpers/kafka-helper'; +import { DateTime } from 'luxon'; +import { convertToSubaccountMessage } from '../../src/lib/helper'; describe('kafka-publisher', () => { let producerSendMock: jest.SpyInstance; @@ -139,11 +170,238 @@ describe('kafka-publisher', () => { consolidatedBeforeTrade, ]); - publisher.sortTradeEvents(); + publisher.sortEvents(publisher.tradeMessages); expect(publisher.tradeMessages).toEqual([beforeTrade, trade, afterTrade]); }); }); + describe('sortSubaccountEvents', () => { + const subaccount: AnnotatedSubaccountMessage = defaultSubaccountMessage; + const consolidatedSubaccount: + ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(subaccount); + it.each([ + [ + 'blockHeight', + { + ...subaccount, + blockHeight: Big(subaccount.blockHeight).minus(1).toString(), + }, + { + ...subaccount, + blockHeight: Big(subaccount.blockHeight).plus(1).toString(), + }, + ], + [ + 'transactionIndex', + { + ...subaccount, + transactionIndex: subaccount.transactionIndex - 1, + }, + { + ...subaccount, + transactionIndex: subaccount.transactionIndex + 1, + }, + ], + [ + 'eventIndex', + { + ...subaccount, + eventIndex: subaccount.eventIndex - 1, + }, + { + ...subaccount, + eventIndex: subaccount.eventIndex + 1, + }, + ], + ])('successfully subaccounts events by %s', ( + _field: string, + beforeSubaccount: AnnotatedSubaccountMessage, + afterSubaccount: AnnotatedSubaccountMessage, + ) => { + const publisher: KafkaPublisher = new KafkaPublisher(); + const consolidatedBeforeSubaccount: + ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount( + beforeSubaccount, + ); + const consolidatedAfterSubaccount: + ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount( + afterSubaccount, + ); + + publisher.addEvents([ + consolidatedAfterSubaccount, + consolidatedSubaccount, + consolidatedBeforeSubaccount, + ]); + + publisher.sortEvents(publisher.subaccountMessages); + expect(publisher.subaccountMessages).toEqual([beforeSubaccount, subaccount, afterSubaccount]); + }); + }); + + describe('aggregateFillEventsForSubaccountMessages', () => { + const fill: FillFromDatabase = { + id: FillTable.uuid(testConstants.defaultTendermintEventId, Liquidity.TAKER), + subaccountId: testConstants.defaultSubaccountId, + side: OrderSide.BUY, + liquidity: Liquidity.TAKER, + type: FillType.MARKET, + clobPairId: '1', + orderId: testConstants.defaultOrderId, + size: '10', + price: '20000', + quoteAmount: '200000', + eventId: testConstants.defaultTendermintEventId, + transactionHash: '', // TODO: Add a real transaction Hash + createdAt: testConstants.createdDateTime.toISO(), + createdAtHeight: testConstants.createdHeight, + clientMetadata: '0', + fee: '1.1', + }; + const order: OrderFromDatabase = { + ...testConstants.defaultOrderGoodTilBlockTime, + id: testConstants.defaultOrderId, + }; + + const recipientSubaccountId: IndexerSubaccountId = IndexerSubaccountId.fromPartial({ + owner: 'recipient', + number: 1, + }); + const deposit: TransferFromDatabase = { + id: '', + senderWalletAddress: defaultWalletAddress, + recipientSubaccountId: SubaccountTable.uuid( + recipientSubaccountId.owner, + recipientSubaccountId.number, + ), + assetId: testConstants.defaultAsset.id, + size: '10', + eventId: testConstants.defaultTendermintEventId, + transactionHash: 'hash', + createdAt: DateTime.utc().toISO(), + createdAtHeight: '1', + }; + it('successfully aggregates all fill events per order id and sorts messages', async () => { + const publisher: KafkaPublisher = new KafkaPublisher(); + + // merged with message 3. + const msg1Contents: SubaccountMessageContents = { + fills: [ + generateFillSubaccountMessage(fill, 'BTC-USD'), + ], + orders: [ + generateOrderSubaccountMessage(order, 'BTC-USD'), + ], + }; + const message1: AnnotatedSubaccountMessage = { + blockHeight: '1', + transactionIndex: 1, + eventIndex: 1, + contents: JSON.stringify(msg1Contents), + subaccountId: { + owner: 'owner1', + number: 0, + }, + version: '1', + orderId: 'order1', + isFill: true, + subaccountMessageContents: msg1Contents, + }; + + const msg2Contents: SubaccountMessageContents = { + fills: [ + generateFillSubaccountMessage(fill, 'ETH-USD'), + ], + }; + const message2: AnnotatedSubaccountMessage = { + ...message1, + transactionIndex: 2, + contents: JSON.stringify(msg2Contents), + orderId: 'order2', + subaccountMessageContents: msg2Contents, + }; + + const msg3Contents: SubaccountMessageContents = { + fills: [ + generateFillSubaccountMessage({ + ...fill, + size: '100', + }, 'BTC-USD'), + ], + orders: [ + generateOrderSubaccountMessage({ + ...order, + status: OrderStatus.FILLED, + }, 'BTC-USD'), + ], + }; + const message3: AnnotatedSubaccountMessage = { + ...message1, + transactionIndex: 3, + contents: JSON.stringify(msg3Contents), + subaccountMessageContents: msg3Contents, + }; + + // non-fill subaccount message. + const msg4Contents: SubaccountMessageContents = generateTransferContents( + deposit, + testConstants.defaultAsset, + recipientSubaccountId, + undefined, + recipientSubaccountId, + ); + const message4: AnnotatedSubaccountMessage = { + ...message1, + eventIndex: 4, + orderId: undefined, + isFill: undefined, + contents: JSON.stringify(msg4Contents), + }; + + const expectedMergedContents: SubaccountMessageContents = { + fills: [ + msg1Contents.fills![0], + msg3Contents.fills![0], + ], + orders: [ + msg3Contents.orders![0], + ], + }; + const mergedMessage3: AnnotatedSubaccountMessage = { + ...message3, + contents: JSON.stringify(expectedMergedContents), + subaccountMessageContents: expectedMergedContents, + }; + + publisher.addEvents([ + createConsolidatedKafkaEventFromSubaccount(message1), + createConsolidatedKafkaEventFromSubaccount(message2), + createConsolidatedKafkaEventFromSubaccount(message3), + createConsolidatedKafkaEventFromSubaccount(message4), + ]); + + publisher.aggregateFillEventsForSubaccountMessages(); + const expectedMsgs: SubaccountMessage[] = [ + convertToSubaccountMessage(message4), + convertToSubaccountMessage(message2), + convertToSubaccountMessage(mergedMessage3), + ]; + expect(publisher.subaccountMessages).toEqual(expectedMsgs); + + await publisher.publish(); + + expect(producerSendMock).toHaveBeenCalledTimes(1); + expect(producerSendMock).toHaveBeenCalledWith({ + topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS, + messages: _.map(expectedMsgs, (message: SubaccountMessage) => { + return { + value: Buffer.from(Uint8Array.from(SubaccountMessage.encode(message).finish())), + }; + }), + }); + }); + }); + describe('groupKafkaTradesByClobPairId', () => { it('successfully groups kafka trade messages', () => { const kafkaPublisher: KafkaPublisher = new KafkaPublisher(); diff --git a/indexer/services/ender/src/handlers/handler.ts b/indexer/services/ender/src/handlers/handler.ts index fdcf81c0ca..95fa0a54a7 100644 --- a/indexer/services/ender/src/handlers/handler.ts +++ b/indexer/services/ender/src/handlers/handler.ts @@ -10,19 +10,21 @@ import { TRADES_WEBSOCKET_MESSAGE_VERSION, KafkaTopics, } from '@dydxprotocol-indexer/kafka'; +import { SubaccountMessageContents } from '@dydxprotocol-indexer/postgres'; import { IndexerTendermintBlock, IndexerTendermintEvent, MarketMessage, OffChainUpdateV1, SubaccountId, - SubaccountMessage, } from '@dydxprotocol-indexer/v4-protos'; import { DateTime } from 'luxon'; import config from '../config'; import { indexerTendermintEventToTransactionIndex } from '../lib/helper'; -import { ConsolidatedKafkaEvent, EventMessage, SingleTradeMessage } from '../lib/types'; +import { + AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, EventMessage, SingleTradeMessage, +} from '../lib/types'; export type HandlerInitializer = new ( block: IndexerTendermintBlock, @@ -103,9 +105,12 @@ export abstract class Handler { protected generateConsolidatedSubaccountKafkaEvent( contents: string, subaccountId: SubaccountId, + orderId?: string, + isFill?: boolean, + subaccountMessageContents?: SubaccountMessageContents, ): ConsolidatedKafkaEvent { stats.increment(`${config.SERVICE_NAME}.create_subaccount_kafka_event`, 1); - const subaccountMessage: SubaccountMessage = { + const subaccountMessage: AnnotatedSubaccountMessage = { blockHeight: this.block.height.toString(), transactionIndex: indexerTendermintEventToTransactionIndex( this.indexerTendermintEvent, @@ -114,6 +119,9 @@ export abstract class Handler { contents, subaccountId, version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, + orderId, + isFill, + subaccountMessageContents, }; return { diff --git a/indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts b/indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts index 12f8ba7fac..1d42fe4da0 100644 --- a/indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts +++ b/indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts @@ -367,6 +367,9 @@ export abstract class AbstractOrderFillHandler extends Handler { return this.generateConsolidatedSubaccountKafkaEvent( JSON.stringify(message), subaccountIdProto, + order?.id, + true, + message, ); } diff --git a/indexer/services/ender/src/lib/candles-generator.ts b/indexer/services/ender/src/lib/candles-generator.ts index fd4747b45b..27cd7538f3 100644 --- a/indexer/services/ender/src/lib/candles-generator.ts +++ b/indexer/services/ender/src/lib/candles-generator.ts @@ -1,23 +1,23 @@ import { stats } from '@dydxprotocol-indexer/base'; import { CANDLES_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-indexer/kafka'; import { + CANDLE_RESOLUTION_TO_PROTO, + CandleColumns, CandleCreateObject, CandleFromDatabase, + CandleMessageContents, CandleResolution, CandleTable, CandleUpdateObject, - CANDLE_RESOLUTION_TO_PROTO, + MarketOpenInterest, + NUM_SECONDS_IN_CANDLE_RESOLUTIONS, + Options, + PerpetualMarketColumns, + PerpetualMarketFromDatabase, perpetualMarketRefresher, + PerpetualPositionTable, TradeContent, TradeMessageContents, - CandleMessageContents, - CandleColumns, - PerpetualPositionTable, - PerpetualMarketFromDatabase, - MarketOpenInterest, - PerpetualMarketColumns, - Options, - NUM_SECONDS_IN_CANDLE_RESOLUTIONS, } from '@dydxprotocol-indexer/postgres'; import { CandleMessage } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; @@ -69,7 +69,7 @@ export class CandlesGenerator { public async updateCandles(): Promise { const start: number = Date.now(); // 1. Sort trade messages by order in the block - this.kafkaPublisher.sortTradeEvents(); + this.kafkaPublisher.sortEvents(this.kafkaPublisher.tradeMessages); // 2. Generate BlockCandleUpdatesMap const blockCandleUpdatesMap: BlockCandleUpdatesMap = this.generateBlockCandleUpdatesMap(); diff --git a/indexer/services/ender/src/lib/helper.ts b/indexer/services/ender/src/lib/helper.ts index e9dfab6812..34d048e123 100644 --- a/indexer/services/ender/src/lib/helper.ts +++ b/indexer/services/ender/src/lib/helper.ts @@ -21,8 +21,10 @@ import { LiquidityTierUpsertEventV1, UpdatePerpetualEventV1, UpdateClobPairEventV1, + SubaccountMessage, } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; +import _ from 'lodash'; import { DateTime } from 'luxon'; import { @@ -30,6 +32,7 @@ import { SECONDS_IN_MILLIS, } from '../constants'; import { + AnnotatedSubaccountMessage, DydxIndexerSubtypes, EventProtoWithTypeAndVersion, } from './types'; @@ -55,6 +58,16 @@ export function indexerTendermintEventToTransactionIndex( ); } +export function convertToSubaccountMessage( + annotatedMessage: AnnotatedSubaccountMessage, +): SubaccountMessage { + const subaccountMessage: SubaccountMessage = _.omit( + annotatedMessage, + ['orderId', 'isFill', 'subaccountMessageContents'], + ); + return subaccountMessage; +} + export function protoTimestampToDate( protoTime: Timestamp, ): Date { diff --git a/indexer/services/ender/src/lib/kafka-publisher.ts b/indexer/services/ender/src/lib/kafka-publisher.ts index c74e8f9c3c..36aa302d15 100644 --- a/indexer/services/ender/src/lib/kafka-publisher.ts +++ b/indexer/services/ender/src/lib/kafka-publisher.ts @@ -6,23 +6,35 @@ import { ProducerMessage, TRADES_WEBSOCKET_MESSAGE_VERSION, } from '@dydxprotocol-indexer/kafka'; -import { TradeMessageContents } from '@dydxprotocol-indexer/postgres'; +import { FillSubaccountMessageContents, TradeMessageContents } from '@dydxprotocol-indexer/postgres'; import { - CandleMessage, MarketMessage, OffChainUpdateV1, SubaccountMessage, TradeMessage, + CandleMessage, + MarketMessage, + OffChainUpdateV1, + SubaccountMessage, + TradeMessage, } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; import _ from 'lodash'; import config from '../config'; -import { ConsolidatedKafkaEvent, SingleTradeMessage, VulcanMessage } from './types'; +import { convertToSubaccountMessage } from './helper'; +import { + AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, SingleTradeMessage, VulcanMessage, +} from './types'; type TopicKafkaMessages = { topic: KafkaTopics; messages: ProducerMessage[]; }; +type OrderedMessage = AnnotatedSubaccountMessage | SingleTradeMessage; + +type Message = AnnotatedSubaccountMessage | SingleTradeMessage | MarketMessage | +CandleMessage | VulcanMessage; + export class KafkaPublisher { - subaccountMessages: SubaccountMessage[]; + subaccountMessages: AnnotatedSubaccountMessage[]; tradeMessages: SingleTradeMessage[]; marketMessages: MarketMessage[]; candleMessages: CandleMessage[]; @@ -43,49 +55,124 @@ export class KafkaPublisher { } public addEvent(event: ConsolidatedKafkaEvent) { - switch (event.topic) { + this.getMessages(event.topic)!.push(event.message); + } + + /** + * Helper function to get messages for a given topic. + * + * @param kafkaTopic + * @private + */ + private getMessages(kafkaTopic: KafkaTopics): Message[] | undefined { + switch (kafkaTopic) { case KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS: - this.subaccountMessages.push(event.message); - break; + return this.subaccountMessages; case KafkaTopics.TO_WEBSOCKETS_TRADES: - this.tradeMessages.push(event.message); - break; + return this.tradeMessages; case KafkaTopics.TO_WEBSOCKETS_MARKETS: - this.marketMessages.push(event.message); - break; + return this.marketMessages; case KafkaTopics.TO_WEBSOCKETS_CANDLES: - this.candleMessages.push(event.message); - break; + return this.candleMessages; case KafkaTopics.TO_VULCAN: - this.vulcanMessages.push(event.message); - break; + return this.vulcanMessages; default: throw new Error('Invalid Topic'); } } /** - * Sort trade events by block height, transaction index, and event index in ascending order, - * where the first trade event should be the earliest trade in the block. + * Sort subaccountMessages that represent fills by block height, transaction index, + * and event index in ascending order per order id. Only keep the subaccount message if + * it represents the last fill event per order id, but make sure the subaccount message + * contents contains all individual fills from that block. + * + * Due to separate handlers for order fills, we can be sure that if a message is annotated + * with a fill, it should only contain data about a single fill / order and not transfers + * or positions. */ - public sortTradeEvents() { - this.tradeMessages = this.tradeMessages.sort( - (a: SingleTradeMessage, b: SingleTradeMessage) => { - if (Big(a.blockHeight).lt(b.blockHeight)) { - return -1; - } else if (Big(a.blockHeight).gt(b.blockHeight)) { - return 1; + // TODO(IND-453): Generalize this to beyond subaccount messages. + public aggregateFillEventsForSubaccountMessages() { + // Create a map to store the last event for fills per order ID + const lastEventForFills: Record = {}; + // Create a map to store all fill events per order ID + const allFillEvents: Record = {}; + const nonFillEvents: AnnotatedSubaccountMessage[] = []; + + this.subaccountMessages.forEach((message: AnnotatedSubaccountMessage) => { + if (message.isFill && message.orderId) { + const fills: + FillSubaccountMessageContents[] | undefined = message.subaccountMessageContents?.fills; + const orderId: string = message.orderId; + if (fills !== undefined) { + allFillEvents[orderId] = allFillEvents[orderId] + ? allFillEvents[orderId].concat(fills) + : fills; } - if (a.transactionIndex < b.transactionIndex) { - return -1; - } else if (a.transactionIndex > b.transactionIndex) { - return 1; + // If we haven't seen this order ID before or if the current message + // was associated with a later event, update the lastFillEvents for this order ID + if ( + !lastEventForFills[orderId] || + this.compareMessages(message, lastEventForFills[orderId]) > 0 + ) { + lastEventForFills[orderId] = message; } + } else { + nonFillEvents.push(message); + } + }); - return a.eventIndex < b.eventIndex ? -1 : 1; - }, - ); + // Update the last event for the order ID such that it has all the fills + // that occurred for the order ID. + Object.keys(lastEventForFills).forEach((orderId: string) => { + const lastEvent: AnnotatedSubaccountMessage = lastEventForFills[orderId]; + const fills: FillSubaccountMessageContents[] = allFillEvents[orderId]; + if (fills) { + lastEvent.subaccountMessageContents!.fills = fills; + lastEvent.contents = JSON.stringify(lastEvent.subaccountMessageContents); + } + }); + + this.subaccountMessages = Object.values(lastEventForFills) + .concat(nonFillEvents) + .map((annotatedMessage) => convertToSubaccountMessage(annotatedMessage)); + this.sortEvents(this.subaccountMessages); + } + + /** Helper function to compare two AnnotatedSubaccountMessages based on block height, + * transaction index, and event index. + */ + private compareMessages(a: AnnotatedSubaccountMessage, b: AnnotatedSubaccountMessage) { + if (a.blockHeight === b.blockHeight) { + if (a.transactionIndex === b.transactionIndex) { + return a.eventIndex - b.eventIndex; + } + return a.transactionIndex - b.transactionIndex; + } + return Number(a.blockHeight) - Number(b.blockHeight); + } + + /** + * Sort events by block height, transaction index, and event index in ascending order, + * where the first event should be the earliest event in the block. + */ + public sortEvents(msgs: OrderedMessage[]) { + msgs.sort((a: OrderedMessage, b: OrderedMessage) => { + if (Big(a.blockHeight).lt(b.blockHeight)) { + return -1; + } else if (Big(a.blockHeight).gt(b.blockHeight)) { + return 1; + } + + if (a.transactionIndex < b.transactionIndex) { + return -1; + } else if (a.transactionIndex > b.transactionIndex) { + return 1; + } + + return a.eventIndex < b.eventIndex ? -1 : 1; + }); } public async publish() { @@ -112,6 +199,7 @@ export class KafkaPublisher { private generateAllTopicKafkaMessages(): TopicKafkaMessages[] { const allTopicKafkaMessages: TopicKafkaMessages[] = []; if (this.subaccountMessages.length > 0) { + this.aggregateFillEventsForSubaccountMessages(); allTopicKafkaMessages.push({ topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS, messages: _.map(this.subaccountMessages, (message: SubaccountMessage) => { diff --git a/indexer/services/ender/src/lib/types.ts b/indexer/services/ender/src/lib/types.ts index dc104c85d8..2e07c42aea 100644 --- a/indexer/services/ender/src/lib/types.ts +++ b/indexer/services/ender/src/lib/types.ts @@ -3,10 +3,10 @@ import { Liquidity, PerpetualPositionColumns, PerpetualPositionFromDatabase, + SubaccountMessageContents, } from '@dydxprotocol-indexer/postgres'; import { StatefulOrderEventV1, - IndexerTendermintBlock, IndexerTendermintEvent, CandleMessage, LiquidationOrderV1, @@ -32,14 +32,6 @@ import { UpdateClobPairEventV1, } from '@dydxprotocol-indexer/v4-protos'; import Long from 'long'; -import { DateTime } from 'luxon'; - -export interface EventHandlerData { - block: IndexerTendermintBlock, - event: IndexerTendermintEvent, - timestamp: DateTime, - txId: number, -} // Type sourced from protocol: // https://github.com/dydxprotocol/v4-chain/blob/main/protocol/indexer/events/constants.go @@ -188,6 +180,12 @@ export interface SingleTradeMessage extends TradeMessage { eventIndex: number, } +export interface AnnotatedSubaccountMessage extends SubaccountMessage { + orderId?: string, + isFill?: boolean, + subaccountMessageContents?: SubaccountMessageContents, +} + export interface VulcanMessage { key: Buffer, value: OffChainUpdateV1, @@ -195,7 +193,7 @@ export interface VulcanMessage { export type ConsolidatedKafkaEvent = { topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS, - message: SubaccountMessage, + message: AnnotatedSubaccountMessage, } | { topic: KafkaTopics.TO_WEBSOCKETS_TRADES, message: SingleTradeMessage,