Skip to content

Commit

Permalink
Merge branch 'main' into chenyao/order-replacement-messages-for-vault
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyaoy committed Jun 3, 2024
2 parents 5f3e64e + 86eb47d commit b534c21
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
import { synchronizeWrapBackgroundTask } from '@dydxprotocol-indexer/dev';
import {
createKafkaMessage,
ORDERBOOKS_WEBSOCKET_MESSAGE_VERSION,
producer,
SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
getTriggerPrice,
Expand All @@ -19,7 +18,6 @@ import {
blockHeightRefresher,
BlockTable,
dbHelpers,
OrderbookMessageContents,
OrderFromDatabase,
OrderTable,
PerpetualMarketFromDatabase,
Expand Down Expand Up @@ -61,7 +59,6 @@ import { redisClient, redisClient as client } from '../../src/helpers/redis/redi
import { onMessage } from '../../src/lib/on-message';
import { expectCanceledOrderStatus, handleInitialOrderPlace } from '../helpers/helpers';
import { expectOffchainUpdateMessage, expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers';
import { OrderbookSide } from '../../src/lib/types';
import { getOrderIdHash, isLongTermOrder, isStatefulOrder } from '@dydxprotocol-indexer/v4-proto-parser';
import { defaultKafkaHeaders } from '../helpers/constants';
import config from '../../src/config';
Expand Down Expand Up @@ -539,18 +536,9 @@ describe('order-place-handler', () => {
) => {
const oldOrderTotalFilled: number = 10;
const oldPriceLevelInitialQuantums: number = Number(initialOrderToPlace.quantums) * 2;
// After replacing the order the quantums at the price level of the old order should be:
// initial quantums - (old order quantums - old order total filled)
const expectedPriceLevelQuantums: number = (
oldPriceLevelInitialQuantums - (Number(initialOrderToPlace.quantums) - oldOrderTotalFilled)
);
const expectedPriceLevelSize: string = protocolTranslations.quantumsToHumanFixedString(
expectedPriceLevelQuantums.toString(),
testConstants.defaultPerpetualMarket.atomicResolution,
);
const expectedPriceLevel: PriceLevel = {
humanPrice: expectedRedisOrder.price,
quantums: expectedPriceLevelQuantums.toString(),
quantums: oldPriceLevelInitialQuantums.toString(),
lastUpdated: expect.stringMatching(/^[0-9]{10}$/),
};

Expand Down Expand Up @@ -582,6 +570,7 @@ describe('order-place-handler', () => {
newTotalFilledQuantums: oldOrderTotalFilled,
client,
});

// Update the price level in the order book to a value larger than the quantums of the order
await OrderbookLevelsCache.updatePriceLevel({
ticker: testConstants.defaultPerpetualMarket.ticker,
Expand Down Expand Up @@ -614,25 +603,13 @@ describe('order-place-handler', () => {
}

expect(logger.error).not.toHaveBeenCalled();
const orderbookContents: OrderbookMessageContents = {
[OrderbookSide.BIDS]: [[
redisTestConstants.defaultPrice,
expectedPriceLevelSize,
]],
};
expectWebsocketMessagesSent(
producerSendSpy,
expectedReplacedOrder,
dbOrder,
testConstants.defaultPerpetualMarket,
APIOrderStatusEnum.BEST_EFFORT_OPENED,
expectSubaccountMessage,
expectOrderBookUpdate
? OrderbookMessage.fromPartial({
contents: JSON.stringify(orderbookContents),
clobPairId: testConstants.defaultPerpetualMarket.clobPairId,
version: ORDERBOOKS_WEBSOCKET_MESSAGE_VERSION,
}) : undefined,
);
expectStats(true);
});
Expand Down Expand Up @@ -1105,11 +1082,6 @@ describe('order-place-handler', () => {
APIOrderStatusEnum.BEST_EFFORT_OPENED,
true,
);

expect(logger.info).toHaveBeenCalledWith(expect.objectContaining({
at: 'OrderPlaceHandler#handle',
message: 'Total filled of order in Redis exceeds order quantums.',
}));
});
});
});
Expand Down
103 changes: 0 additions & 103 deletions indexer/services/vulcan/src/handlers/order-place-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ import {
OrderTable,
PerpetualMarketFromDatabase,
perpetualMarketRefresher,
protocolTranslations,
} from '@dydxprotocol-indexer/postgres';
import {
CanceledOrdersCache,
OrderbookLevelsCache,
placeOrder,
PlaceOrderResult,
StatefulOrderUpdatesCache,
Expand All @@ -21,7 +19,6 @@ import {
isLongTermOrder,
isStatefulOrder,
ORDER_FLAG_SHORT_TERM,
requiresImmediateExecution,
} from '@dydxprotocol-indexer/v4-proto-parser';
import {
IndexerOrder,
Expand All @@ -31,7 +28,6 @@ import {
OrderUpdateV1,
RedisOrder,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import { IHeaders, Message } from 'kafkajs';

import config from '../config';
Expand Down Expand Up @@ -96,14 +92,6 @@ export class OrderPlaceHandler extends Handler {
placeOrderResult,
});

// TODO(IND-68): Remove once order replacement flow in V4 protocol removes the old order and
// places the updated order.
const updatedQuantums: number | undefined = await this.updatePriceLevel(
placeOrderResult,
perpetualMarket,
update,
);

if (placeOrderResult.replaced) {
stats.increment(`${config.SERVICE_NAME}.place_order_handler.replaced_order`, 1);
}
Expand Down Expand Up @@ -145,97 +133,6 @@ export class OrderPlaceHandler extends Handler {
};
sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);
}

// TODO(IND-68): Remove once order replacement flow in V4 protocol removes the old order and
// places the updated order.
if (updatedQuantums !== undefined) {
const orderbookMessage: Message = {
value: this.createOrderbookWebsocketMessage(
placeOrderResult.oldOrder!,
perpetualMarket,
updatedQuantums,
),
headers,
};
sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS);
}
}

