Skip to content

Commit

Permalink
decode message in socks only once (backport #1646) (#1657)
Browse files Browse the repository at this point in the history
Co-authored-by: shrenujb <98204323+shrenujb@users.noreply.github.com>
Co-authored-by: Christopher Li <Christopherfooli@gmail.com>
  • Loading branch information
3 people authored Jun 10, 2024
1 parent 37b4368 commit 444676f
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 26 deletions.
71 changes: 68 additions & 3 deletions indexer/services/socks/src/helpers/from-kafka-helpers.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
parentSubaccountHelpers,
perpetualMarketRefresher,
PROTO_TO_CANDLE_RESOLUTION,
parentSubaccountHelpers,
SubaccountMessageContents,
} from '@dydxprotocol-indexer/postgres';
import { getParentSubaccountNum } from '@dydxprotocol-indexer/postgres/build/src/lib/parent-subaccount-helpers';
import {
CandleMessage,
CandleMessage_Resolution,
MarketMessage,
OrderbookMessage,
TradeMessage,
SubaccountMessage,
CandleMessage_Resolution,
TradeMessage,
} from '@dydxprotocol-indexer/v4-protos';
import { KafkaMessage } from 'kafkajs';

Expand All @@ -30,6 +30,7 @@ export function getChannels(topic: string): Channel[] {
return TOPIC_TO_CHANNEL[topicEnum];
}

// TODO: remove this function and fix all tests to use getMessagesToForward instead
export function getMessageToForward(
channel: Channel,
message: KafkaMessage,
Expand Down Expand Up @@ -103,6 +104,70 @@ export function getMessageToForward(
}
}

export function getMessagesToForward(topic: string, message: KafkaMessage): MessageToForward[] {
if (!message || !message.value) {
throw new InvalidForwardMessageError('Got empty kafka message');
}
const messageBinary: Uint8Array = new Uint8Array(message.value);

switch (topic) {
case WebsocketTopics.TO_WEBSOCKETS_CANDLES: {
const candleMessage: CandleMessage = CandleMessage.decode(messageBinary);
return [{
channel: Channel.V4_CANDLES,
id: getCandleMessageId(candleMessage),
contents: JSON.parse(candleMessage.contents),
version: candleMessage.version,
}];
}
case WebsocketTopics.TO_WEBSOCKETS_MARKETS: {
const marketMessage: MarketMessage = MarketMessage.decode(messageBinary);
return [{
channel: Channel.V4_MARKETS,
id: V4_MARKETS_ID,
contents: JSON.parse(marketMessage.contents),
version: marketMessage.version,
}];
}
case WebsocketTopics.TO_WEBSOCKETS_ORDERBOOKS: {
const orderbookMessage: OrderbookMessage = OrderbookMessage.decode(messageBinary);
return [{
channel: Channel.V4_ORDERBOOK,
id: getTickerOrThrow(orderbookMessage.clobPairId),
contents: JSON.parse(orderbookMessage.contents),
version: orderbookMessage.version,
}];
}
case WebsocketTopics.TO_WEBSOCKETS_TRADES: {
const tradeMessage: TradeMessage = TradeMessage.decode(messageBinary);
return [{
channel: Channel.V4_TRADES,
id: getTickerOrThrow(tradeMessage.clobPairId),
contents: JSON.parse(tradeMessage.contents),
version: tradeMessage.version,
}];
}
case WebsocketTopics.TO_WEBSOCKETS_SUBACCOUNTS: {
const subaccountMessage: SubaccountMessage = SubaccountMessage.decode(messageBinary);
return [{
channel: Channel.V4_ACCOUNTS,
id: getSubaccountMessageId(subaccountMessage),
contents: JSON.parse(subaccountMessage.contents),
version: subaccountMessage.version,
},
{
channel: Channel.V4_PARENT_ACCOUNTS,
id: getParentSubaccountMessageId(subaccountMessage),
subaccountNumber: subaccountMessage.subaccountId!.number,
contents: getParentSubaccountContents(subaccountMessage),
version: subaccountMessage.version,
}];
}
default:
throw new InvalidForwardMessageError(`Unknown topic: ${topic}`);
}
}

function getTickerOrThrow(clobPairId: string): string {
const ticker: string | undefined = perpetualMarketRefresher.getPerpetualMarketTicker(clobPairId);
if (ticker === undefined) {
Expand Down
46 changes: 23 additions & 23 deletions indexer/services/socks/src/lib/message-forwarder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,13 @@ export class MessageForwarder {

if (!this.subscriptions.subscriptions[message.channel] &&
!this.subscriptions.batchedSubscriptions[message.channel]) {
// logger.debug({
// at: 'message-forwarder#forwardMessage',
// message: 'No clients to forward to',
// messageId: message.id,
// messageChannel: message.channel,
// contents: message.contents,
// });
logger.debug({
at: 'message-forwarder#forwardMessage',
message: 'No clients to forward to',
messageId: message.id,
messageChannel: message.channel,
contents: message.contents,
});
return;
}

Expand All @@ -182,25 +182,25 @@ export class MessageForwarder {
}
let forwardedToSubscribers: boolean = false;

// if (subscriptions.length > 0) {
// if (message.channel !== Channel.V4_ORDERBOOK ||
// (
// // Don't log orderbook messages unless enabled
// message.channel === Channel.V4_ORDERBOOK && config.ENABLE_ORDERBOOK_LOGS
// )
// ) {
// logger.debug({
// at: 'message-forwarder#forwardMessage',
// message: 'Forwarding message to clients..',
// messageContents: message,
// connectionIds: subscriptions.map((s: SubscriptionInfo) => s.connectionId),
// });
// }
// }
if (subscriptions.length > 0) {
if (message.channel !== Channel.V4_ORDERBOOK ||
(
// Don't log orderbook messages unless enabled
message.channel === Channel.V4_ORDERBOOK && config.ENABLE_ORDERBOOK_LOGS
)
) {
logger.debug({
at: 'message-forwarder#forwardMessage',
message: 'Forwarding message to clients..',
messageContents: message,
connectionIds: subscriptions.map((s: SubscriptionInfo) => s.connectionId),
});
}
}

// Buffer messages if the subscription is for batched messages
if (this.subscriptions.batchedSubscriptions[message.channel] &&
this.subscriptions.batchedSubscriptions[message.channel][message.id]) {
this.subscriptions.batchedSubscriptions[message.channel][message.id]) {
const bufferKey: string = this.getMessageBufferKey(
message.channel,
message.id,
Expand Down

0 comments on commit 444676f

Please sign in to comment.