Skip to content

Commit

Permalink
Indexer e2e latency round 2 (backport #1314) (#1558)
Browse files Browse the repository at this point in the history
Co-authored-by: Jonathan Fung <121899091+jonfung-dydx@users.noreply.github.com>
Co-authored-by: Will Liu <will@dydx.exchange>
  • Loading branch information
3 people authored May 21, 2024
1 parent e5841c3 commit a59d580
Show file tree
Hide file tree
Showing 16 changed files with 228 additions and 61 deletions.
1 change: 1 addition & 0 deletions indexer/packages/v4-protos/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ export * from './codegen/google/protobuf/timestamp';
export * from './codegen/dydxprotocol/indexer/protocol/v1/clob';
export * from './codegen/dydxprotocol/indexer/protocol/v1/subaccount';
export * from './codegen/dydxprotocol/indexer/shared/removal_reason';
export * from './utils';
12 changes: 12 additions & 0 deletions indexer/packages/v4-protos/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Timestamp } from './codegen/google/protobuf/timestamp';

export const MILLIS_IN_NANOS: number = 1_000_000;
export const SECONDS_IN_MILLIS: number = 1_000;
export function protoTimestampToDate(
protoTime: Timestamp,
): Date {
const timeInMillis: number = Number(protoTime.seconds) * SECONDS_IN_MILLIS +
Math.floor(protoTime.nanos / MILLIS_IN_NANOS);

return new Date(timeInMillis);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
PerpetualMarketCreateEventV1,
PerpetualMarketCreateEventV2,
DeleveragingEventV1,
protoTimestampToDate,
} from '@dydxprotocol-indexer/v4-protos';
import {
PerpetualMarketType,
Expand All @@ -64,7 +65,6 @@ import {
generatePerpetualMarketMessage,
generatePerpetualPositionsContents,
} from '../../src/helpers/kafka-helper';
import { protoTimestampToDate } from '../../src/lib/helper';
import { DydxIndexerSubtypes, VulcanMessage } from '../../src/lib/types';

// TX Hash is SHA256, so is of length 64 hexadecimal without the '0x'.
Expand Down
14 changes: 0 additions & 14 deletions indexer/services/ender/src/lib/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
import {
IndexerTendermintEvent,
IndexerTendermintEvent_BlockEvent,
Timestamp,
OrderFillEventV1,
MarketEventV1,
SubaccountUpdateEventV1,
Expand All @@ -32,10 +31,6 @@ import Big from 'big.js';
import _ from 'lodash';
import { DateTime } from 'luxon';

import {
MILLIS_IN_NANOS,
SECONDS_IN_MILLIS,
} from '../constants';
import {
AnnotatedSubaccountMessage,
DydxIndexerSubtypes,
Expand Down Expand Up @@ -73,15 +68,6 @@ export function convertToSubaccountMessage(
return subaccountMessage;
}

export function protoTimestampToDate(
protoTime: Timestamp,
): Date {
const timeInMillis: number = Number(protoTime.seconds) * SECONDS_IN_MILLIS +
Math.floor(protoTime.nanos / MILLIS_IN_NANOS);

return new Date(timeInMillis);
}

export function dateToDateTime(
protoTime: Date,
): DateTime {
Expand Down
13 changes: 7 additions & 6 deletions indexer/services/socks/src/lib/message-forwarder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ export class MessageForwarder {
}

public onMessage(topic: string, message: KafkaMessage): void {
const start: number = Date.now();
stats.timing(
`${config.SERVICE_NAME}.message_time_in_queue`,
Date.now() - Number(message.timestamp),
start - Number(message.timestamp),
config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE,
{
topic,
Expand Down Expand Up @@ -184,10 +185,10 @@ export class MessageForwarder {

if (subscriptions.length > 0) {
if (message.channel !== Channel.V4_ORDERBOOK ||
(
// Don't log orderbook messages unless enabled
message.channel === Channel.V4_ORDERBOOK && config.ENABLE_ORDERBOOK_LOGS
)
(
// Don't log orderbook messages unless enabled
message.channel === Channel.V4_ORDERBOOK && config.ENABLE_ORDERBOOK_LOGS
)
) {
logger.debug({
at: 'message-forwarder#forwardMessage',
Expand All @@ -200,7 +201,7 @@ export class MessageForwarder {

// Buffer messages if the subscription is for batched messages
if (this.subscriptions.batchedSubscriptions[message.channel] &&
this.subscriptions.batchedSubscriptions[message.channel][message.id]) {
this.subscriptions.batchedSubscriptions[message.channel][message.id]) {
const bufferKey: string = this.getMessageBufferKey(
message.channel,
message.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import { expectCanceledOrderStatus, expectOpenOrderIds, handleInitialOrderPlace
import { expectOffchainUpdateMessage, expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers';
import { OrderbookSide } from '../../src/lib/types';
import { getOrderIdHash, isLongTermOrder, isStatefulOrder } from '@dydxprotocol-indexer/v4-proto-parser';
import { defaultKafkaHeaders } from '../helpers/constants';
import config from '../../src/config';

jest.mock('@dydxprotocol-indexer/base', () => ({
Expand Down Expand Up @@ -196,6 +197,12 @@ describe('order-place-handler', () => {
const replacementMessageIoc: KafkaMessage = createKafkaMessage(
Buffer.from(Uint8Array.from(OffChainUpdateV1.encode(replacementUpdateIoc).finish())),
);
[replacementMessage, replacementMessageGoodTilBlockTime, replacementMessageConditional,
replacementMessageFok, replacementMessageIoc].forEach((message) => {
// eslint-disable-next-line no-param-reassign
message.headers = defaultKafkaHeaders;
});

const dbDefaultOrder: OrderFromDatabase = {
...testConstants.defaultOrder,
id: testConstants.defaultOrderId,
Expand Down Expand Up @@ -1225,7 +1232,11 @@ function expectWebsocketMessagesSent(
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
});

expectWebsocketSubaccountMessage(producerSendSpy.mock.calls[callIndex][0], subaccountMessage);
expectWebsocketSubaccountMessage(
producerSendSpy.mock.calls[callIndex][0],
subaccountMessage,
defaultKafkaHeaders,
);
callIndex += 1;
}

Expand Down
Loading

0 comments on commit a59d580

Please sign in to comment.