/**
* Updates the price level given the result of calling `placeOrder`.
* @param result `PlaceOrderResult` from calling `placeOrder`
* @param perpetualMarket Perpetual market object corresponding to the perpetual market of the
* order
* @param update Off-chain update
* @returns
*/
// eslint-disable-next-line @typescript-eslint/require-await
protected async updatePriceLevel(
result: PlaceOrderResult,
perpetualMarket: PerpetualMarketFromDatabase,
update: OffChainUpdateV1,
): Promise<number | undefined> {
// TODO(DEC-1339): Update price levels based on if the order is reduce-only and if the replaced
// order is reduce-only.
if (
result.replaced !== true ||
result.restingOnBook !== true ||
requiresImmediateExecution(result.oldOrder!.order!.timeInForce)
) {
return undefined;
}

const remainingSizeDeltaInQuantums: Big = this.getRemainingSizeDeltaInQuantums(result);

if (remainingSizeDeltaInQuantums.eq(0)) {
// No update to the price level if remaining quantums = 0
// An order could have remaining quantums = 0 intra-block, as an order is only considered
// filled once the fills are committed in a block
return undefined;
}

if (remainingSizeDeltaInQuantums.lt(0)) {
// Log error and skip updating orderbook levels if old order had negative remaining
// quantums
logger.info({
at: 'OrderPlaceHandler#handle',
message: 'Total filled of order in Redis exceeds order quantums.',
placeOrderResult: result,
update,
});
stats.increment(`${config.SERVICE_NAME}.order_place_total_filled_exceeds_size`, 1);
return undefined;
}

// If the remaining size is not equal or less than 0, it must be greater than 0.
// Remove the remaining size of the replaced order from the orderbook, by decrementing
// the total size of orders at the price of the replaced order
return runFuncWithTimingStat(
OrderbookLevelsCache.updatePriceLevel({
ticker: perpetualMarket.ticker,
side: protocolTranslations.protocolOrderSideToOrderSide(result.oldOrder!.order!.side),
humanPrice: result.oldOrder!.price,
// Delta should be -1 * remaining size of order in quantums and an integer
sizeDeltaInQuantums: remainingSizeDeltaInQuantums.mul(-1).toFixed(0),
client: redisClient,
}),
this.generateTimingStatsOptions('update_price_level'),
);
}

/**
* Gets the remaining size of the old order if the order was replaced.
* @param result Result of placing an order, should be for a replaced order so both `oldOrder` and
* `oldTotalFilledQuantums` properties should exist on the place order result.
* @returns Remaining size of the old order that was replaced.
*/
protected getRemainingSizeDeltaInQuantums(result: PlaceOrderResult): Big {
const sizeDeltaInQuantums: Big = Big(
result.oldOrder!.order!.quantums.toString(),
).minus(
result.oldTotalFilledQuantums!,
);
return sizeDeltaInQuantums;
}

