Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dydxwill committed Oct 18, 2023
1 parent 4cd6724 commit 3e09681
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { TRADES_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-ind
import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol-indexer/postgres';
import { TradeMessage } from '@dydxprotocol-indexer/v4-protos';

import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';
import { AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';

export function contentToTradeMessage(
tradeContent: TradeContent,
Expand Down Expand Up @@ -40,3 +40,12 @@ export function createConsolidatedKafkaEventFromTrade(
message: trade,
};
}

export function createConsolidatedKafkaEventFromSubaccount(
subaccount: AnnotatedSubaccountMessage,
): ConsolidatedKafkaEvent {
return {
topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS,
message: subaccount,
};
}
161 changes: 160 additions & 1 deletion indexer/services/ender/__tests__/lib/kafka-publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol
import { SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import _ from 'lodash';
import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';
import {
AnnotatedSubaccountMessage,
ConsolidatedKafkaEvent,
convertToSubaccountMessage,
SingleTradeMessage,
} from '../../src/lib/types';

import { KafkaPublisher } from '../../src/lib/kafka-publisher';
import {
Expand All @@ -17,6 +22,7 @@ import {
import {
contentToSingleTradeMessage,
contentToTradeMessage,
createConsolidatedKafkaEventFromSubaccount,
createConsolidatedKafkaEventFromTrade,
} from '../helpers/kafka-publisher-helpers';

Expand Down Expand Up @@ -148,6 +154,159 @@ describe('kafka-publisher', () => {
});
});

describe('sortSubaccountEvents', () => {
const subaccount: AnnotatedSubaccountMessage = defaultSubaccountMessage;
const consolidatedSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(subaccount);
it.each([
[
'blockHeight',
{
...subaccount,
blockHeight: Big(subaccount.blockHeight).minus(1).toString(),
},
{
...subaccount,
blockHeight: Big(subaccount.blockHeight).plus(1).toString(),
},
],
[
'transactionIndex',
{
...subaccount,
transactionIndex: subaccount.transactionIndex - 1,
},
{
...subaccount,
transactionIndex: subaccount.transactionIndex + 1,
},
],
[
'eventIndex',
{
...subaccount,
eventIndex: subaccount.eventIndex - 1,
},
{
...subaccount,
eventIndex: subaccount.eventIndex + 1,
},
],
])('successfully subaccounts events by %s', (
_field: string,
beforeSubaccount: AnnotatedSubaccountMessage,
afterSubaccount: AnnotatedSubaccountMessage,
) => {
const publisher: KafkaPublisher = new KafkaPublisher();
const consolidatedBeforeSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(
beforeSubaccount,
);
const consolidatedAfterSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(
afterSubaccount,
);

publisher.addEvents([
consolidatedAfterSubaccount,
consolidatedSubaccount,
consolidatedBeforeSubaccount,
]);

publisher.sortEvents(KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);
expect(publisher.subaccountMessages).toEqual([beforeSubaccount, subaccount, afterSubaccount]);
});
});

describe('retainLastFillEventsForSubaccountMessages', () => {
it('successfully retains the last fill event per order id and sorts messages', async () => {
const publisher: KafkaPublisher = new KafkaPublisher();

// over-written by message 3.
const message1: AnnotatedSubaccountMessage = {
blockHeight: '1',
transactionIndex: 1,
eventIndex: 1,
contents: 'Message 1',
subaccountId: {
owner: 'owner1',
number: 0,
},
version: '1',
orderId: 'order1',
isFill: true,
};

const message2: AnnotatedSubaccountMessage = {
blockHeight: '1',
transactionIndex: 2,
eventIndex: 2,
contents: 'Message 2',
subaccountId: {
owner: 'owner2',
number: 0,
},
version: '1',
orderId: 'order2',
isFill: true,
};

const message3: AnnotatedSubaccountMessage = {
blockHeight: '1',
transactionIndex: 3,
eventIndex: 3,
contents: 'Message 3',
subaccountId: {
owner: 'owner3',
number: 0,
},
version: '1',
orderId: 'order1',
isFill: true,
};

// non-fill subaccount message.
const message4: AnnotatedSubaccountMessage = {
blockHeight: '1',
transactionIndex: 3,
eventIndex: 4,
contents: 'Message 3',
subaccountId: {
owner: 'owner3',
number: 0,
},
version: '4',
};

publisher.addEvents([
createConsolidatedKafkaEventFromSubaccount(message1),
createConsolidatedKafkaEventFromSubaccount(message2),
createConsolidatedKafkaEventFromSubaccount(message3),
createConsolidatedKafkaEventFromSubaccount(message4),
]);

publisher.retainLastFillEventsForSubaccountMessages();
const expectedMsgs: SubaccountMessage[] = [
convertToSubaccountMessage(message2),
convertToSubaccountMessage(message3),
convertToSubaccountMessage(message4),
];
expect(publisher.subaccountMessages).toEqual(expectedMsgs);

await publisher.publish();

expect(producerSendMock).toHaveBeenCalledTimes(1);
expect(producerSendMock).toHaveBeenCalledWith({
topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS,
messages: _.map(expectedMsgs, (message: SubaccountMessage) => {
return {
value: Buffer.from(Uint8Array.from(SubaccountMessage.encode(message).finish())),
};
}),
});
});
});

describe('groupKafkaTradesByClobPairId', () => {
it('successfully groups kafka trade messages', () => {
const kafkaPublisher: KafkaPublisher = new KafkaPublisher();
Expand Down
17 changes: 12 additions & 5 deletions indexer/services/ender/src/lib/kafka-publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ import { TradeMessageContents } from '@dydxprotocol-indexer/postgres';
import {
CandleMessage, MarketMessage, OffChainUpdateV1, SubaccountMessage, TradeMessage,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import _ from 'lodash';

import config from '../config';
import {
AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, SingleTradeMessage, VulcanMessage,
AnnotatedSubaccountMessage,
ConsolidatedKafkaEvent,
convertToSubaccountMessage,
SingleTradeMessage,
VulcanMessage,
} from './types';

type TopicKafkaMessages = {
Expand Down Expand Up @@ -99,7 +104,9 @@ export class KafkaPublisher {
}
});

this.subaccountMessages = Object.values(lastFillEvents).concat(nonFillEvents);
this.subaccountMessages = Object.values(lastFillEvents)
.concat(nonFillEvents)
.map((annotatedMessage) => convertToSubaccountMessage(annotatedMessage));
this.sortEvents(KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);
}

Expand Down Expand Up @@ -131,9 +138,9 @@ export class KafkaPublisher {

if (msgs) {
msgs.sort((a: OrderedMessage, b: OrderedMessage) => {
if (a.blockHeight < b.blockHeight) {
if (Big(a.blockHeight).lt(b.blockHeight)) {
return -1;
} else if (a.blockHeight > b.blockHeight) {
} else if (Big(a.blockHeight).gt(b.blockHeight)) {
return 1;
}

Expand All @@ -143,7 +150,7 @@ export class KafkaPublisher {
return 1;
}

return a.eventIndex - b.eventIndex;
return a.eventIndex < b.eventIndex ? -1 : 1;
});
}
}
Expand Down
8 changes: 8 additions & 0 deletions indexer/services/ender/src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
UpdatePerpetualEventV1,
UpdateClobPairEventV1,
} from '@dydxprotocol-indexer/v4-protos';
import _ from 'lodash';
import Long from 'long';
import { DateTime } from 'luxon';

Expand Down Expand Up @@ -193,6 +194,13 @@ export interface AnnotatedSubaccountMessage extends SubaccountMessage {
isFill?: boolean,
}

export function convertToSubaccountMessage(
annotatedMessage: AnnotatedSubaccountMessage,
): SubaccountMessage {
const subaccountMessage: SubaccountMessage = _.omit(annotatedMessage, ['orderId', 'isFill']);
return subaccountMessage;
}

export interface VulcanMessage {
key: Buffer,
value: OffChainUpdateV1,
Expand Down

0 comments on commit 3e09681

Please sign in to comment.