Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IND-288] send final message per order id for order fills, with aggregated fill sfrom the entire block #658

Merged
merged 7 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { TRADES_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-ind
import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol-indexer/postgres';
import { TradeMessage } from '@dydxprotocol-indexer/v4-protos';

import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';
import { AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';

export function contentToTradeMessage(
tradeContent: TradeContent,
Expand Down Expand Up @@ -40,3 +40,12 @@ export function createConsolidatedKafkaEventFromTrade(
message: trade,
};
}

export function createConsolidatedKafkaEventFromSubaccount(
subaccount: AnnotatedSubaccountMessage,
): ConsolidatedKafkaEvent {
return {
topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS,
message: subaccount,
};
}
179 changes: 171 additions & 8 deletions indexer/services/ender/__tests__/lib/kafka-publisher.test.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
import {
producer,
TRADES_WEBSOCKET_MESSAGE_VERSION,
KafkaTopics,
ProducerMessage,
KafkaTopics, producer, ProducerMessage, TRADES_WEBSOCKET_MESSAGE_VERSION,
} from '@dydxprotocol-indexer/kafka';
import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol-indexer/postgres';
import { SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import _ from 'lodash';
import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';
import {
AnnotatedSubaccountMessage,
ConsolidatedKafkaEvent,
convertToSubaccountMessage,
SingleTradeMessage,
} from '../../src/lib/types';

import { KafkaPublisher } from '../../src/lib/kafka-publisher';
import {
defaultSubaccountMessage, defaultTradeContent, defaultTradeMessage, defaultTradeKafkaEvent,
defaultSubaccountMessage,
defaultTradeContent,
defaultTradeKafkaEvent,
defaultTradeMessage,
} from '../helpers/constants';
import { contentToSingleTradeMessage, contentToTradeMessage, createConsolidatedKafkaEventFromTrade } from '../helpers/kafka-publisher-helpers';
import {
contentToSingleTradeMessage,
contentToTradeMessage,
createConsolidatedKafkaEventFromSubaccount,
createConsolidatedKafkaEventFromTrade,
} from '../helpers/kafka-publisher-helpers';

describe('kafka-publisher', () => {
let producerSendMock: jest.SpyInstance;
Expand Down Expand Up @@ -139,11 +149,164 @@ describe('kafka-publisher', () => {
consolidatedBeforeTrade,
]);

publisher.sortTradeEvents();
publisher.sortEvents(KafkaTopics.TO_WEBSOCKETS_TRADES);
expect(publisher.tradeMessages).toEqual([beforeTrade, trade, afterTrade]);
});
});

describe('sortSubaccountEvents', () => {
const subaccount: AnnotatedSubaccountMessage = defaultSubaccountMessage;
const consolidatedSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(subaccount);
it.each([
[
'blockHeight',
{
...subaccount,
blockHeight: Big(subaccount.blockHeight).minus(1).toString(),
},
{
...subaccount,
blockHeight: Big(subaccount.blockHeight).plus(1).toString(),
},
],
[
'transactionIndex',
{
...subaccount,
transactionIndex: subaccount.transactionIndex - 1,
},
{
...subaccount,
transactionIndex: subaccount.transactionIndex + 1,
},
],
[
'eventIndex',
{
...subaccount,
eventIndex: subaccount.eventIndex - 1,
},
{
...subaccount,
eventIndex: subaccount.eventIndex + 1,
},
],
])('successfully subaccounts events by %s', (
_field: string,
beforeSubaccount: AnnotatedSubaccountMessage,
afterSubaccount: AnnotatedSubaccountMessage,
) => {
const publisher: KafkaPublisher = new KafkaPublisher();
const consolidatedBeforeSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(
beforeSubaccount,
);
const consolidatedAfterSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(
afterSubaccount,
);

publisher.addEvents([
consolidatedAfterSubaccount,
consolidatedSubaccount,
consolidatedBeforeSubaccount,
]);

publisher.sortEvents(KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);
expect(publisher.subaccountMessages).toEqual([beforeSubaccount, subaccount, afterSubaccount]);
});
});

describe('retainLastFillEventsForSubaccountMessages', () => {
it('successfully retains the last fill event per order id and sorts messages', async () => {
const publisher: KafkaPublisher = new KafkaPublisher();

// over-written by message 3.
const message1: AnnotatedSubaccountMessage = {
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved
blockHeight: '1',
transactionIndex: 1,
eventIndex: 1,
contents: 'Message 1',
subaccountId: {
owner: 'owner1',
number: 0,
},
version: '1',
orderId: 'order1',
isFill: true,
};

const message2: AnnotatedSubaccountMessage = {
blockHeight: '1',
transactionIndex: 2,
eventIndex: 2,
contents: 'Message 2',
subaccountId: {
owner: 'owner2',
number: 0,
},
version: '1',
orderId: 'order2',
isFill: true,
};

const message3: AnnotatedSubaccountMessage = {
blockHeight: '1',
transactionIndex: 3,
eventIndex: 3,
contents: 'Message 3',
subaccountId: {
owner: 'owner3',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: For the sake of correct-ness, this should match msg1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

number: 0,
},
version: '1',
orderId: 'order1',
isFill: true,
};

// non-fill subaccount message.
const message4: AnnotatedSubaccountMessage = {
blockHeight: '1',
transactionIndex: 3,
eventIndex: 4,
contents: 'Message 3',
subaccountId: {
owner: 'owner3',
number: 0,
},
version: '4',
};

publisher.addEvents([
createConsolidatedKafkaEventFromSubaccount(message1),
createConsolidatedKafkaEventFromSubaccount(message2),
createConsolidatedKafkaEventFromSubaccount(message3),
createConsolidatedKafkaEventFromSubaccount(message4),
]);

publisher.retainLastFillEventsForSubaccountMessages();
const expectedMsgs: SubaccountMessage[] = [
convertToSubaccountMessage(message2),
convertToSubaccountMessage(message3),
convertToSubaccountMessage(message4),
];
expect(publisher.subaccountMessages).toEqual(expectedMsgs);

await publisher.publish();

expect(producerSendMock).toHaveBeenCalledTimes(1);
expect(producerSendMock).toHaveBeenCalledWith({
topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS,
messages: _.map(expectedMsgs, (message: SubaccountMessage) => {
return {
value: Buffer.from(Uint8Array.from(SubaccountMessage.encode(message).finish())),
};
}),
});
});
});

describe('groupKafkaTradesByClobPairId', () => {
it('successfully groups kafka trade messages', () => {
const kafkaPublisher: KafkaPublisher = new KafkaPublisher();
Expand Down
11 changes: 8 additions & 3 deletions indexer/services/ender/src/handlers/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ import {
MarketMessage,
OffChainUpdateV1,
SubaccountId,
SubaccountMessage,
} from '@dydxprotocol-indexer/v4-protos';
import { DateTime } from 'luxon';

import config from '../config';
import { indexerTendermintEventToTransactionIndex } from '../lib/helper';
import { ConsolidatedKafkaEvent, EventMessage, SingleTradeMessage } from '../lib/types';
import {
AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, EventMessage, SingleTradeMessage,
} from '../lib/types';

export type HandlerInitializer = new (
block: IndexerTendermintBlock,
Expand Down Expand Up @@ -103,9 +104,11 @@ export abstract class Handler<T> {
protected generateConsolidatedSubaccountKafkaEvent(
contents: string,
subaccountId: SubaccountId,
orderId?: string,
isFill?: boolean,
): ConsolidatedKafkaEvent {
stats.increment(`${config.SERVICE_NAME}.create_subaccount_kafka_event`, 1);
const subaccountMessage: SubaccountMessage = {
const subaccountMessage: AnnotatedSubaccountMessage = {
blockHeight: this.block.height.toString(),
transactionIndex: indexerTendermintEventToTransactionIndex(
this.indexerTendermintEvent,
Expand All @@ -114,6 +117,8 @@ export abstract class Handler<T> {
contents,
subaccountId,
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
orderId,
isFill,
};

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ export abstract class AbstractOrderFillHandler<T> extends Handler<T> {
return this.generateConsolidatedSubaccountKafkaEvent(
JSON.stringify(message),
subaccountIdProto,
order?.id,
true,
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved
);
}

Expand Down
20 changes: 10 additions & 10 deletions indexer/services/ender/src/lib/candles-generator.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import { stats } from '@dydxprotocol-indexer/base';
import { CANDLES_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-indexer/kafka';
import {
CANDLE_RESOLUTION_TO_PROTO,
CandleColumns,
CandleCreateObject,
CandleFromDatabase,
CandleMessageContents,
CandleResolution,
CandleTable,
CandleUpdateObject,
CANDLE_RESOLUTION_TO_PROTO,
MarketOpenInterest,
NUM_SECONDS_IN_CANDLE_RESOLUTIONS,
Options,
PerpetualMarketColumns,
PerpetualMarketFromDatabase,
perpetualMarketRefresher,
PerpetualPositionTable,
TradeContent,
TradeMessageContents,
CandleMessageContents,
CandleColumns,
PerpetualPositionTable,
PerpetualMarketFromDatabase,
MarketOpenInterest,
PerpetualMarketColumns,
Options,
NUM_SECONDS_IN_CANDLE_RESOLUTIONS,
} from '@dydxprotocol-indexer/postgres';
import { CandleMessage } from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
Expand Down Expand Up @@ -69,7 +69,7 @@ export class CandlesGenerator {
public async updateCandles(): Promise<CandleFromDatabase[]> {
const start: number = Date.now();
// 1. Sort trade messages by order in the block
this.kafkaPublisher.sortTradeEvents();
this.kafkaPublisher.sortEvents(KafkaTopics.TO_WEBSOCKETS_TRADES);

// 2. Generate BlockCandleUpdatesMap
const blockCandleUpdatesMap: BlockCandleUpdatesMap = this.generateBlockCandleUpdatesMap();
Expand Down
Loading