From 535d05c6c29e8094864bba533cb9192036ced1ac Mon Sep 17 00:00:00 2001 From: vincentwschau <99756290+vincentwschau@users.noreply.github.com> Date: Tue, 24 Oct 2023 13:18:26 -0400 Subject: [PATCH] [IND-402] Have vulcan re-queue cached order updates instead of ender. (#688) --- .../stateful-order-placement-handler.test.ts | 62 +------------- .../stateful-order-placement-handler.ts | 18 ----- .../handlers/order-place-handler.test.ts | 80 +++++++++++++++++-- .../__tests__/helpers/websocket-helpers.ts | 19 ++++- ...er.test.ts => send-message-helper.test.ts} | 44 ++++++---- indexer/services/vulcan/src/config.ts | 2 +- .../src/handlers/order-place-handler.ts | 74 +++++++++++++---- .../src/handlers/order-remove-handler.ts | 45 ++++++----- .../src/handlers/order-update-handler.ts | 17 ++-- indexer/services/vulcan/src/index.ts | 2 +- ...ocket-helper.ts => send-message-helper.ts} | 15 ++-- 11 files changed, 224 insertions(+), 154 deletions(-) rename indexer/services/vulcan/__tests__/lib/{send-websocket-helper.test.ts => send-message-helper.test.ts} (87%) rename indexer/services/vulcan/src/lib/{send-websocket-helper.ts => send-message-helper.ts} (86%) diff --git a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts index a0d79c6219..cea37d7762 100644 --- a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts @@ -19,7 +19,6 @@ import { IndexerOrder, OrderPlaceV1_OrderPlacementStatus, StatefulOrderEventV1, - OrderUpdateV1, } from '@dydxprotocol-indexer/v4-protos'; import { KafkaMessage } from 'kafkajs'; import { onMessage } from '../../../src/lib/on-message'; @@ -46,10 +45,6 @@ import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants'; import { producer } from '@dydxprotocol-indexer/kafka'; import { ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser'; import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions'; -import { - redis, redisTestConstants, StatefulOrderUpdateInfo, StatefulOrderUpdatesCache, -} from '@dydxprotocol-indexer/redis'; -import { redisClient } from '../../../src/helpers/redis/redis-controller'; describe('statefulOrderPlacementHandler', () => { beforeAll(async () => { @@ -69,7 +64,6 @@ describe('statefulOrderPlacementHandler', () => { afterEach(async () => { await dbHelpers.clearData(); - await redis.deleteAllAsync(redisClient); jest.clearAllMocks(); }); @@ -144,48 +138,15 @@ describe('statefulOrderPlacementHandler', () => { it.each([ // TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent - [ - 'stateful order placement and no cached update', - defaultStatefulOrderEvent, - undefined, - ], - [ - 'stateful long term order placement and no cached update', - defaultStatefulOrderLongTermEvent, - undefined, - ], - [ - 'stateful order placement and cached update', - defaultStatefulOrderEvent, - { - ...redisTestConstants.orderUpdate.orderUpdate, - orderId: defaultOrder.orderId, - }, - ], - [ - 'stateful long term order placement and cached update', - defaultStatefulOrderLongTermEvent, - { - ...redisTestConstants.orderUpdate.orderUpdate, - orderId: defaultOrder.orderId, - }, - ], + ['stateful order placement', defaultStatefulOrderEvent], + ['stateful long term order placement', defaultStatefulOrderLongTermEvent], ])('successfully places order with %s', async ( _name: string, statefulOrderEvent: StatefulOrderEventV1, - cachedOrderUpdate: OrderUpdateV1 | undefined, ) => { const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent( statefulOrderEvent, ); - if (cachedOrderUpdate !== undefined) { - await StatefulOrderUpdatesCache.addStatefulOrderUpdate( - orderId, - cachedOrderUpdate, - Date.now(), - redisClient, - ); - } await onMessage(kafkaMessage); const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId); @@ -224,25 +185,6 @@ describe('statefulOrderPlacementHandler', () => { orderId: defaultOrder.orderId!, offchainUpdate: expectedOffchainUpdate, }); - - // If there was a cached order update, expect the cache to be empty and a corresponding - // off-chain update to have been sent to the Kafka producer - if (cachedOrderUpdate !== undefined) { - const orderUpdates: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache - .getOldOrderUpdates( - Date.now(), - redisClient, - ); - expect(orderUpdates).toHaveLength(0); - - expectVulcanKafkaMessage({ - producerSendMock, - orderId: defaultOrder.orderId!, - offchainUpdate: { - orderUpdate: cachedOrderUpdate, - }, - }); - } }); it.each([ diff --git a/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts b/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts index 36994c6eeb..f2066ead71 100644 --- a/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts +++ b/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts @@ -6,17 +6,14 @@ import { perpetualMarketRefresher, OrderStatus, } from '@dydxprotocol-indexer/postgres'; -import { StatefulOrderUpdatesCache } from '@dydxprotocol-indexer/redis'; import { getOrderIdHash } from '@dydxprotocol-indexer/v4-proto-parser'; import { OrderPlaceV1_OrderPlacementStatus, OffChainUpdateV1, IndexerOrder, StatefulOrderEventV1, - OrderUpdateV1, } from '@dydxprotocol-indexer/v4-protos'; -import { redisClient } from '../../helpers/redis/redis-controller'; import { ConsolidatedKafkaEvent } from '../../lib/types'; import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler'; @@ -76,21 +73,6 @@ export class StatefulOrderPlacementHandler extends offChainUpdate, )); - const pendingOrderUpdate: OrderUpdateV1 | undefined = await StatefulOrderUpdatesCache - .removeStatefulOrderUpdate( - OrderTable.orderIdToUuid(order.orderId!), - Date.now(), - redisClient, - ); - if (pendingOrderUpdate !== undefined) { - kafakEvents.push(this.generateConsolidatedVulcanKafkaEvent( - getOrderIdHash(order.orderId!), - OffChainUpdateV1.fromPartial({ - orderUpdate: pendingOrderUpdate, - }), - )); - } - return kafakEvents; } } diff --git a/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts b/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts index 5db09edc8a..2010a3adb7 100644 --- a/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts +++ b/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts @@ -40,6 +40,7 @@ import { SubaccountOrderIdsCache, CanceledOrdersCache, updateOrder, + StatefulOrderUpdatesCache, } from '@dydxprotocol-indexer/redis'; import { @@ -50,6 +51,7 @@ import { RedisOrder, SubaccountId, SubaccountMessage, + OrderUpdateV1, } from '@dydxprotocol-indexer/v4-protos'; import { KafkaMessage } from 'kafkajs'; import Long from 'long'; @@ -57,14 +59,20 @@ import { convertToRedisOrder, getTriggerPrice } from '../../src/handlers/helpers import { redisClient, redisClient as client } from '../../src/helpers/redis/redis-controller'; import { onMessage } from '../../src/lib/on-message'; import { expectCanceledOrdersCacheEmpty, expectOpenOrderIds, handleInitialOrderPlace } from '../helpers/helpers'; -import { expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers'; +import { expectOffchainUpdateMessage, expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers'; import { OrderbookSide } from '../../src/lib/types'; +import { getOrderIdHash } from '@dydxprotocol-indexer/v4-proto-parser'; jest.mock('@dydxprotocol-indexer/base', () => ({ ...jest.requireActual('@dydxprotocol-indexer/base'), wrapBackgroundTask: jest.fn(), })); +interface OffchainUpdateRecord { + key: Buffer, + offchainUpdate: OffChainUpdateV1 +} + describe('order-place-handler', () => { beforeAll(() => { jest.useFakeTimers(); @@ -640,18 +648,42 @@ describe('order-place-handler', () => { it.each([ [ - 'good-til-block-time', + 'good-til-block-time and no cached update', redisTestConstants.defaultOrderGoodTilBlockTime, redisTestConstants.defaultRedisOrderGoodTilBlockTime, redisTestConstants.defaultOrderUuidGoodTilBlockTime, dbOrderGoodTilBlockTime, + undefined, ], [ - 'conditional', + 'conditional and no cached update', redisTestConstants.defaultConditionalOrder, redisTestConstants.defaultRedisOrderConditional, redisTestConstants.defaultOrderUuidConditional, dbConditionalOrder, + undefined, + ], + [ + 'good-til-block-time and cached update', + redisTestConstants.defaultOrderGoodTilBlockTime, + redisTestConstants.defaultRedisOrderGoodTilBlockTime, + redisTestConstants.defaultOrderUuidGoodTilBlockTime, + dbOrderGoodTilBlockTime, + { + ...redisTestConstants.orderUpdate.orderUpdate, + orderId: redisTestConstants.defaultOrderIdGoodTilBlockTime, + }, + ], + [ + 'conditional and cached update', + redisTestConstants.defaultConditionalOrder, + redisTestConstants.defaultRedisOrderConditional, + redisTestConstants.defaultOrderUuidConditional, + dbConditionalOrder, + { + ...redisTestConstants.orderUpdate.orderUpdate, + orderId: redisTestConstants.defaultOrderIdConditional, + }, ], ])('handles order place with OPEN placement status, does not exist initially (with %s)', async ( _name: string, @@ -659,9 +691,27 @@ describe('order-place-handler', () => { expectedRedisOrder: RedisOrder, expectedOrderUuid: string, placedOrder: OrderFromDatabase, + cachedOrderUpdate: OrderUpdateV1 | undefined, ) => { synchronizeWrapBackgroundTask(wrapBackgroundTask); const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); + + let expectedOffchainUpdate: OffchainUpdateRecord | undefined; + if (cachedOrderUpdate !== undefined) { + await StatefulOrderUpdatesCache.addStatefulOrderUpdate( + expectedOrderUuid, + cachedOrderUpdate, + Date.now(), + redisClient, + ); + expectedOffchainUpdate = { + key: getOrderIdHash(orderToPlace.orderId!), + offchainUpdate: { + orderUpdate: cachedOrderUpdate, + }, + }; + } + await handleInitialOrderPlace({ ...redisTestConstants.orderPlace, orderPlace: { @@ -670,6 +720,7 @@ describe('order-place-handler', () => { OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_OPENED, }, }); + expectWebsocketMessagesSent( producerSendSpy, expectedRedisOrder, @@ -678,7 +729,10 @@ describe('order-place-handler', () => { APIOrderStatusEnum.OPEN, // Subaccount message should be sent for stateful order if status is OPEN true, + undefined, + expectedOffchainUpdate, ); + await checkOrderPlace( expectedOrderUuid, redisTestConstants.defaultSubaccountUuid, @@ -962,6 +1016,7 @@ function expectWebsocketMessagesSent( placementStatus: APIOrderStatus, expectSubaccountMessage: boolean, expectedOrderbookMessage?: OrderbookMessage, + expectedOffchainUpdate?: OffchainUpdateRecord, ): void { jest.runOnlyPendingTimers(); // expect one subaccount update message being sent @@ -972,8 +1027,22 @@ function expectWebsocketMessagesSent( if (expectedOrderbookMessage !== undefined) { numMessages += 1; } + if (expectedOffchainUpdate !== undefined) { + numMessages += 1; + } expect(producerSendSpy).toHaveBeenCalledTimes(numMessages); + let callIndex: number = 0; + + if (expectedOffchainUpdate) { + expectOffchainUpdateMessage( + producerSendSpy.mock.calls[callIndex][0], + expectedOffchainUpdate.key, + expectedOffchainUpdate.offchainUpdate, + ); + callIndex += 1; + } + if (expectSubaccountMessage) { const orderTIF: TimeInForce = protocolTranslations.protocolOrderTIFToTIF( redisOrder.order!.timeInForce, @@ -1016,12 +1085,11 @@ function expectWebsocketMessagesSent( version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, }); - expectWebsocketSubaccountMessage(producerSendSpy.mock.calls[0][0], subaccountMessage); + expectWebsocketSubaccountMessage(producerSendSpy.mock.calls[callIndex][0], subaccountMessage); + callIndex += 1; } if (expectedOrderbookMessage !== undefined) { - // If there is no subaccount message, the orderbook message should be the first message - const callIndex: number = expectSubaccountMessage ? 1 : 0; expectWebsocketOrderbookMessage( producerSendSpy.mock.calls[callIndex][0], expectedOrderbookMessage, diff --git a/indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts b/indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts index 1148a84ec1..79a2eee145 100644 --- a/indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts +++ b/indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts @@ -1,5 +1,5 @@ import { KafkaTopics } from '@dydxprotocol-indexer/kafka'; -import { OrderbookMessage, SubaccountMessage } from '@dydxprotocol-indexer/v4-protos'; +import { OffChainUpdateV1, OrderbookMessage, SubaccountMessage } from '@dydxprotocol-indexer/v4-protos'; import { ProducerRecord } from 'kafkajs'; export function expectWebsocketSubaccountMessage( @@ -29,3 +29,20 @@ export function expectWebsocketOrderbookMessage( ); expect(orderbookMessage).toEqual(expectedOrderbookMessage); } + +export function expectOffchainUpdateMessage( + offchainUpdateProducerRecord: ProducerRecord, + expectedKey: Buffer, + expectedOffchainUpdate: OffChainUpdateV1, +): void { + expect(offchainUpdateProducerRecord.topic).toEqual(KafkaTopics.TO_VULCAN); + const offchainUpdateMessageValueBinary: Uint8Array = new Uint8Array( + offchainUpdateProducerRecord.messages[0].value as Buffer, + ); + const key: Buffer = offchainUpdateProducerRecord.messages[0].key as Buffer; + const offchainUpdate: OffChainUpdateV1 = OffChainUpdateV1.decode( + offchainUpdateMessageValueBinary, + ); + expect(offchainUpdate).toEqual(expectedOffchainUpdate); + expect(key).toEqual(expectedKey); +} diff --git a/indexer/services/vulcan/__tests__/lib/send-websocket-helper.test.ts b/indexer/services/vulcan/__tests__/lib/send-message-helper.test.ts similarity index 87% rename from indexer/services/vulcan/__tests__/lib/send-websocket-helper.test.ts rename to indexer/services/vulcan/__tests__/lib/send-message-helper.test.ts index 2e1cdcd359..8979ffdcc1 100644 --- a/indexer/services/vulcan/__tests__/lib/send-websocket-helper.test.ts +++ b/indexer/services/vulcan/__tests__/lib/send-message-helper.test.ts @@ -4,8 +4,8 @@ import { import { synchronizeWrapBackgroundTask } from '@dydxprotocol-indexer/dev'; import { producer, WebsocketTopics } from '@dydxprotocol-indexer/kafka'; import { - flushAllQueues, sendWebsocketWrapper, sizeStat, timingStat, -} from '../../src/lib/send-websocket-helper'; + flushAllQueues, sendMessageWrapper, sizeStat, timingStat, +} from '../../src/lib/send-message-helper'; import config from '../../src/config'; import { Message, ProducerRecord } from 'kafkajs'; @@ -14,7 +14,7 @@ jest.mock('@dydxprotocol-indexer/base', () => ({ wrapBackgroundTask: jest.fn(), })); -describe('send-websocket-helper', () => { +describe('send-message-helper', () => { let producerSendSpy: jest.SpyInstance; let logErrorSpy: jest.SpyInstance; let statsTimingSpy: jest.SpyInstance; @@ -44,15 +44,17 @@ describe('send-websocket-helper', () => { it('sends messages for all message queues', async () => { const expectedMessagesSent: {[topic: string]: ProducerRecord} = {}; Object.values(WebsocketTopics).forEach((topic: string) => { - const messages: Buffer[] = []; + const messages: Message[] = []; for (let i: number = 0; i < config.MAX_WEBSOCKET_MESSAGES_TO_QUEUE_PER_TOPIC; i++) { - const message: Buffer = Buffer.from(i.toString()); - sendWebsocketWrapper(message, topic); + const message: Message = { + value: Buffer.from(i.toString()), + }; + sendMessageWrapper(message, topic); messages.push(message); } expectedMessagesSent[topic] = { topic, - messages: messages.map((message: Buffer): Message => { return { value: message }; }), + messages, }; }); @@ -76,20 +78,24 @@ describe('send-websocket-helper', () => { describe('sendWebsocketWrapper', () => { it('sends messages for a topic on an interval in batches', async () => { - const messageVal1: Buffer = Buffer.from('some message'); - const messageVal2: Buffer = Buffer.from('another message'); + const messageVal1: Message = { + value: Buffer.from('some message'), + }; + const messageVal2: Message = { + value: Buffer.from('another message'), + }; const topic: string = 'some-topic'; const expectedMessage: ProducerRecord = { topic, - messages: [{ value: messageVal1 }, { value: messageVal2 }], + messages: [messageVal1, messageVal2], }; - sendWebsocketWrapper(messageVal1, topic); + sendMessageWrapper(messageVal1, topic); // No messages should be sent if no timers have been run expect(producerSendSpy).not.toHaveBeenCalled(); - sendWebsocketWrapper(messageVal2, topic); + sendMessageWrapper(messageVal2, topic); // No messages should be sent if no timers have been run expect(producerSendSpy).not.toHaveBeenCalled(); @@ -187,9 +193,11 @@ describe('send-websocket-helper', () => { it('respects SEND_WEBSOCKET_MESSAGES flag', () => { config.SEND_WEBSOCKET_MESSAGES = false; - const messageVal1: Buffer = Buffer.from('some message'); + const messageVal1: Message = { + value: Buffer.from('some message'), + }; const topic: string = 'some-topic'; - sendWebsocketWrapper(messageVal1, topic); + sendMessageWrapper(messageVal1, topic); jest.runOnlyPendingTimers(); // Both messages should be sent in one batch @@ -201,9 +209,11 @@ describe('send-websocket-helper', () => { function sendMessagesForTest(numMessages: number, topic: string): ProducerRecord { const expectedMessage: ProducerRecord = { topic, messages: [] }; for (let i: number = 0; i < numMessages; i++) { - const messageVal: Buffer = Buffer.from(i.toString()); - sendWebsocketWrapper(messageVal, topic); - expectedMessage.messages.push({ value: messageVal }); + const messageVal: Message = { + value: Buffer.from(i.toString()), + }; + sendMessageWrapper(messageVal, topic); + expectedMessage.messages.push(messageVal); } return expectedMessage; } diff --git a/indexer/services/vulcan/src/config.ts b/indexer/services/vulcan/src/config.ts index 3d029ef527..82e21bc18b 100644 --- a/indexer/services/vulcan/src/config.ts +++ b/indexer/services/vulcan/src/config.ts @@ -20,7 +20,7 @@ export const configSchema = { ...kafkaConfigSchema, ...redisConfigSchema, - FLUSH_WEBSOCKET_MESSAGES_INTERVAL_MS: parseNumber({ + FLUSH_KAFKA_MESSAGES_INTERVAL_MS: parseNumber({ default: 10, }), MAX_WEBSOCKET_MESSAGES_TO_QUEUE_PER_TOPIC: parseNumber({ diff --git a/indexer/services/vulcan/src/handlers/order-place-handler.ts b/indexer/services/vulcan/src/handlers/order-place-handler.ts index 05a686e3f3..6f9ba098a2 100644 --- a/indexer/services/vulcan/src/handlers/order-place-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-place-handler.ts @@ -23,21 +23,24 @@ import { PlaceOrderResult, placeOrder, CanceledOrdersCache, + StatefulOrderUpdatesCache, } from '@dydxprotocol-indexer/redis'; -import { ORDER_FLAG_SHORT_TERM } from '@dydxprotocol-indexer/v4-proto-parser'; +import { ORDER_FLAG_SHORT_TERM, getOrderIdHash, isStatefulOrder } from '@dydxprotocol-indexer/v4-proto-parser'; import { OffChainUpdateV1, IndexerOrder, OrderPlaceV1, OrderPlaceV1_OrderPlacementStatus, + OrderUpdateV1, RedisOrder, SubaccountMessage, } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; +import { Message } from 'kafkajs'; import config from '../config'; import { redisClient } from '../helpers/redis/redis-controller'; -import { sendWebsocketWrapper } from '../lib/send-websocket-helper'; +import { sendMessageWrapper } from '../lib/send-message-helper'; import { Handler } from './handler'; import { convertToRedisOrder, getTriggerPrice } from './helpers'; @@ -47,6 +50,8 @@ import { convertToRedisOrder, getTriggerPrice } from './helpers'; * - Add the order to the OrdersCache, OrdersDataCache, and SubaccountOrderIdsCache * - this is done using the `placeOrder` function from the `redis` package * - Remove the order from the CanceledOrdersCache if it exists + * - If the order is a stateful order, attempt to remove any cached order update from the + * StatefulOrderUpdatesCache, and then queue the order update to be re-sent and re-processed * - If the order doesn't already exist in the caches, return * - If the order exists in the caches, but was not replaced due to the expiry of the existing order * being greater than or equal to the expiry of the order in the OrderPlace message, return @@ -123,9 +128,10 @@ export class OrderPlaceHandler extends Handler { if (this.shouldSendSubaccountMessage(update.orderPlace!)) { // TODO(IND-171): Determine whether we should always be sending a message, even when the cache // isn't updated. - // for stateful and conditional orders, look the order up in the db for the createdAtHeight + // For stateful and conditional orders, look the order up in the db for the createdAtHeight + // and send any cached order updates for the stateful or conditional order let dbOrder: OrderFromDatabase | undefined; - if (redisOrder.order!.goodTilBlockTime !== undefined) { + if (isStatefulOrder(redisOrder.order!.orderId!.orderFlags)) { const orderUuid: string = OrderTable.orderIdToUuid(redisOrder.order!.orderId!); dbOrder = await OrderTable.findById(orderUuid); if (dbOrder === undefined) { @@ -135,25 +141,30 @@ export class OrderPlaceHandler extends Handler { }); throw new Error(`Stateful order not found in database: ${orderUuid}`); } + await this.sendCachedOrderUpdate(orderUuid); } - const subaccountMessage: Buffer = this.createSubaccountWebsocketMessage( - redisOrder, - dbOrder, - perpetualMarket, - placementStatus, - ); - sendWebsocketWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); + const subaccountMessage: Message = { + value: this.createSubaccountWebsocketMessage( + redisOrder, + dbOrder, + perpetualMarket, + placementStatus, + ), + }; + sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); } // TODO(IND-68): Remove once order replacement flow in V4 protocol removes the old order and // places the updated order. if (updatedQuantums !== undefined) { - const orderbookMessage: Buffer = this.createOrderbookWebsocketMessage( - placeOrderResult.oldOrder!, - perpetualMarket, - updatedQuantums, - ); - sendWebsocketWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS); + const orderbookMessage: Message = { + value: this.createOrderbookWebsocketMessage( + placeOrderResult.oldOrder!, + perpetualMarket, + updatedQuantums, + ), + }; + sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS); } } @@ -342,4 +353,33 @@ export class OrderPlaceHandler extends Handler { this.generateTimingStatsOptions('remove_order_from_cancel_cache'), ); } + + /** + * Removes and sends the cached order update for the given order id if it exists. + * + * @param orderId + * @returns + */ + protected async sendCachedOrderUpdate( + orderId: string, + ): Promise { + const cachedOrderUpdate: OrderUpdateV1 | undefined = await StatefulOrderUpdatesCache + .removeStatefulOrderUpdate( + orderId, + Date.now(), + redisClient, + ); + + if (cachedOrderUpdate === undefined) { + return; + } + + const orderUpdateMessage: Message = { + key: getOrderIdHash(cachedOrderUpdate.orderId!), + value: Buffer.from( + Uint8Array.from(OffChainUpdateV1.encode({ orderUpdate: cachedOrderUpdate }).finish()), + ), + }; + sendMessageWrapper(orderUpdateMessage, KafkaTopics.TO_VULCAN); + } } diff --git a/indexer/services/vulcan/src/handlers/order-remove-handler.ts b/indexer/services/vulcan/src/handlers/order-remove-handler.ts index af41a58e46..9bcd27dd53 100644 --- a/indexer/services/vulcan/src/handlers/order-remove-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-remove-handler.ts @@ -33,10 +33,11 @@ import { SubaccountMessage, } from '@dydxprotocol-indexer/v4-protos'; import { Big } from 'big.js'; +import { Message } from 'kafkajs'; import config from '../config'; import { redisClient } from '../helpers/redis/redis-controller'; -import { sendWebsocketWrapper } from '../lib/send-websocket-helper'; +import { sendMessageWrapper } from '../lib/send-message-helper'; import { Handler } from './handler'; import { getTriggerPrice } from './helpers'; @@ -218,12 +219,14 @@ export class OrderRemoveHandler extends Handler { return; } - const subaccountMessage: Buffer = this.createSubaccountWebsocketMessageFromPostgresOrder( - order, - orderRemove, - perpetualMarket.ticker, - ); - sendWebsocketWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); + const subaccountMessage: Message = { + value: this.createSubaccountWebsocketMessageFromPostgresOrder( + order, + orderRemove, + perpetualMarket.ticker, + ), + }; + sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); // If an order was removed from the Orders cache and was resting on the book, update the // orderbook levels cache @@ -273,15 +276,17 @@ export class OrderRemoveHandler extends Handler { this.generateTimingStatsOptions('cancel_order_in_postgres'), ); - const subaccountMessage: Buffer = this.createSubaccountWebsocketMessageFromRemoveOrderResult( - removeOrderResult, - orderRemove, - perpetualMarket, - ); + const subaccountMessage: Message = { + value: this.createSubaccountWebsocketMessageFromRemoveOrderResult( + removeOrderResult, + orderRemove, + perpetualMarket, + ), + }; // TODO(IND-147): Remove this check once fully-filled orders are removed by ender if (this.shouldSendSubaccountMessage(orderRemove, removeOrderResult)) { - sendWebsocketWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); + sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); } const remainingQuantums: Big = Big(this.getSizeDeltaInQuantums( @@ -315,12 +320,14 @@ export class OrderRemoveHandler extends Handler { ), this.generateTimingStatsOptions('update_price_level_cache'), ); - const orderbookMessage: Buffer = this.createOrderbookWebsocketMessage( - removeOrderResult.removedOrder!, - perpetualMarket, - updatedQuantums, - ); - sendWebsocketWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS); + const orderbookMessage: Message = { + value: this.createOrderbookWebsocketMessage( + removeOrderResult.removedOrder!, + perpetualMarket, + updatedQuantums, + ), + }; + sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS); } /** diff --git a/indexer/services/vulcan/src/handlers/order-update-handler.ts b/indexer/services/vulcan/src/handlers/order-update-handler.ts index 58a66254d1..f36eed7123 100644 --- a/indexer/services/vulcan/src/handlers/order-update-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-update-handler.ts @@ -24,10 +24,11 @@ import { RedisOrder, } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; +import { Message } from 'kafkajs'; import config from '../config'; import { redisClient } from '../helpers/redis/redis-controller'; -import { sendWebsocketWrapper } from '../lib/send-websocket-helper'; +import { sendMessageWrapper } from '../lib/send-message-helper'; import { Handler } from './handler'; /** @@ -157,12 +158,14 @@ export class OrderUpdateHandler extends Handler { return; } - const orderbookMessage: Buffer = this.createOrderbookWebsocketMessage( - updateResult.order!, - perpetualMarket, - updatedQuantums, - ); - sendWebsocketWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS); + const orderbookMessage: Message = { + value: this.createOrderbookWebsocketMessage( + updateResult.order!, + perpetualMarket, + updatedQuantums, + ), + }; + sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS); } /** diff --git a/indexer/services/vulcan/src/index.ts b/indexer/services/vulcan/src/index.ts index 0ff9e9076b..72d0472d93 100644 --- a/indexer/services/vulcan/src/index.ts +++ b/indexer/services/vulcan/src/index.ts @@ -8,7 +8,7 @@ import { connect as connectToRedis, redisClient, } from './helpers/redis/redis-controller'; -import { flushAllQueues } from './lib/send-websocket-helper'; +import { flushAllQueues } from './lib/send-message-helper'; async function startService(): Promise { logger.info({ diff --git a/indexer/services/vulcan/src/lib/send-websocket-helper.ts b/indexer/services/vulcan/src/lib/send-message-helper.ts similarity index 86% rename from indexer/services/vulcan/src/lib/send-websocket-helper.ts rename to indexer/services/vulcan/src/lib/send-message-helper.ts index ab5e297911..cf0d05307c 100644 --- a/indexer/services/vulcan/src/lib/send-websocket-helper.ts +++ b/indexer/services/vulcan/src/lib/send-message-helper.ts @@ -2,10 +2,11 @@ import { logger, stats, STATS_NO_SAMPLING, wrapBackgroundTask, } from '@dydxprotocol-indexer/base'; import { producer } from '@dydxprotocol-indexer/kafka'; +import { Message } from 'kafkajs'; import config from '../config'; -const queuedMessages: {[topic: string]: Buffer[]} = {}; +const queuedMessages: {[topic: string]: Message[]} = {}; const timeouts: {[topic: string]: NodeJS.Timeout } = {}; export const sizeStat: string = `${config.SERVICE_NAME}.flush_websocket.size`; export const timingStat: string = `${config.SERVICE_NAME}.flush_websocket.timing`; @@ -25,12 +26,12 @@ export async function flushAllQueues(): Promise { } /** - * Wrapper to send websocket messages to a Kafka topic. Mesages are batched and sent on an interval + * Wrapper to send messages to a Kafka topic. Mesages are batched and sent on an interval * or when the batch reaches a configurable maximum size. * @param message * @param topic */ -export function sendWebsocketWrapper(message: Buffer, topic: string): void { +export function sendMessageWrapper(message: Message, topic: string): void { if (queuedMessages[topic] === undefined) { queuedMessages[topic] = []; } @@ -47,7 +48,7 @@ export function sendWebsocketWrapper(message: Buffer, topic: string): void { timeouts[topic] = setTimeout(() => { wrapBackgroundTask(sendMessages(topic), true, sendMessagesTaskname); }, - config.FLUSH_WEBSOCKET_MESSAGES_INTERVAL_MS, + config.FLUSH_KAFKA_MESSAGES_INTERVAL_MS, ); } } @@ -74,7 +75,7 @@ function shouldFlush(topic: string): boolean { async function sendMessages(topic: string): Promise { delete timeouts[topic]; - const messages: Buffer[] = queuedMessages[topic]; + const messages: Message[] = queuedMessages[topic]; if (messages === undefined || messages.length === 0) { stats.histogram(sizeStat, 0, STATS_NO_SAMPLING, { topic, success: 'true' }); return; @@ -88,7 +89,7 @@ async function sendMessages(topic: string): Promise { try { await producer.send({ topic, - messages: messages.map((message: Buffer) => { return { value: message }; }), + messages, }); success = true; } catch (error) { @@ -101,7 +102,7 @@ async function sendMessages(topic: string): Promise { }); // Re-enqueue all messages if they failed to be sent - messages.forEach((message: Buffer) => sendWebsocketWrapper(message, topic)); + messages.forEach((message: Message) => sendMessageWrapper(message, topic)); } finally { const tags: {[name: string]: string} = { topic,