Skip to content

Commit

Permalink
[IND-288] send final message per order id for order fills, with aggre…
Browse files Browse the repository at this point in the history
…gated fill sfrom the entire block (#658)
  • Loading branch information
dydxwill authored and Crystal Lemire committed Oct 26, 2023
1 parent 3f0ac87 commit 6f6dba4
Show file tree
Hide file tree
Showing 8 changed files with 442 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { TRADES_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-ind
import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol-indexer/postgres';
import { TradeMessage } from '@dydxprotocol-indexer/v4-protos';

import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';
import { AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';

export function contentToTradeMessage(
tradeContent: TradeContent,
Expand Down Expand Up @@ -40,3 +40,12 @@ export function createConsolidatedKafkaEventFromTrade(
message: trade,
};
}

export function createConsolidatedKafkaEventFromSubaccount(
subaccount: AnnotatedSubaccountMessage,
): ConsolidatedKafkaEvent {
return {
topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS,
message: subaccount,
};
}
278 changes: 268 additions & 10 deletions indexer/services/ender/__tests__/lib/kafka-publisher.test.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,51 @@
import {
producer,
TRADES_WEBSOCKET_MESSAGE_VERSION,
KafkaTopics,
ProducerMessage,
KafkaTopics, producer, ProducerMessage, TRADES_WEBSOCKET_MESSAGE_VERSION,
} from '@dydxprotocol-indexer/kafka';
import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol-indexer/postgres';
import { SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos';
import {
FillFromDatabase,
FillTable,
FillType,
Liquidity,
OrderFromDatabase,
OrderSide,
OrderStatus,
SubaccountMessageContents,
SubaccountTable,
testConstants,
TradeContent,
TradeMessageContents,
TransferFromDatabase,
} from '@dydxprotocol-indexer/postgres';
import { IndexerSubaccountId, SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import _ from 'lodash';
import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';
import {
AnnotatedSubaccountMessage,
ConsolidatedKafkaEvent,
SingleTradeMessage,
} from '../../src/lib/types';

import { KafkaPublisher } from '../../src/lib/kafka-publisher';
import {
defaultSubaccountMessage, defaultTradeContent, defaultTradeMessage, defaultTradeKafkaEvent,
defaultSubaccountMessage,
defaultTradeContent,
defaultTradeKafkaEvent,
defaultTradeMessage,
defaultWalletAddress,
} from '../helpers/constants';
import { contentToSingleTradeMessage, contentToTradeMessage, createConsolidatedKafkaEventFromTrade } from '../helpers/kafka-publisher-helpers';
import {
contentToSingleTradeMessage,
contentToTradeMessage,
createConsolidatedKafkaEventFromSubaccount,
createConsolidatedKafkaEventFromTrade,
} from '../helpers/kafka-publisher-helpers';
import {
generateFillSubaccountMessage,
generateOrderSubaccountMessage,
generateTransferContents,
} from '../../src/helpers/kafka-helper';
import { DateTime } from 'luxon';
import { convertToSubaccountMessage } from '../../src/lib/helper';

describe('kafka-publisher', () => {
let producerSendMock: jest.SpyInstance;
Expand Down Expand Up @@ -139,11 +170,238 @@ describe('kafka-publisher', () => {
consolidatedBeforeTrade,
]);

publisher.sortTradeEvents();
publisher.sortEvents(publisher.tradeMessages);
expect(publisher.tradeMessages).toEqual([beforeTrade, trade, afterTrade]);
});
});

describe('sortSubaccountEvents', () => {
const subaccount: AnnotatedSubaccountMessage = defaultSubaccountMessage;
const consolidatedSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(subaccount);
it.each([
[
'blockHeight',
{
...subaccount,
blockHeight: Big(subaccount.blockHeight).minus(1).toString(),
},
{
...subaccount,
blockHeight: Big(subaccount.blockHeight).plus(1).toString(),
},
],
[
'transactionIndex',
{
...subaccount,
transactionIndex: subaccount.transactionIndex - 1,
},
{
...subaccount,
transactionIndex: subaccount.transactionIndex + 1,
},
],
[
'eventIndex',
{
...subaccount,
eventIndex: subaccount.eventIndex - 1,
},
{
...subaccount,
eventIndex: subaccount.eventIndex + 1,
},
],
])('successfully subaccounts events by %s', (
_field: string,
beforeSubaccount: AnnotatedSubaccountMessage,
afterSubaccount: AnnotatedSubaccountMessage,
) => {
const publisher: KafkaPublisher = new KafkaPublisher();
const consolidatedBeforeSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(
beforeSubaccount,
);
const consolidatedAfterSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(
afterSubaccount,
);

publisher.addEvents([
consolidatedAfterSubaccount,
consolidatedSubaccount,
consolidatedBeforeSubaccount,
]);

publisher.sortEvents(publisher.subaccountMessages);
expect(publisher.subaccountMessages).toEqual([beforeSubaccount, subaccount, afterSubaccount]);
});
});

describe('aggregateFillEventsForSubaccountMessages', () => {
const fill: FillFromDatabase = {
id: FillTable.uuid(testConstants.defaultTendermintEventId, Liquidity.TAKER),
subaccountId: testConstants.defaultSubaccountId,
side: OrderSide.BUY,
liquidity: Liquidity.TAKER,
type: FillType.MARKET,
clobPairId: '1',
orderId: testConstants.defaultOrderId,
size: '10',
price: '20000',
quoteAmount: '200000',
eventId: testConstants.defaultTendermintEventId,
transactionHash: '', // TODO: Add a real transaction Hash
createdAt: testConstants.createdDateTime.toISO(),
createdAtHeight: testConstants.createdHeight,
clientMetadata: '0',
fee: '1.1',
};
const order: OrderFromDatabase = {
...testConstants.defaultOrderGoodTilBlockTime,
id: testConstants.defaultOrderId,
};

const recipientSubaccountId: IndexerSubaccountId = IndexerSubaccountId.fromPartial({
owner: 'recipient',
number: 1,
});
const deposit: TransferFromDatabase = {
id: '',
senderWalletAddress: defaultWalletAddress,
recipientSubaccountId: SubaccountTable.uuid(
recipientSubaccountId.owner,
recipientSubaccountId.number,
),
assetId: testConstants.defaultAsset.id,
size: '10',
eventId: testConstants.defaultTendermintEventId,
transactionHash: 'hash',
createdAt: DateTime.utc().toISO(),
createdAtHeight: '1',
};
it('successfully aggregates all fill events per order id and sorts messages', async () => {
const publisher: KafkaPublisher = new KafkaPublisher();

// merged with message 3.
const msg1Contents: SubaccountMessageContents = {
fills: [
generateFillSubaccountMessage(fill, 'BTC-USD'),
],
orders: [
generateOrderSubaccountMessage(order, 'BTC-USD'),
],
};
const message1: AnnotatedSubaccountMessage = {
blockHeight: '1',
transactionIndex: 1,
eventIndex: 1,
contents: JSON.stringify(msg1Contents),
subaccountId: {
owner: 'owner1',
number: 0,
},
version: '1',
orderId: 'order1',
isFill: true,
subaccountMessageContents: msg1Contents,
};

const msg2Contents: SubaccountMessageContents = {
fills: [
generateFillSubaccountMessage(fill, 'ETH-USD'),
],
};
const message2: AnnotatedSubaccountMessage = {
...message1,
transactionIndex: 2,
contents: JSON.stringify(msg2Contents),
orderId: 'order2',
subaccountMessageContents: msg2Contents,
};

const msg3Contents: SubaccountMessageContents = {
fills: [
generateFillSubaccountMessage({
...fill,
size: '100',
}, 'BTC-USD'),
],
orders: [
generateOrderSubaccountMessage({
...order,
status: OrderStatus.FILLED,
}, 'BTC-USD'),
],
};
const message3: AnnotatedSubaccountMessage = {
...message1,
transactionIndex: 3,
contents: JSON.stringify(msg3Contents),
subaccountMessageContents: msg3Contents,
};

// non-fill subaccount message.
const msg4Contents: SubaccountMessageContents = generateTransferContents(
deposit,
testConstants.defaultAsset,
recipientSubaccountId,
undefined,
recipientSubaccountId,
);
const message4: AnnotatedSubaccountMessage = {
...message1,
eventIndex: 4,
orderId: undefined,
isFill: undefined,
contents: JSON.stringify(msg4Contents),
};

const expectedMergedContents: SubaccountMessageContents = {
fills: [
msg1Contents.fills![0],
msg3Contents.fills![0],
],
orders: [
msg3Contents.orders![0],
],
};
const mergedMessage3: AnnotatedSubaccountMessage = {
...message3,
contents: JSON.stringify(expectedMergedContents),
subaccountMessageContents: expectedMergedContents,
};

publisher.addEvents([
createConsolidatedKafkaEventFromSubaccount(message1),
createConsolidatedKafkaEventFromSubaccount(message2),
createConsolidatedKafkaEventFromSubaccount(message3),
createConsolidatedKafkaEventFromSubaccount(message4),
]);

publisher.aggregateFillEventsForSubaccountMessages();
const expectedMsgs: SubaccountMessage[] = [
convertToSubaccountMessage(message4),
convertToSubaccountMessage(message2),
convertToSubaccountMessage(mergedMessage3),
];
expect(publisher.subaccountMessages).toEqual(expectedMsgs);

await publisher.publish();

expect(producerSendMock).toHaveBeenCalledTimes(1);
expect(producerSendMock).toHaveBeenCalledWith({
topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS,
messages: _.map(expectedMsgs, (message: SubaccountMessage) => {
return {
value: Buffer.from(Uint8Array.from(SubaccountMessage.encode(message).finish())),
};
}),
});
});
});

describe('groupKafkaTradesByClobPairId', () => {
it('successfully groups kafka trade messages', () => {
const kafkaPublisher: KafkaPublisher = new KafkaPublisher();
Expand Down
14 changes: 11 additions & 3 deletions indexer/services/ender/src/handlers/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,21 @@ import {
TRADES_WEBSOCKET_MESSAGE_VERSION,
KafkaTopics,
} from '@dydxprotocol-indexer/kafka';
import { SubaccountMessageContents } from '@dydxprotocol-indexer/postgres';
import {
IndexerTendermintBlock,
IndexerTendermintEvent,
MarketMessage,
OffChainUpdateV1,
SubaccountId,
SubaccountMessage,
} from '@dydxprotocol-indexer/v4-protos';
import { DateTime } from 'luxon';

import config from '../config';
import { indexerTendermintEventToTransactionIndex } from '../lib/helper';
import { ConsolidatedKafkaEvent, EventMessage, SingleTradeMessage } from '../lib/types';
import {
AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, EventMessage, SingleTradeMessage,
} from '../lib/types';

export type HandlerInitializer = new (
block: IndexerTendermintBlock,
Expand Down Expand Up @@ -103,9 +105,12 @@ export abstract class Handler<T> {
protected generateConsolidatedSubaccountKafkaEvent(
contents: string,
subaccountId: SubaccountId,
orderId?: string,
isFill?: boolean,
subaccountMessageContents?: SubaccountMessageContents,
): ConsolidatedKafkaEvent {
stats.increment(`${config.SERVICE_NAME}.create_subaccount_kafka_event`, 1);
const subaccountMessage: SubaccountMessage = {
const subaccountMessage: AnnotatedSubaccountMessage = {
blockHeight: this.block.height.toString(),
transactionIndex: indexerTendermintEventToTransactionIndex(
this.indexerTendermintEvent,
Expand All @@ -114,6 +119,9 @@ export abstract class Handler<T> {
contents,
subaccountId,
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
orderId,
isFill,
subaccountMessageContents,
};

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,9 @@ export abstract class AbstractOrderFillHandler<T> extends Handler<T> {
return this.generateConsolidatedSubaccountKafkaEvent(
JSON.stringify(message),
subaccountIdProto,
order?.id,
true,
message,
);
}

Expand Down
Loading

0 comments on commit 6f6dba4

Please sign in to comment.