diff --git a/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts b/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts index e0ff7d5a8e..4fff013d98 100644 --- a/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts +++ b/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts @@ -7,7 +7,6 @@ import { import { synchronizeWrapBackgroundTask } from '@dydxprotocol-indexer/dev'; import { createKafkaMessage, - ORDERBOOKS_WEBSOCKET_MESSAGE_VERSION, producer, SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, getTriggerPrice, @@ -19,7 +18,6 @@ import { blockHeightRefresher, BlockTable, dbHelpers, - OrderbookMessageContents, OrderFromDatabase, OrderTable, PerpetualMarketFromDatabase, @@ -61,7 +59,6 @@ import { redisClient, redisClient as client } from '../../src/helpers/redis/redi import { onMessage } from '../../src/lib/on-message'; import { expectCanceledOrderStatus, handleInitialOrderPlace } from '../helpers/helpers'; import { expectOffchainUpdateMessage, expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers'; -import { OrderbookSide } from '../../src/lib/types'; import { getOrderIdHash, isLongTermOrder, isStatefulOrder } from '@dydxprotocol-indexer/v4-proto-parser'; import { defaultKafkaHeaders } from '../helpers/constants'; import config from '../../src/config'; @@ -539,18 +536,9 @@ describe('order-place-handler', () => { ) => { const oldOrderTotalFilled: number = 10; const oldPriceLevelInitialQuantums: number = Number(initialOrderToPlace.quantums) * 2; - // After replacing the order the quantums at the price level of the old order should be: - // initial quantums - (old order quantums - old order total filled) - const expectedPriceLevelQuantums: number = ( - oldPriceLevelInitialQuantums - (Number(initialOrderToPlace.quantums) - oldOrderTotalFilled) - ); - const expectedPriceLevelSize: string = protocolTranslations.quantumsToHumanFixedString( - expectedPriceLevelQuantums.toString(), - testConstants.defaultPerpetualMarket.atomicResolution, - ); const expectedPriceLevel: PriceLevel = { humanPrice: expectedRedisOrder.price, - quantums: expectedPriceLevelQuantums.toString(), + quantums: oldPriceLevelInitialQuantums.toString(), lastUpdated: expect.stringMatching(/^[0-9]{10}$/), }; @@ -582,6 +570,7 @@ describe('order-place-handler', () => { newTotalFilledQuantums: oldOrderTotalFilled, client, }); + // Update the price level in the order book to a value larger than the quantums of the order await OrderbookLevelsCache.updatePriceLevel({ ticker: testConstants.defaultPerpetualMarket.ticker, @@ -614,12 +603,6 @@ describe('order-place-handler', () => { } expect(logger.error).not.toHaveBeenCalled(); - const orderbookContents: OrderbookMessageContents = { - [OrderbookSide.BIDS]: [[ - redisTestConstants.defaultPrice, - expectedPriceLevelSize, - ]], - }; expectWebsocketMessagesSent( producerSendSpy, expectedReplacedOrder, @@ -627,12 +610,6 @@ describe('order-place-handler', () => { testConstants.defaultPerpetualMarket, APIOrderStatusEnum.BEST_EFFORT_OPENED, expectSubaccountMessage, - expectOrderBookUpdate - ? OrderbookMessage.fromPartial({ - contents: JSON.stringify(orderbookContents), - clobPairId: testConstants.defaultPerpetualMarket.clobPairId, - version: ORDERBOOKS_WEBSOCKET_MESSAGE_VERSION, - }) : undefined, ); expectStats(true); }); @@ -1105,11 +1082,6 @@ describe('order-place-handler', () => { APIOrderStatusEnum.BEST_EFFORT_OPENED, true, ); - - expect(logger.info).toHaveBeenCalledWith(expect.objectContaining({ - at: 'OrderPlaceHandler#handle', - message: 'Total filled of order in Redis exceeds order quantums.', - })); }); }); }); diff --git a/indexer/services/vulcan/src/handlers/order-place-handler.ts b/indexer/services/vulcan/src/handlers/order-place-handler.ts index 0c1062fd82..d32021c43b 100644 --- a/indexer/services/vulcan/src/handlers/order-place-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-place-handler.ts @@ -6,11 +6,9 @@ import { OrderTable, PerpetualMarketFromDatabase, perpetualMarketRefresher, - protocolTranslations, } from '@dydxprotocol-indexer/postgres'; import { CanceledOrdersCache, - OrderbookLevelsCache, placeOrder, PlaceOrderResult, StatefulOrderUpdatesCache, @@ -21,7 +19,6 @@ import { isLongTermOrder, isStatefulOrder, ORDER_FLAG_SHORT_TERM, - requiresImmediateExecution, } from '@dydxprotocol-indexer/v4-proto-parser'; import { IndexerOrder, @@ -31,7 +28,6 @@ import { OrderUpdateV1, RedisOrder, } from '@dydxprotocol-indexer/v4-protos'; -import Big from 'big.js'; import { IHeaders, Message } from 'kafkajs'; import config from '../config'; @@ -96,14 +92,6 @@ export class OrderPlaceHandler extends Handler { placeOrderResult, }); - // TODO(IND-68): Remove once order replacement flow in V4 protocol removes the old order and - // places the updated order. - const updatedQuantums: number | undefined = await this.updatePriceLevel( - placeOrderResult, - perpetualMarket, - update, - ); - if (placeOrderResult.replaced) { stats.increment(`${config.SERVICE_NAME}.place_order_handler.replaced_order`, 1); } @@ -145,97 +133,6 @@ export class OrderPlaceHandler extends Handler { }; sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); } - - // TODO(IND-68): Remove once order replacement flow in V4 protocol removes the old order and - // places the updated order. - if (updatedQuantums !== undefined) { - const orderbookMessage: Message = { - value: this.createOrderbookWebsocketMessage( - placeOrderResult.oldOrder!, - perpetualMarket, - updatedQuantums, - ), - headers, - }; - sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS); - } - } - - /** - * Updates the price level given the result of calling `placeOrder`. - * @param result `PlaceOrderResult` from calling `placeOrder` - * @param perpetualMarket Perpetual market object corresponding to the perpetual market of the - * order - * @param update Off-chain update - * @returns - */ - // eslint-disable-next-line @typescript-eslint/require-await - protected async updatePriceLevel( - result: PlaceOrderResult, - perpetualMarket: PerpetualMarketFromDatabase, - update: OffChainUpdateV1, - ): Promise { - // TODO(DEC-1339): Update price levels based on if the order is reduce-only and if the replaced - // order is reduce-only. - if ( - result.replaced !== true || - result.restingOnBook !== true || - requiresImmediateExecution(result.oldOrder!.order!.timeInForce) - ) { - return undefined; - } - - const remainingSizeDeltaInQuantums: Big = this.getRemainingSizeDeltaInQuantums(result); - - if (remainingSizeDeltaInQuantums.eq(0)) { - // No update to the price level if remaining quantums = 0 - // An order could have remaining quantums = 0 intra-block, as an order is only considered - // filled once the fills are committed in a block - return undefined; - } - - if (remainingSizeDeltaInQuantums.lt(0)) { - // Log error and skip updating orderbook levels if old order had negative remaining - // quantums - logger.info({ - at: 'OrderPlaceHandler#handle', - message: 'Total filled of order in Redis exceeds order quantums.', - placeOrderResult: result, - update, - }); - stats.increment(`${config.SERVICE_NAME}.order_place_total_filled_exceeds_size`, 1); - return undefined; - } - - // If the remaining size is not equal or less than 0, it must be greater than 0. - // Remove the remaining size of the replaced order from the orderbook, by decrementing - // the total size of orders at the price of the replaced order - return runFuncWithTimingStat( - OrderbookLevelsCache.updatePriceLevel({ - ticker: perpetualMarket.ticker, - side: protocolTranslations.protocolOrderSideToOrderSide(result.oldOrder!.order!.side), - humanPrice: result.oldOrder!.price, - // Delta should be -1 * remaining size of order in quantums and an integer - sizeDeltaInQuantums: remainingSizeDeltaInQuantums.mul(-1).toFixed(0), - client: redisClient, - }), - this.generateTimingStatsOptions('update_price_level'), - ); - } - - /** - * Gets the remaining size of the old order if the order was replaced. - * @param result Result of placing an order, should be for a replaced order so both `oldOrder` and - * `oldTotalFilledQuantums` properties should exist on the place order result. - * @returns Remaining size of the old order that was replaced. - */ - protected getRemainingSizeDeltaInQuantums(result: PlaceOrderResult): Big { - const sizeDeltaInQuantums: Big = Big( - result.oldOrder!.order!.quantums.toString(), - ).minus( - result.oldTotalFilledQuantums!, - ); - return sizeDeltaInQuantums; } protected validateOrderPlace(orderPlace: OrderPlaceV1): void { diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index cd36a24aa9..72ef7748c9 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -495,23 +495,29 @@ func (m *MemClobPriceTimePriority) PlaceOrder( } if m.generateOffchainUpdates { - // If this is a replacement order, then ensure we send the appropriate replacement message. + // Send an order place message. + // For replacement orders, if the price of the existing order is different from the new order, + // create an order removal message first so we can remove the original price level from the orderbook. + // TODO (CT-884): send OrderReplaceV1 message for replacement orders and add order-replace-handler to Vulcan orderId := order.OrderId - if _, found := orderbook.getOrder(orderId); found { - if message, success := off_chain_updates.CreateOrderReplaceMessage( - ctx, - order, - ); success { - offchainUpdates.AddReplaceMessage(orderId, message) - } - } else { - if message, success := off_chain_updates.CreateOrderPlaceMessage( - ctx, - order, - ); success { - offchainUpdates.AddPlaceMessage(order.OrderId, message) + if existingOrder, found := orderbook.getOrder(orderId); found { + if order.Subticks != existingOrder.Subticks { + if message, success := off_chain_updates.CreateOrderRemoveMessageWithReason( + ctx, + orderId, + indexersharedtypes.OrderRemovalReason_ORDER_REMOVAL_REASON_REPLACED, + ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_BEST_EFFORT_CANCELED, + ); success { + offchainUpdates.AddRemoveMessage(orderId, message) + } } } + if message, success := off_chain_updates.CreateOrderPlaceMessage( + ctx, + order, + ); success { + offchainUpdates.AddPlaceMessage(order.OrderId, message) + } } // Attempt to match the order against the orderbook. diff --git a/protocol/x/clob/memclob/memclob_place_order_reduce_only_test.go b/protocol/x/clob/memclob/memclob_place_order_reduce_only_test.go index 52e1370cda..66058116bb 100644 --- a/protocol/x/clob/memclob/memclob_place_order_reduce_only_test.go +++ b/protocol/x/clob/memclob/memclob_place_order_reduce_only_test.go @@ -1385,6 +1385,7 @@ func TestPlaceOrder_ReduceOnly(t *testing.T) { tc.expectedCancelledReduceOnlyOrders, // TODO(IND-261): Add tests for replaced reduce-only orders. false, + false, ) }) } @@ -1668,6 +1669,7 @@ func TestPlaceOrder_LongTermReduceOnlyRemovals(t *testing.T) { tc.expectedNewMatches, tc.expectedCancelledReduceOnlyOrders, false, + false, ) }) } diff --git a/protocol/x/clob/memclob/memclob_place_order_test.go b/protocol/x/clob/memclob/memclob_place_order_test.go index 107bb7ffa3..3744160f78 100644 --- a/protocol/x/clob/memclob/memclob_place_order_test.go +++ b/protocol/x/clob/memclob/memclob_place_order_test.go @@ -319,6 +319,7 @@ func TestPlaceOrder_AddOrderToOrderbook(t *testing.T) { ordersOnBook = append(ordersOnBook, &order) } + expectedReplacementOrderPriceChanged := false for _, matchableOrder := range ordersOnBook { // Note we assume these are regular orders since liquidation orders cannot rest on // the book. @@ -327,6 +328,9 @@ func TestPlaceOrder_AddOrderToOrderbook(t *testing.T) { // is no longer on the book. matchableOrderOrder := matchableOrder.MustGetOrder() if matchableOrderOrder.OrderId == tc.order.OrderId && tc.order.MustCmpReplacementOrder(&matchableOrderOrder) > 0 { + if matchableOrderOrder.Subticks != tc.order.Subticks { + expectedReplacementOrderPriceChanged = true + } continue } @@ -378,6 +382,7 @@ func TestPlaceOrder_AddOrderToOrderbook(t *testing.T) { []expectedMatch{}, []types.OrderId{}, tc.expectedToReplaceOrder, + expectedReplacementOrderPriceChanged, ) }) } @@ -2215,8 +2220,7 @@ func TestPlaceOrder_MatchOrders_PreexistingMatches(t *testing.T) { expectedOrderStatus: types.InternalError, expectedToReplaceOrder: true, - - expectedErr: types.ErrFokOrderCouldNotBeFullyFilled, + expectedErr: types.ErrFokOrderCouldNotBeFullyFilled, }, } @@ -2275,6 +2279,7 @@ func TestPlaceOrder_MatchOrders_PreexistingMatches(t *testing.T) { tc.expectedNewMatches, []types.OrderId{}, tc.expectedToReplaceOrder, + false, ) }) } @@ -3246,6 +3251,7 @@ func TestPlaceOrder_PostOnly(t *testing.T) { []expectedMatch{}, []types.OrderId{}, false, + false, ) }) } @@ -3389,6 +3395,7 @@ func TestPlaceOrder_ImmediateOrCancel(t *testing.T) { tc.expectedCollatCheck, []types.OrderId{}, false, + false, ) }) } @@ -4063,6 +4070,7 @@ func TestPlaceOrder_FillOrKill(t *testing.T) { expectedMatches, []types.OrderId{}, false, + false, ) }) } diff --git a/protocol/x/clob/memclob/memclob_test_util.go b/protocol/x/clob/memclob/memclob_test_util.go index 0c437a26e9..ddac1d590d 100644 --- a/protocol/x/clob/memclob/memclob_test_util.go +++ b/protocol/x/clob/memclob/memclob_test_util.go @@ -1292,6 +1292,7 @@ func assertPlaceOrderOffchainMessages( expectedNewMatches []expectedMatch, expectedCancelledReduceOnlyOrders []types.OrderId, expectedToReplaceOrder bool, + expectedReplacementOrderPriceChanged bool, ) { actualOffchainMessages := offchainUpdates.GetMessages() expectedOffchainMessages := []msgsender.Message{} @@ -1299,21 +1300,27 @@ func assertPlaceOrderOffchainMessages( // If there are no errors expected, an order place message should be sent. if expectedErr == nil || doesErrorProduceOffchainMessages(expectedErr) { - var updateMessage msgsender.Message if expectedToReplaceOrder { - updateMessage = off_chain_updates.MustCreateOrderReplaceMessage( - ctx, - order, - ) - } else { - updateMessage = off_chain_updates.MustCreateOrderPlaceMessage( - ctx, - order, - ) + if expectedReplacementOrderPriceChanged { + removeMessage := off_chain_updates.MustCreateOrderRemoveMessageWithReason( + ctx, + order.OrderId, + indexershared.OrderRemovalReason_ORDER_REMOVAL_REASON_REPLACED, + ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_BEST_EFFORT_CANCELED, + ) + expectedOffchainMessages = append( + expectedOffchainMessages, + removeMessage, + ) + } } + placeMessage := off_chain_updates.MustCreateOrderPlaceMessage( + ctx, + order, + ) expectedOffchainMessages = append( expectedOffchainMessages, - updateMessage, + placeMessage, ) require.Equal(t, expectedOffchainMessages, actualOffchainMessages[:len(expectedOffchainMessages)]) }