Skip to content

Commit

Permalink
[IND-288]: Remove unused socks fields and add candle resolutions for …
Browse files Browse the repository at this point in the history
…websockets (#633)
  • Loading branch information
Christopher-Li authored Oct 16, 2023
1 parent a5a391d commit 4906fc4
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 74 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { CandleResolution } from './candle-types';
import { FillType, Liquidity } from './fill-types';
import {
OrderSide,
Expand Down Expand Up @@ -226,8 +227,8 @@ export interface OraclePriceMarket {

/* ------- CandleMessageContents ------- */

// Does not include resolution, because resolution is included in CandleMessage
export interface CandleMessageContents {
resolution: CandleResolution,
startedAt: IsoString,
ticker: string,
low: string,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ function verifyAllCandlesEqualsKafkaMessages(
Partial<Record<CandleResolution, CandleMessageContents>> = _.chain(expectedCandles)
.keyBy(CandleColumns.resolution)
.mapValues((candle: CandleFromDatabase) => {
return _.omit(candle, [CandleColumns.id, CandleColumns.resolution]);
return _.omit(candle, [CandleColumns.id]);
})
.value();

Expand Down Expand Up @@ -516,7 +516,7 @@ function verifyContainsCandlesKafkaMessages(

_.forEach(expectedCandles, (expectedCandle: CandleFromDatabase) => {
expect(
_.omit(expectedCandle, [CandleColumns.id, CandleColumns.resolution]),
_.omit(expectedCandle, [CandleColumns.id]),
).toEqual(resolutionToContent[expectedCandle.resolution]);
});
}
Expand Down
2 changes: 1 addition & 1 deletion indexer/services/ender/src/lib/candles-generator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ export class CandlesGenerator {
_.forEach(candles, (candle: CandleFromDatabase) => {
const candleMessageContents: CandleMessageContents = _.omit(
candle,
[CandleColumns.id, CandleColumns.resolution],
[CandleColumns.id],
);
const message: CandleMessage = {
contents: JSON.stringify(candleMessageContents),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import { getChannel, getMessageToForward } from '../../src/helpers/from-kafka-helpers';
import { InvalidForwardMessageError, InvalidTopicError } from '../../src/lib/errors';
import {
CandlesChannelMessageToForward,
Channel,
MessageToForward,
OrderbooksChannelMessageToForward,
SubaccountsChannelMessageToForward,
TradesChannelMessageToForward,
WebsocketTopics,
} from '../../src/types';
import {
Expand All @@ -26,7 +22,7 @@ import {
import { KafkaMessage } from 'kafkajs';
import { createKafkaMessage } from './kafka';
import {
CandleMessage, CandleMessage_Resolution,
CandleMessage,
MarketMessage,
OrderbookMessage,
SubaccountMessage,
Expand All @@ -37,7 +33,6 @@ import {
dbHelpers,
testMocks,
perpetualMarketRefresher,
PROTO_TO_CANDLE_RESOLUTION,
CandleResolution,
} from '@dydxprotocol-indexer/postgres';

Expand Down Expand Up @@ -74,37 +69,28 @@ describe('from-kafka-helpers', () => {
const message: KafkaMessage = createKafkaMessage(
Buffer.from(Uint8Array.from(SubaccountMessage.encode(subaccountMessage).finish())),
);
const messageToForward: SubaccountsChannelMessageToForward = getMessageToForward(
const messageToForward: MessageToForward = getMessageToForward(
Channel.V4_ACCOUNTS,
message,
) as SubaccountsChannelMessageToForward;
);

expect(messageToForward.channel).toEqual(Channel.V4_ACCOUNTS);
expect(messageToForward.id).toEqual(`${defaultOwner}/${defaultAccNumber}`);
expect(messageToForward.contents).toEqual(defaultContents);
expect(messageToForward.blockHeight).toEqual(subaccountMessage.blockHeight);
expect(messageToForward.transactionIndex).toEqual(subaccountMessage.transactionIndex);
expect(messageToForward.eventIndex).toEqual(subaccountMessage.eventIndex);
});

it('gets correct MessageToForward for candles message', () => {
const message: KafkaMessage = createKafkaMessage(
Buffer.from(Uint8Array.from(CandleMessage.encode(candlesMessage).finish())),
);
const messageToForward: CandlesChannelMessageToForward = getMessageToForward(
const messageToForward: MessageToForward = getMessageToForward(
Channel.V4_CANDLES,
message,
) as CandlesChannelMessageToForward;
);

expect(messageToForward.channel).toEqual(Channel.V4_CANDLES);
expect(messageToForward.id).toEqual(`${btcTicker}/${CandleResolution.ONE_MINUTE}`);
expect(messageToForward.contents).toEqual(defaultContents);
expect(messageToForward.clobPairId).toEqual(candlesMessage.clobPairId);
if (candlesMessage.resolution !== CandleMessage_Resolution.UNRECOGNIZED) {
expect(messageToForward.resolution).toEqual(
PROTO_TO_CANDLE_RESOLUTION[candlesMessage.resolution],
);
}
});

it('gets correct MessageToForward for market message', () => {
Expand All @@ -122,30 +108,28 @@ describe('from-kafka-helpers', () => {
const message: KafkaMessage = createKafkaMessage(
Buffer.from(Uint8Array.from(OrderbookMessage.encode(orderbookMessage).finish())),
);
const messageToForward: OrderbooksChannelMessageToForward = getMessageToForward(
const messageToForward: MessageToForward = getMessageToForward(
Channel.V4_ORDERBOOK,
message,
) as OrderbooksChannelMessageToForward;
);

expect(messageToForward.channel).toEqual(Channel.V4_ORDERBOOK);
expect(messageToForward.id).toEqual(btcTicker);
expect(messageToForward.contents).toEqual(defaultContents);
expect(messageToForward.clobPairId).toEqual(orderbookMessage.clobPairId);
});

it('gets correct MessageToForward for trade message', () => {
const message: KafkaMessage = createKafkaMessage(
Buffer.from(Uint8Array.from(TradeMessage.encode(tradesMessage).finish())),
);
const messageToForward: TradesChannelMessageToForward = getMessageToForward(
const messageToForward: MessageToForward = getMessageToForward(
Channel.V4_TRADES,
message,
) as TradesChannelMessageToForward;
);

expect(messageToForward.channel).toEqual(Channel.V4_TRADES);
expect(messageToForward.id).toEqual(btcTicker);
expect(messageToForward.contents).toEqual(defaultContents);
expect(messageToForward.blockHeight).toEqual(tradesMessage.blockHeight);
});

it('throws InvalidForwardMessageError for empty message', () => {
Expand Down
8 changes: 0 additions & 8 deletions indexer/services/socks/src/helpers/from-kafka-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ export function getMessageToForward(
channel,
id: getSubaccountMessageId(subaccountMessage),
contents: JSON.parse(subaccountMessage.contents),
blockHeight: subaccountMessage.blockHeight,
transactionIndex: subaccountMessage.transactionIndex,
eventIndex: subaccountMessage.eventIndex,
version: subaccountMessage.version,
};
}
Expand All @@ -57,8 +54,6 @@ export function getMessageToForward(
channel,
id: getCandleMessageId(candleMessage),
contents: JSON.parse(candleMessage.contents),
clobPairId: candleMessage.clobPairId,
resolution: PROTO_TO_CANDLE_RESOLUTION[candleMessage.resolution],
version: candleMessage.version,
};
}
Expand All @@ -77,7 +72,6 @@ export function getMessageToForward(
channel,
id: getTickerOrThrow(orderbookMessage.clobPairId),
contents: JSON.parse(orderbookMessage.contents),
clobPairId: orderbookMessage.clobPairId,
version: orderbookMessage.version,
};
}
Expand All @@ -87,8 +81,6 @@ export function getMessageToForward(
channel,
id: getTickerOrThrow(tradeMessage.clobPairId),
contents: JSON.parse(tradeMessage.contents),
blockHeight: tradeMessage.blockHeight,
clobPairId: tradeMessage.clobPairId,
version: tradeMessage.version,
};
}
Expand Down
38 changes: 1 addition & 37 deletions indexer/services/socks/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { IncomingMessage as IncomingMessageHttp } from 'http';

import { CandleResolution } from '@dydxprotocol-indexer/postgres';
import express from 'express';
import WebSocket from 'ws';

Expand Down Expand Up @@ -122,49 +121,14 @@ export interface Connection {
disconnect?: NodeJS.Timeout;
}

export interface MessageToForwardBase {
export interface MessageToForward {
channel: Channel;
id: string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
contents: any;
version: string;
}

export type OrderbooksChannelMessageToForward = MessageToForwardBase & {
channel: Channel.V4_ORDERBOOK;
clobPairId: string;
};

export type SubaccountsChannelMessageToForward = MessageToForwardBase & {
channel: Channel.V4_ACCOUNTS;
blockHeight: string;
transactionIndex: number;
eventIndex: number;
subaccountId?: string;
};

export type TradesChannelMessageToForward = MessageToForwardBase & {
channel: Channel.V4_TRADES;
blockHeight: string;
clobPairId: string;
};

export type MarketsChannelMessageToForward = MessageToForwardBase & {
channel: Channel.V4_MARKETS;
};

export type CandlesChannelMessageToForward = MessageToForwardBase & {
channel: Channel.V4_CANDLES;
clobPairId: string;
resolution: CandleResolution;
};

export type MessageToForward = OrderbooksChannelMessageToForward
| SubaccountsChannelMessageToForward
| TradesChannelMessageToForward
| MarketsChannelMessageToForward
| CandlesChannelMessageToForward;

export interface ResponseWithBody extends express.Response {
body: unknown
}
Expand Down

0 comments on commit 4906fc4

Please sign in to comment.