protected validateOrderPlace(orderPlace: OrderPlaceV1): void {
Expand Down
34 changes: 20 additions & 14 deletions protocol/x/clob/memclob/memclob.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,23 +495,29 @@ func (m *MemClobPriceTimePriority) PlaceOrder(
}

if m.generateOffchainUpdates {
// If this is a replacement order, then ensure we send the appropriate replacement message.
// Send an order place message.
// For replacement orders, if the price of the existing order is different from the new order,
// create an order removal message first so we can remove the original price level from the orderbook.
// TODO (CT-884): send OrderReplaceV1 message for replacement orders and add order-replace-handler to Vulcan
orderId := order.OrderId
if _, found := orderbook.getOrder(orderId); found {
if message, success := off_chain_updates.CreateOrderReplaceMessage(
ctx,
order,
); success {
offchainUpdates.AddReplaceMessage(orderId, message)
}
} else {
if message, success := off_chain_updates.CreateOrderPlaceMessage(
ctx,
order,
); success {
offchainUpdates.AddPlaceMessage(order.OrderId, message)
if existingOrder, found := orderbook.getOrder(orderId); found {
if order.Subticks != existingOrder.Subticks {
if message, success := off_chain_updates.CreateOrderRemoveMessageWithReason(
ctx,
orderId,
indexersharedtypes.OrderRemovalReason_ORDER_REMOVAL_REASON_REPLACED,
ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_BEST_EFFORT_CANCELED,
); success {
offchainUpdates.AddRemoveMessage(orderId, message)
}
}
}
if message, success := off_chain_updates.CreateOrderPlaceMessage(
ctx,
order,
); success {
offchainUpdates.AddPlaceMessage(order.OrderId, message)
}
}

// Attempt to match the order against the orderbook.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,7 @@ func TestPlaceOrder_ReduceOnly(t *testing.T) {
tc.expectedCancelledReduceOnlyOrders,
// TODO(IND-261): Add tests for replaced reduce-only orders.
false,
false,
)
})
}
Expand Down Expand Up @@ -1668,6 +1669,7 @@ func TestPlaceOrder_LongTermReduceOnlyRemovals(t *testing.T) {
tc.expectedNewMatches,
tc.expectedCancelledReduceOnlyOrders,
false,
false,
)
})
}
Expand Down
12 changes: 10 additions & 2 deletions protocol/x/clob/memclob/memclob_place_order_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ func TestPlaceOrder_AddOrderToOrderbook(t *testing.T) {
ordersOnBook = append(ordersOnBook, &order)
}

expectedReplacementOrderPriceChanged := false
for _, matchableOrder := range ordersOnBook {
// Note we assume these are regular orders since liquidation orders cannot rest on
// the book.
Expand All @@ -327,6 +328,9 @@ func TestPlaceOrder_AddOrderToOrderbook(t *testing.T) {
// is no longer on the book.
matchableOrderOrder := matchableOrder.MustGetOrder()
if matchableOrderOrder.OrderId == tc.order.OrderId && tc.order.MustCmpReplacementOrder(&matchableOrderOrder) > 0 {
if matchableOrderOrder.Subticks != tc.order.Subticks {
expectedReplacementOrderPriceChanged = true
}
continue
}

Expand Down Expand Up @@ -378,6 +382,7 @@ func TestPlaceOrder_AddOrderToOrderbook(t *testing.T) {
[]expectedMatch{},
[]types.OrderId{},
tc.expectedToReplaceOrder,
expectedReplacementOrderPriceChanged,
)
})
}
Expand Down Expand Up @@ -2215,8 +2220,7 @@ func TestPlaceOrder_MatchOrders_PreexistingMatches(t *testing.T) {

expectedOrderStatus: types.InternalError,
expectedToReplaceOrder: true,

expectedErr: types.ErrFokOrderCouldNotBeFullyFilled,
expectedErr: types.ErrFokOrderCouldNotBeFullyFilled,
},
}

Expand Down Expand Up @@ -2275,6 +2279,7 @@ func TestPlaceOrder_MatchOrders_PreexistingMatches(t *testing.T) {
tc.expectedNewMatches,
[]types.OrderId{},
tc.expectedToReplaceOrder,
false,
)
})
}
Expand Down Expand Up @@ -3246,6 +3251,7 @@ func TestPlaceOrder_PostOnly(t *testing.T) {
[]expectedMatch{},
[]types.OrderId{},
false,
false,
)
})
}
Expand Down Expand Up @@ -3389,6 +3395,7 @@ func TestPlaceOrder_ImmediateOrCancel(t *testing.T) {
tc.expectedCollatCheck,
[]types.OrderId{},
false,
false,
)
})
}
Expand Down Expand Up @@ -4063,6 +4070,7 @@ func TestPlaceOrder_FillOrKill(t *testing.T) {
expectedMatches,
[]types.OrderId{},
false,
false,
)
})
}
Expand Down
Loading

0 comments on commit b534c21

Please sign in to comment.