From f7b673e6519a91fa0614d1047149ed999bd626a5 Mon Sep 17 00:00:00 2001 From: dydxwill <119354122+dydxwill@users.noreply.github.com> Date: Tue, 21 May 2024 10:23:31 -0400 Subject: [PATCH] [CT-846] Send subaccount websocket message when cancel is received for non-existent order (#1540) (cherry picked from commit 8426d41d683c37421605b722ebb3767af445fb12) --- indexer/packages/kafka/src/constants.ts | 2 +- .../src/types/websocket-message-types.ts | 20 +-- .../comlink/public/websocket-documentation.md | 24 ++-- .../handlers/order-remove-handler.test.ts | 108 +++++++++++++++- indexer/services/vulcan/src/config.ts | 3 + .../src/handlers/order-remove-handler.ts | 116 ++++++++++++++++-- 6 files changed, 238 insertions(+), 35 deletions(-) diff --git a/indexer/packages/kafka/src/constants.ts b/indexer/packages/kafka/src/constants.ts index 9f28e5d29e..9a02cb1b3e 100644 --- a/indexer/packages/kafka/src/constants.ts +++ b/indexer/packages/kafka/src/constants.ts @@ -1,7 +1,7 @@ export const TO_ENDER_TOPIC: string = 'to-ender'; export const ORDERBOOKS_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0'; -export const SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION: string = '2.4.0'; +export const SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION: string = '3.0.0'; export const TRADES_WEBSOCKET_MESSAGE_VERSION: string = '2.1.0'; export const MARKETS_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0'; export const CANDLES_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0'; diff --git a/indexer/packages/postgres/src/types/websocket-message-types.ts b/indexer/packages/postgres/src/types/websocket-message-types.ts index 231a986cd9..08faa8dcc7 100644 --- a/indexer/packages/postgres/src/types/websocket-message-types.ts +++ b/indexer/packages/postgres/src/types/websocket-message-types.ts @@ -101,15 +101,15 @@ export interface OrderSubaccountMessageContents { id: string; subaccountId: string; clientId: string; - clobPairId: string; - side: OrderSide; - size: string; - ticker: string, - price: string; - type: OrderType; - timeInForce: APITimeInForce; - postOnly: boolean; - reduceOnly: boolean; + clobPairId?: string; + side?: OrderSide; + size?: string; + ticker?: string, + price?: string; + type?: OrderType; + timeInForce?: APITimeInForce; + postOnly?: boolean; + reduceOnly?: boolean; status: APIOrderStatus; orderFlags: string; @@ -125,7 +125,7 @@ export interface OrderSubaccountMessageContents { removalReason?: string; // This will only be set for stateful orders createdAtHeight?: string; - clientMetadata: string; + clientMetadata?: string; } export interface FillSubaccountMessageContents { diff --git a/indexer/services/comlink/public/websocket-documentation.md b/indexer/services/comlink/public/websocket-documentation.md index 172259c09f..5bd3031d70 100644 --- a/indexer/services/comlink/public/websocket-documentation.md +++ b/indexer/services/comlink/public/websocket-documentation.md @@ -162,27 +162,27 @@ export interface OrderSubaccountMessageContents { id: string; subaccountId: string; clientId: string; - clobPairId: string; - side: OrderSide; - size: string; - ticker: string, - price: string; - type: OrderType; - timeInForce: APITimeInForce; - postOnly: boolean; - reduceOnly: boolean; + clobPairId?: string; + side?: OrderSide; + size?: string; + ticker?: string, + price?: string; + type?: OrderType; + timeInForce?: APITimeInForce; + postOnly?: boolean; + reduceOnly?: boolean; status: APIOrderStatus; orderFlags: string; totalFilled?: string; totalOptimisticFilled?: string; goodTilBlock?: string; goodTilBlockTime?: string; - removalReason?: string; - createdAtHeight?: string; - clientMetadata: string; triggerPrice?: string; updatedAt?: IsoString; updatedAtHeight?: string; + removalReason?: string; + createdAtHeight?: string; + clientMetadata?: string; } export enum OrderSide { diff --git a/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts b/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts index b0b00075f2..dd0e779b23 100644 --- a/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts +++ b/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts @@ -69,6 +69,7 @@ import { import { expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers'; import { ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser'; import Long from 'long'; +import config from '../../src/config'; jest.mock('@dydxprotocol-indexer/base', () => ({ ...jest.requireActual('@dydxprotocol-indexer/base'), @@ -94,6 +95,7 @@ describe('OrderRemoveHandler', () => { await dbHelpers.clearData(); await redis.deleteAllAsync(redisClient); jest.resetAllMocks(); + config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS = false; }); afterAll(async () => { @@ -205,6 +207,108 @@ describe('OrderRemoveHandler', () => { expectTimingStats(); }); + it('successfully sends subaccount websocket message and returns if unable to find order in redis', async () => { + config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS = true; + const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(defaultOrderRemove); + const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); + + const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); + + expect(logger.info).toHaveBeenCalledWith(expect.objectContaining({ + at: 'orderRemoveHandler#handleOrderRemoval', + message: 'Unable to find order', + orderId: defaultOrderRemove.removedOrderId, + })); + + // Subaccounts message is sent + const subaccountContents: SubaccountMessageContents = { + orders: [ + { + id: OrderTable.orderIdToUuid(redisTestConstants.defaultOrderId), + subaccountId: testConstants.defaultSubaccountId, + clientId: redisTestConstants.defaultOrderId.clientId.toString(), + clobPairId: testConstants.defaultPerpetualMarket.clobPairId, + status: OrderStatus.CANCELED, + orderFlags: redisTestConstants.defaultOrderId.orderFlags.toString(), + ticker: redisTestConstants.defaultRedisOrder.ticker, + removalReason: OrderRemovalReason[defaultOrderRemove.reason], + }, + ], + }; + expectWebsocketMessagesSent( + producerSendSpy, + SubaccountMessage.fromPartial({ + contents: JSON.stringify(subaccountContents), + subaccountId: redisTestConstants.defaultSubaccountId, + version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, + }), + ); + + expect(logger.error).not.toHaveBeenCalled(); + expectTimingStats(); + }); + + it('successfully sends subaccount websocket message with db order fields if unable to find order in redis', + async () => { + config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS = true; + await OrderTable.create(testConstants.defaultOrder); + const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(defaultOrderRemove); + const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); + + const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); + + expect(logger.info).toHaveBeenCalledWith(expect.objectContaining({ + at: 'orderRemoveHandler#handleOrderRemoval', + message: 'Unable to find order', + orderId: defaultOrderRemove.removedOrderId, + })); + + // Subaccounts message is sent + const subaccountContents: SubaccountMessageContents = { + orders: [ + { + id: OrderTable.orderIdToUuid(redisTestConstants.defaultOrderId), + subaccountId: testConstants.defaultSubaccountId, + clientId: redisTestConstants.defaultOrderId.clientId.toString(), + clobPairId: testConstants.defaultPerpetualMarket.clobPairId, + status: OrderStatus.CANCELED, + orderFlags: redisTestConstants.defaultOrderId.orderFlags.toString(), + ticker: redisTestConstants.defaultRedisOrder.ticker, + removalReason: OrderRemovalReason[defaultOrderRemove.reason], + updatedAt: testConstants.defaultOrder.updatedAt, + updatedAtHeight: testConstants.defaultOrder.updatedAtHeight, + price: testConstants.defaultOrder.price, + size: testConstants.defaultOrder.size, + clientMetadata: testConstants.defaultOrder.clientMetadata, + side: testConstants.defaultOrder.side, + timeInForce: apiTranslations.orderTIFToAPITIF(testConstants.defaultOrder.timeInForce), + totalFilled: testConstants.defaultOrder.totalFilled, + goodTilBlock: testConstants.defaultOrder.goodTilBlock, + type: testConstants.defaultOrder.type, + }, + ], + }; + expectWebsocketMessagesSent( + producerSendSpy, + SubaccountMessage.fromPartial({ + contents: JSON.stringify(subaccountContents), + subaccountId: redisTestConstants.defaultSubaccountId, + version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, + }), + ); + + expect(logger.error).not.toHaveBeenCalled(); + expectTimingStats(); + }); + it('successfully returns early if unable to find perpetualMarket', async () => { await Promise.all([ dbHelpers.clearData(), @@ -217,10 +321,10 @@ describe('OrderRemoveHandler', () => { const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); await orderRemoveHandler.handleUpdate(offChainUpdate); - const ticker: string = testConstants.defaultPerpetualMarket.ticker; + const clobPairId: string = testConstants.defaultPerpetualMarket.clobPairId; expect(logger.error).toHaveBeenCalledWith({ at: 'orderRemoveHandler#handle', - message: `Unable to find perpetual market with ticker: ${ticker}`, + message: `Unable to find perpetual market with clobPairId: ${clobPairId}`, }); expectTimingStats(); }); diff --git a/indexer/services/vulcan/src/config.ts b/indexer/services/vulcan/src/config.ts index e71f830f76..6c89ab4877 100644 --- a/indexer/services/vulcan/src/config.ts +++ b/indexer/services/vulcan/src/config.ts @@ -37,6 +37,9 @@ export const configSchema = { SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_STATEFUL_ORDERS: parseBoolean({ default: true, }), + SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS: parseBoolean({ + default: false, + }), }; export default parseSchema(configSchema); diff --git a/indexer/services/vulcan/src/handlers/order-remove-handler.ts b/indexer/services/vulcan/src/handlers/order-remove-handler.ts index 428c572780..657601c0ea 100644 --- a/indexer/services/vulcan/src/handlers/order-remove-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-remove-handler.ts @@ -14,6 +14,9 @@ import { apiTranslations, TimeInForce, IsoString, + OrderSide, + APITimeInForce, + OrderType, } from '@dydxprotocol-indexer/postgres'; import { OpenOrdersCache, @@ -260,6 +263,17 @@ export class OrderRemoveHandler extends Handler { orderRemove: OrderRemoveV1, removeOrderResult: RemoveOrderResult, ): Promise { + const perpetualMarket: PerpetualMarketFromDatabase | undefined = perpetualMarketRefresher + .getPerpetualMarketFromClobPairId(orderRemove.removedOrderId!.clobPairId.toString()); + if (perpetualMarket === undefined) { + const clobPairId: string = orderRemove.removedOrderId!.clobPairId.toString(); + logger.error({ + at: 'orderRemoveHandler#handle', + message: `Unable to find perpetual market with clobPairId: ${clobPairId}`, + }); + return; + } + // This can happen for short term orders if the order place message was not received. if (!removeOrderResult.removed) { logger.info({ at: 'orderRemoveHandler#handleOrderRemoval', @@ -267,22 +281,33 @@ export class OrderRemoveHandler extends Handler { orderId: orderRemove.removedOrderId, orderRemove, }); + if (config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS) { + const canceledOrder: OrderFromDatabase | undefined = await runFuncWithTimingStat( + OrderTable.findById(OrderTable.orderIdToUuid(orderRemove.removedOrderId!)), + this.generateTimingStatsOptions('find_order'), + ); + const subaccountMessage: Message = { + value: this.createSubaccountWebsocketMessageFromOrderRemoveMessage( + canceledOrder, + orderRemove, + perpetualMarket.ticker, + ), + headers, + }; + const reason: OrderRemovalReason = orderRemove.reason; + if (!( + reason === OrderRemovalReason.ORDER_REMOVAL_REASON_INDEXER_EXPIRED || + reason === OrderRemovalReason.ORDER_REMOVAL_REASON_FULLY_FILLED + )) { + sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); + } + } return; } const stateRemainingQuantums: Big = await getStateRemainingQuantums( removeOrderResult.removedOrder!, ); - const perpetualMarket: PerpetualMarketFromDatabase | undefined = perpetualMarketRefresher - .getPerpetualMarketFromTicker(removeOrderResult.removedOrder!.ticker); - if (perpetualMarket === undefined) { - const ticker: string = removeOrderResult.removedOrder!.ticker; - logger.error({ - at: 'orderRemoveHandler#handle', - message: `Unable to find perpetual market with ticker: ${ticker}`, - }); - return; - } // If the remaining amount of the order in state is <= 0, the order is filled and // does not need to have it's status updated @@ -525,6 +550,77 @@ export class OrderRemoveHandler extends Handler { return sizeDelta.toFixed(); } + /** + * Should be called when an OrderRemove message is received for a non-existent order. + * This can happen when the order was not found in redis because the initial order + * placement message wasn't received. + * + * @param canceledOrder + * @param orderRemove + * @param perpetualMarket + * @protected + */ + protected createSubaccountWebsocketMessageFromOrderRemoveMessage( + canceledOrder: OrderFromDatabase | undefined, + orderRemove: OrderRemoveV1, + ticker: string, + ): Buffer { + const createdAtHeight: string | undefined = canceledOrder?.createdAtHeight; + const updatedAt: IsoString | undefined = canceledOrder?.updatedAt; + const updatedAtHeight: string | undefined = canceledOrder?.updatedAtHeight; + const price: string | undefined = canceledOrder?.price; + const size: string | undefined = canceledOrder?.size; + const clientMetadata: string | undefined = canceledOrder?.clientMetadata; + const reduceOnly: boolean | undefined = canceledOrder?.reduceOnly; + const side: OrderSide | undefined = canceledOrder?.side; + const timeInForce: APITimeInForce | undefined = canceledOrder + ? apiTranslations.orderTIFToAPITIF(canceledOrder.timeInForce) : undefined; + const totalFilled: string | undefined = canceledOrder?.totalFilled; + const goodTilBlock: string | undefined = canceledOrder?.goodTilBlock; + const goodTilBlockTime: string | undefined = canceledOrder?.goodTilBlockTime; + const triggerPrice: string | undefined = canceledOrder?.triggerPrice; + const type: OrderType | undefined = canceledOrder?.type; + + const contents: SubaccountMessageContents = { + orders: [ + { + id: OrderTable.orderIdToUuid(orderRemove.removedOrderId!), + subaccountId: SubaccountTable.subaccountIdToUuid( + orderRemove.removedOrderId!.subaccountId!, + ), + clientId: orderRemove.removedOrderId!.clientId.toString(), + clobPairId: orderRemove.removedOrderId!.clobPairId.toString(), + status: this.orderRemovalStatusToOrderStatus(orderRemove.removalStatus), + orderFlags: orderRemove.removedOrderId!.orderFlags.toString(), + ticker, + removalReason: OrderRemovalReason[orderRemove.reason], + ...(createdAtHeight && { createdAtHeight }), + ...(updatedAt && { updatedAt }), + ...(updatedAtHeight && { updatedAtHeight }), + ...(price && { price }), + ...(size && { size }), + ...(clientMetadata && { clientMetadata }), + ...(reduceOnly && { reduceOnly }), + ...(side && { side }), + ...(timeInForce && { timeInForce }), + ...(totalFilled && { totalFilled }), + ...(goodTilBlock && { goodTilBlock }), + ...(goodTilBlockTime && { goodTilBlockTime }), + ...(triggerPrice && { triggerPrice }), + ...(type && { type }), + }, + ], + }; + + const subaccountMessage: SubaccountMessage = SubaccountMessage.fromPartial({ + contents: JSON.stringify(contents), + subaccountId: orderRemove.removedOrderId!.subaccountId!, + version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, + }); + + return Buffer.from(Uint8Array.from(SubaccountMessage.encode(subaccountMessage).finish())); + } + protected createSubaccountWebsocketMessageFromRemoveOrderResult( removeOrderResult: RemoveOrderResult, canceledOrder: OrderFromDatabase | undefined,