Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IND-288] send final message per order id for order fills, with aggregated fill sfrom the entire block #658

Merged
merged 7 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions indexer/services/ender/__tests__/lib/kafka-publisher.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import {
producer,
TRADES_WEBSOCKET_MESSAGE_VERSION,
KafkaTopics,
ProducerMessage,
KafkaTopics, producer, ProducerMessage, TRADES_WEBSOCKET_MESSAGE_VERSION,
} from '@dydxprotocol-indexer/kafka';
import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol-indexer/postgres';
import { SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos';
Expand All @@ -12,9 +9,16 @@ import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types'

import { KafkaPublisher } from '../../src/lib/kafka-publisher';
import {
defaultSubaccountMessage, defaultTradeContent, defaultTradeMessage, defaultTradeKafkaEvent,
defaultSubaccountMessage,
defaultTradeContent,
defaultTradeKafkaEvent,
defaultTradeMessage,
} from '../helpers/constants';
import { contentToSingleTradeMessage, contentToTradeMessage, createConsolidatedKafkaEventFromTrade } from '../helpers/kafka-publisher-helpers';
import {
contentToSingleTradeMessage,
contentToTradeMessage,
createConsolidatedKafkaEventFromTrade,
} from '../helpers/kafka-publisher-helpers';

describe('kafka-publisher', () => {
let producerSendMock: jest.SpyInstance;
Expand Down Expand Up @@ -139,7 +143,7 @@ describe('kafka-publisher', () => {
consolidatedBeforeTrade,
]);

publisher.sortTradeEvents();
publisher.sortEvents(KafkaTopics.TO_WEBSOCKETS_TRADES);
expect(publisher.tradeMessages).toEqual([beforeTrade, trade, afterTrade]);
});
});
Expand Down
11 changes: 8 additions & 3 deletions indexer/services/ender/src/handlers/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ import {
MarketMessage,
OffChainUpdateV1,
SubaccountId,
SubaccountMessage,
} from '@dydxprotocol-indexer/v4-protos';
import { DateTime } from 'luxon';

import config from '../config';
import { indexerTendermintEventToTransactionIndex } from '../lib/helper';
import { ConsolidatedKafkaEvent, EventMessage, SingleTradeMessage } from '../lib/types';
import {
AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, EventMessage, SingleTradeMessage,
} from '../lib/types';

