Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IND-288] send final message per order id for order fills, with aggregated fill sfrom the entire block #658

Merged
merged 7 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { TRADES_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-ind
import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol-indexer/postgres';
import { TradeMessage } from '@dydxprotocol-indexer/v4-protos';

import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';
import { AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';

export function contentToTradeMessage(
tradeContent: TradeContent,
Expand Down Expand Up @@ -40,3 +40,12 @@ export function createConsolidatedKafkaEventFromTrade(
message: trade,
};
}

export function createConsolidatedKafkaEventFromSubaccount(
subaccount: AnnotatedSubaccountMessage,
): ConsolidatedKafkaEvent {
return {
topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS,
message: subaccount,
};
}
278 changes: 268 additions & 10 deletions indexer/services/ender/__tests__/lib/kafka-publisher.test.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,51 @@
import {
producer,
TRADES_WEBSOCKET_MESSAGE_VERSION,
KafkaTopics,
ProducerMessage,
KafkaTopics, producer, ProducerMessage, TRADES_WEBSOCKET_MESSAGE_VERSION,
} from '@dydxprotocol-indexer/kafka';
import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol-indexer/postgres';
import { SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos';
import {
FillFromDatabase,
FillTable,
FillType,
Liquidity,
OrderFromDatabase,
OrderSide,
OrderStatus,
SubaccountMessageContents,
SubaccountTable,
testConstants,
TradeContent,
TradeMessageContents,
TransferFromDatabase,
} from '@dydxprotocol-indexer/postgres';
import { IndexerSubaccountId, SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import _ from 'lodash';
import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';
import {
AnnotatedSubaccountMessage,
ConsolidatedKafkaEvent,
SingleTradeMessage,
} from '../../src/lib/types';

import { KafkaPublisher } from '../../src/lib/kafka-publisher';
import {
defaultSubaccountMessage, defaultTradeContent, defaultTradeMessage, defaultTradeKafkaEvent,
defaultSubaccountMessage,
defaultTradeContent,
defaultTradeKafkaEvent,
defaultTradeMessage,
defaultWalletAddress,
} from '../helpers/constants';
import { contentToSingleTradeMessage, contentToTradeMessage, createConsolidatedKafkaEventFromTrade } from '../helpers/kafka-publisher-helpers';
import {
contentToSingleTradeMessage,
contentToTradeMessage,
createConsolidatedKafkaEventFromSubaccount,
createConsolidatedKafkaEventFromTrade,
} from '../helpers/kafka-publisher-helpers';
import {
generateFillSubaccountMessage,
generateOrderSubaccountMessage,
generateTransferContents,
} from '../../src/helpers/kafka-helper';
import { DateTime } from 'luxon';
import { convertToSubaccountMessage } from '../../src/lib/helper';

describe('kafka-publisher', () => {
let producerSendMock: jest.SpyInstance;
Expand Down Expand Up @@ -139,11 +170,238 @@ describe('kafka-publisher', () => {
consolidatedBeforeTrade,
]);

publisher.sortTradeEvents();
publisher.sortEvents(publisher.tradeMessages);
expect(publisher.tradeMessages).toEqual([beforeTrade, trade, afterTrade]);
});
});

describe('sortSubaccountEvents', () => {
const subaccount: AnnotatedSubaccountMessage = defaultSubaccountMessage;
const consolidatedSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(subaccount);
it.each([
[
'blockHeight',
{
...subaccount,
blockHeight: Big(subaccount.blockHeight).minus(1).toString(),
},
{
...subaccount,
blockHeight: Big(subaccount.blockHeight).plus(1).toString(),
},
],
[
'transactionIndex',
{
...subaccount,
transactionIndex: subaccount.transactionIndex - 1,
},
{
...subaccount,
transactionIndex: subaccount.transactionIndex + 1,
},
],
[
'eventIndex',
{
...subaccount,
eventIndex: subaccount.eventIndex - 1,
},
{
...subaccount,
eventIndex: subaccount.eventIndex + 1,
},
],
])('successfully subaccounts events by %s', (
_field: string,
beforeSubaccount: AnnotatedSubaccountMessage,
afterSubaccount: AnnotatedSubaccountMessage,
) => {
const publisher: KafkaPublisher = new KafkaPublisher();
const consolidatedBeforeSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(
beforeSubaccount,
);
const consolidatedAfterSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(
afterSubaccount,
);

publisher.addEvents([
consolidatedAfterSubaccount,
consolidatedSubaccount,
consolidatedBeforeSubaccount,
]);

publisher.sortEvents(publisher.subaccountMessages);
expect(publisher.subaccountMessages).toEqual([beforeSubaccount, subaccount, afterSubaccount]);
});
});

describe('aggregateFillEventsForSubaccountMessages', () => {
const fill: FillFromDatabase = {
id: FillTable.uuid(testConstants.defaultTendermintEventId, Liquidity.TAKER),
subaccountId: testConstants.defaultSubaccountId,
side: OrderSide.BUY,
liquidity: Liquidity.TAKER,
type: FillType.MARKET,
clobPairId: '1',
orderId: testConstants.defaultOrderId,
size: '10',
price: '20000',
quoteAmount: '200000',
eventId: testConstants.defaultTendermintEventId,
transactionHash: '', // TODO: Add a real transaction Hash
createdAt: testConstants.createdDateTime.toISO(),
createdAtHeight: testConstants.createdHeight,
clientMetadata: '0',
fee: '1.1',
};
const order: OrderFromDatabase = {
...testConstants.defaultOrderGoodTilBlockTime,
id: testConstants.defaultOrderId,
};

const recipientSubaccountId: IndexerSubaccountId = IndexerSubaccountId.fromPartial({
owner: 'recipient',
number: 1,
});
const deposit: TransferFromDatabase = {
id: '',
senderWalletAddress: defaultWalletAddress,
recipientSubaccountId: SubaccountTable.uuid(
recipientSubaccountId.owner,
recipientSubaccountId.number,
),
assetId: testConstants.defaultAsset.id,
size: '10',
eventId: testConstants.defaultTendermintEventId,
transactionHash: 'hash',
createdAt: DateTime.utc().toISO(),
createdAtHeight: '1',
};
it('successfully aggregates all fill events per order id and sorts messages', async () => {
const publisher: KafkaPublisher = new KafkaPublisher();

// merged with message 3.
const msg1Contents: SubaccountMessageContents = {
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved
fills: [
generateFillSubaccountMessage(fill, 'BTC-USD'),
],
orders: [
generateOrderSubaccountMessage(order, 'BTC-USD'),
],
};
const message1: AnnotatedSubaccountMessage = {
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved
blockHeight: '1',
transactionIndex: 1,
eventIndex: 1,
contents: JSON.stringify(msg1Contents),
subaccountId: {
owner: 'owner1',
number: 0,
},
version: '1',
orderId: 'order1',
isFill: true,
subaccountMessageContents: msg1Contents,
};

const msg2Contents: SubaccountMessageContents = {
fills: [
generateFillSubaccountMessage(fill, 'ETH-USD'),
],
};
const message2: AnnotatedSubaccountMessage = {
...message1,
transactionIndex: 2,
contents: JSON.stringify(msg2Contents),
orderId: 'order2',
subaccountMessageContents: msg2Contents,
};

const msg3Contents: SubaccountMessageContents = {
fills: [
generateFillSubaccountMessage({
...fill,
size: '100',
}, 'BTC-USD'),
],
orders: [
generateOrderSubaccountMessage({
...order,
status: OrderStatus.FILLED,
}, 'BTC-USD'),
],
};
const message3: AnnotatedSubaccountMessage = {
...message1,
transactionIndex: 3,
contents: JSON.stringify(msg3Contents),
subaccountMessageContents: msg3Contents,
};

// non-fill subaccount message.
const msg4Contents: SubaccountMessageContents = generateTransferContents(
deposit,
testConstants.defaultAsset,
recipientSubaccountId,
undefined,
recipientSubaccountId,
);
const message4: AnnotatedSubaccountMessage = {
...message1,
eventIndex: 4,
orderId: undefined,
isFill: undefined,
contents: JSON.stringify(msg4Contents),
};

const expectedMergedContents: SubaccountMessageContents = {
fills: [
msg1Contents.fills![0],
msg3Contents.fills![0],
],
orders: [
msg3Contents.orders![0],
],
};
const mergedMessage3: AnnotatedSubaccountMessage = {
...message3,
contents: JSON.stringify(expectedMergedContents),
subaccountMessageContents: expectedMergedContents,
};

publisher.addEvents([
createConsolidatedKafkaEventFromSubaccount(message1),
createConsolidatedKafkaEventFromSubaccount(message2),
createConsolidatedKafkaEventFromSubaccount(message3),
createConsolidatedKafkaEventFromSubaccount(message4),
]);

publisher.aggregateFillEventsForSubaccountMessages();
const expectedMsgs: SubaccountMessage[] = [
convertToSubaccountMessage(message4),
convertToSubaccountMessage(message2),
convertToSubaccountMessage(mergedMessage3),
Comment on lines +385 to +387
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the right ordering? Shouldn't message 4 come after message 3?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.

message 4 has txn index 1/event index 4.
msg 2 has txn index 2/ event index1.
msg 3 has txn index 3/event index 1.

];
expect(publisher.subaccountMessages).toEqual(expectedMsgs);

await publisher.publish();

expect(producerSendMock).toHaveBeenCalledTimes(1);
expect(producerSendMock).toHaveBeenCalledWith({
topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS,
messages: _.map(expectedMsgs, (message: SubaccountMessage) => {
return {
value: Buffer.from(Uint8Array.from(SubaccountMessage.encode(message).finish())),
};
}),
});
});
});

describe('groupKafkaTradesByClobPairId', () => {
it('successfully groups kafka trade messages', () => {
const kafkaPublisher: KafkaPublisher = new KafkaPublisher();
Expand Down
14 changes: 11 additions & 3 deletions indexer/services/ender/src/handlers/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,21 @@ import {
TRADES_WEBSOCKET_MESSAGE_VERSION,
KafkaTopics,
} from '@dydxprotocol-indexer/kafka';
import { SubaccountMessageContents } from '@dydxprotocol-indexer/postgres';
import {
IndexerTendermintBlock,
IndexerTendermintEvent,
MarketMessage,
OffChainUpdateV1,
SubaccountId,
SubaccountMessage,
} from '@dydxprotocol-indexer/v4-protos';
import { DateTime } from 'luxon';

import config from '../config';
import { indexerTendermintEventToTransactionIndex } from '../lib/helper';
import { ConsolidatedKafkaEvent, EventMessage, SingleTradeMessage } from '../lib/types';
import {
AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, EventMessage, SingleTradeMessage,
} from '../lib/types';

export type HandlerInitializer = new (
block: IndexerTendermintBlock,
Expand Down Expand Up @@ -103,9 +105,12 @@ export abstract class Handler<T> {
protected generateConsolidatedSubaccountKafkaEvent(
contents: string,
subaccountId: SubaccountId,
orderId?: string,
isFill?: boolean,
subaccountMessageContents?: SubaccountMessageContents,
): ConsolidatedKafkaEvent {
stats.increment(`${config.SERVICE_NAME}.create_subaccount_kafka_event`, 1);
const subaccountMessage: SubaccountMessage = {
const subaccountMessage: AnnotatedSubaccountMessage = {
blockHeight: this.block.height.toString(),
transactionIndex: indexerTendermintEventToTransactionIndex(
this.indexerTendermintEvent,
Expand All @@ -114,6 +119,9 @@ export abstract class Handler<T> {
contents,
subaccountId,
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
orderId,
isFill,
subaccountMessageContents,
};

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ export abstract class AbstractOrderFillHandler<T> extends Handler<T> {
return this.generateConsolidatedSubaccountKafkaEvent(
JSON.stringify(message),
subaccountIdProto,
order?.id,
true,
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved
message,
);
}

Expand Down
Loading