export type HandlerInitializer = new (
block: IndexerTendermintBlock,
Expand Down Expand Up @@ -103,9 +104,11 @@ export abstract class Handler<T> {
protected generateConsolidatedSubaccountKafkaEvent(
contents: string,
subaccountId: SubaccountId,
orderId?: string,
isFill?: boolean,
): ConsolidatedKafkaEvent {
stats.increment(`${config.SERVICE_NAME}.create_subaccount_kafka_event`, 1);
const subaccountMessage: SubaccountMessage = {
const subaccountMessage: AnnotatedSubaccountMessage = {
blockHeight: this.block.height.toString(),
transactionIndex: indexerTendermintEventToTransactionIndex(
this.indexerTendermintEvent,
Expand All @@ -114,6 +117,8 @@ export abstract class Handler<T> {
contents,
subaccountId,
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
orderId,
isFill,
};

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ export abstract class AbstractOrderFillHandler<T> extends Handler<T> {
return this.generateConsolidatedSubaccountKafkaEvent(
JSON.stringify(message),
subaccountIdProto,
order?.id,
true,
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved
);
}

Expand Down
20 changes: 10 additions & 10 deletions indexer/services/ender/src/lib/candles-generator.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import { stats } from '@dydxprotocol-indexer/base';
import { CANDLES_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-indexer/kafka';
import {
CANDLE_RESOLUTION_TO_PROTO,
CandleColumns,
CandleCreateObject,
CandleFromDatabase,
CandleMessageContents,
CandleResolution,
CandleTable,
CandleUpdateObject,
CANDLE_RESOLUTION_TO_PROTO,
MarketOpenInterest,
NUM_SECONDS_IN_CANDLE_RESOLUTIONS,
Options,
PerpetualMarketColumns,
PerpetualMarketFromDatabase,
perpetualMarketRefresher,
PerpetualPositionTable,
TradeContent,
TradeMessageContents,
CandleMessageContents,
CandleColumns,
PerpetualPositionTable,
PerpetualMarketFromDatabase,
MarketOpenInterest,
PerpetualMarketColumns,
Options,
NUM_SECONDS_IN_CANDLE_RESOLUTIONS,
} from '@dydxprotocol-indexer/postgres';
import { CandleMessage } from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
Expand Down Expand Up @@ -69,7 +69,7 @@ export class CandlesGenerator {
public async updateCandles(): Promise<CandleFromDatabase[]> {
const start: number = Date.now();
// 1. Sort trade messages by order in the block
this.kafkaPublisher.sortTradeEvents();
this.kafkaPublisher.sortEvents(KafkaTopics.TO_WEBSOCKETS_TRADES);

// 2. Generate BlockCandleUpdatesMap
const blockCandleUpdatesMap: BlockCandleUpdatesMap = this.generateBlockCandleUpdatesMap();
Expand Down
109 changes: 85 additions & 24 deletions indexer/services/ender/src/lib/kafka-publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@ import { TradeMessageContents } from '@dydxprotocol-indexer/postgres';
import {
CandleMessage, MarketMessage, OffChainUpdateV1, SubaccountMessage, TradeMessage,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import _ from 'lodash';

import config from '../config';
import { ConsolidatedKafkaEvent, SingleTradeMessage, VulcanMessage } from './types';
import {
AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, SingleTradeMessage, VulcanMessage,
} from './types';

type TopicKafkaMessages = {
topic: KafkaTopics;
messages: ProducerMessage[];
};

type OrderedMessage = AnnotatedSubaccountMessage | SingleTradeMessage;

export class KafkaPublisher {
subaccountMessages: SubaccountMessage[];
subaccountMessages: AnnotatedSubaccountMessage[];
tradeMessages: SingleTradeMessage[];
marketMessages: MarketMessage[];
candleMessages: CandleMessage[];
Expand All @@ -43,37 +46,94 @@ export class KafkaPublisher {
}

public addEvent(event: ConsolidatedKafkaEvent) {
switch (event.topic) {
this.getMessages(event.topic)!.push(event.message);
}

/**
* Helper function to get messages for a given topic.
*
* @param kafkaTopic
* @private
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private getMessages(kafkaTopic: KafkaTopics): any[] | undefined {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is it possible to create a type other than any for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

switch (kafkaTopic) {
case KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS:
this.subaccountMessages.push(event.message);
break;
return this.subaccountMessages;
case KafkaTopics.TO_WEBSOCKETS_TRADES:
this.tradeMessages.push(event.message);
break;
return this.tradeMessages;
case KafkaTopics.TO_WEBSOCKETS_MARKETS:
this.marketMessages.push(event.message);
break;
return this.marketMessages;
case KafkaTopics.TO_WEBSOCKETS_CANDLES:
this.candleMessages.push(event.message);
break;
return this.candleMessages;
case KafkaTopics.TO_VULCAN:
this.vulcanMessages.push(event.message);
break;
return this.vulcanMessages;
default:
throw new Error('Invalid Topic');
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getMessages method returns an array of messages for a given Kafka topic. It uses a switch statement to determine which message array to return based on the topic. This is a good use of a switch statement as it makes the code more readable and maintainable. However, the method returns any[] | undefined, which is not type-safe. Consider using a more specific type or a union of specific types instead of any.

- private getMessages(kafkaTopic: KafkaTopics): any[] | undefined {
+ private getMessages(kafkaTopic: KafkaTopics): (AnnotatedSubaccountMessage | SingleTradeMessage | MarketMessage | CandleMessage)[] | undefined {

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The addEvent method has been updated to use a helper function getMessages to retrieve the appropriate message array based on the topic. This change simplifies the addEvent method and makes it easier to add new topics in the future. However, the getMessages method returns any[] | undefined, which could lead to potential type safety issues. Consider refining the return type to be more specific.

- private getMessages(kafkaTopic: KafkaTopics): any[] | undefined {
+ private getMessages(kafkaTopic: KafkaTopics): (AnnotatedSubaccountMessage | SingleTradeMessage)[] | undefined {


/**
* Sort trade events by block height, transaction index, and event index in ascending order,
* where the first trade event should be the earliest trade in the block.
* Sort subaccountMessages that represent fills by block height, transaction index,
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved
* and event index in ascending order per order id. Only keep the subaccount message if
* it represents the last fill event per order id.
*/
public retainLastFillEventsForSubaccountMessages() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would lead to all prior fills for an order (if there were multiple) to also be omitted, and that would be incorrect.

E.g. Imagine order A has 2 fills:

  • message 1: {fills: [ .... 40% of order size], order: [ .... status is OPEN, total filled is 40%]}
  • message 2: {fills: [ .... 60% of order size], order: [..... status is FILLED, total filled is 100%]}

The websocket message I expect to see as a client is:

  • message: {fills: [.... 40% of order size, .... 60% of order size], order: [....status is FILLED, total filled is 100%]}

However with this logic what the client sees is:

  • message: {fills: [....60% of order size], order: [....status is FILLED, total filled is 100%]}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// Create a map to store the last fill event per order ID
const lastFillEvents: Record<string, AnnotatedSubaccountMessage> = {};
const nonFillEvents: AnnotatedSubaccountMessage[] = [];

this.subaccountMessages.forEach((message) => {
if (message.isFill && message.orderId) {
const orderId = message.orderId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// If we haven't seen this order ID before or if the current message
// has a higher block height, update the lastFillEvents for this order ID
Copy link
Contributor

@vincentwschau vincentwschau Oct 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// has a higher block height, update the lastFillEvents for this order ID
// was associated with a later event, update the lastFillEvents for this order ID

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (
!lastFillEvents[orderId] ||
this.compareMessages(message, lastFillEvents[orderId]) > 0
) {
lastFillEvents[orderId] = message;
}
} else {
nonFillEvents.push(message);
}
});

this.subaccountMessages = Object.values(lastFillEvents).concat(nonFillEvents);
this.sortEvents(KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);
}

/** Helper function to compare two AnnotatedSubaccountMessages based on block height,
* transaction index, and event index.
*/
private compareMessages(a: AnnotatedSubaccountMessage, b: AnnotatedSubaccountMessage) {
if (a.blockHeight === b.blockHeight) {
if (a.transactionIndex === b.transactionIndex) {
return a.eventIndex - b.eventIndex;
}
return a.transactionIndex - b.transactionIndex;
}
return Number(a.blockHeight) - Number(b.blockHeight);
}

/**
* Sort events by block height, transaction index, and event index in ascending order,
* where the first event should be the earliest event in the block.
*/
public sortTradeEvents() {
this.tradeMessages = this.tradeMessages.sort(
(a: SingleTradeMessage, b: SingleTradeMessage) => {
if (Big(a.blockHeight).lt(b.blockHeight)) {
public sortEvents(kafkaTopic: KafkaTopics) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can this be made a generic function that takes in OrderedMessage and sorts them and returns a sorted list rather than checking if the topic is sortable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (![
KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS,
KafkaTopics.TO_WEBSOCKETS_TRADES,
].includes(kafkaTopic)) {
throw new Error('Sorting events is only supported for subaccount and trade kafka websocket topics');
}
const msgs: OrderedMessage[] = this.getMessages(kafkaTopic) as OrderedMessage[];

if (msgs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed? Would calling .sort on an empty array lead to an error / undesirable outcome?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

msgs.sort((a: OrderedMessage, b: OrderedMessage) => {
if (a.blockHeight < b.blockHeight) {
return -1;
} else if (Big(a.blockHeight).gt(b.blockHeight)) {
} else if (a.blockHeight > b.blockHeight) {
return 1;
}

Expand All @@ -83,9 +143,9 @@ export class KafkaPublisher {
return 1;
}

return a.eventIndex < b.eventIndex ? -1 : 1;
},
);
return a.eventIndex - b.eventIndex;
});
}
}

public async publish() {
Expand All @@ -112,6 +172,7 @@ export class KafkaPublisher {
private generateAllTopicKafkaMessages(): TopicKafkaMessages[] {
const allTopicKafkaMessages: TopicKafkaMessages[] = [];
if (this.subaccountMessages.length > 0) {
this.retainLastFillEventsForSubaccountMessages();
allTopicKafkaMessages.push({
topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS,
messages: _.map(this.subaccountMessages, (message: SubaccountMessage) => {
Expand Down
7 changes: 6 additions & 1 deletion indexer/services/ender/src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,19 @@ export interface SingleTradeMessage extends TradeMessage {
eventIndex: number,
}

export interface AnnotatedSubaccountMessage extends SubaccountMessage {
orderId?: string,
isFill?: boolean,
}

export interface VulcanMessage {
key: Buffer,
value: OffChainUpdateV1,
}

export type ConsolidatedKafkaEvent = {
topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS,
message: SubaccountMessage,
message: AnnotatedSubaccountMessage,
} | {
topic: KafkaTopics.TO_WEBSOCKETS_TRADES,
message: SingleTradeMessage,
Expand Down
Loading