Skip to content

Commit

Permalink
[CT-846] Send subaccount websocket message when cancel is received fo…
Browse files Browse the repository at this point in the history
…r non-existent order (#1540)

(cherry picked from commit 8426d41)
  • Loading branch information
dydxwill authored and mergify[bot] committed May 21, 2024
1 parent e5841c3 commit f7b673e
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 35 deletions.
2 changes: 1 addition & 1 deletion indexer/packages/kafka/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export const TO_ENDER_TOPIC: string = 'to-ender';

export const ORDERBOOKS_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
export const SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION: string = '2.4.0';
export const SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION: string = '3.0.0';
export const TRADES_WEBSOCKET_MESSAGE_VERSION: string = '2.1.0';
export const MARKETS_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
export const CANDLES_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
20 changes: 10 additions & 10 deletions indexer/packages/postgres/src/types/websocket-message-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ export interface OrderSubaccountMessageContents {
id: string;
subaccountId: string;
clientId: string;
clobPairId: string;
side: OrderSide;
size: string;
ticker: string,
price: string;
type: OrderType;
timeInForce: APITimeInForce;
postOnly: boolean;
reduceOnly: boolean;
clobPairId?: string;
side?: OrderSide;
size?: string;
ticker?: string,
price?: string;
type?: OrderType;
timeInForce?: APITimeInForce;
postOnly?: boolean;
reduceOnly?: boolean;
status: APIOrderStatus;
orderFlags: string;

Expand All @@ -125,7 +125,7 @@ export interface OrderSubaccountMessageContents {
removalReason?: string;
// This will only be set for stateful orders
createdAtHeight?: string;
clientMetadata: string;
clientMetadata?: string;
}

export interface FillSubaccountMessageContents {
Expand Down
24 changes: 12 additions & 12 deletions indexer/services/comlink/public/websocket-documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,27 +162,27 @@ export interface OrderSubaccountMessageContents {
id: string;
subaccountId: string;
clientId: string;
clobPairId: string;
side: OrderSide;
size: string;
ticker: string,
price: string;
type: OrderType;
timeInForce: APITimeInForce;
postOnly: boolean;
reduceOnly: boolean;
clobPairId?: string;
side?: OrderSide;
size?: string;
ticker?: string,
price?: string;
type?: OrderType;
timeInForce?: APITimeInForce;
postOnly?: boolean;
reduceOnly?: boolean;
status: APIOrderStatus;
orderFlags: string;
totalFilled?: string;
totalOptimisticFilled?: string;
goodTilBlock?: string;
goodTilBlockTime?: string;
removalReason?: string;
createdAtHeight?: string;
clientMetadata: string;
triggerPrice?: string;
updatedAt?: IsoString;
updatedAtHeight?: string;
removalReason?: string;
createdAtHeight?: string;
clientMetadata?: string;
}

export enum OrderSide {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import {
import { expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers';
import { ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser';
import Long from 'long';
import config from '../../src/config';

jest.mock('@dydxprotocol-indexer/base', () => ({
...jest.requireActual('@dydxprotocol-indexer/base'),
Expand All @@ -94,6 +95,7 @@ describe('OrderRemoveHandler', () => {
await dbHelpers.clearData();
await redis.deleteAllAsync(redisClient);
jest.resetAllMocks();
config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS = false;
});

afterAll(async () => {
Expand Down Expand Up @@ -205,6 +207,108 @@ describe('OrderRemoveHandler', () => {
expectTimingStats();
});

it('successfully sends subaccount websocket message and returns if unable to find order in redis', async () => {
config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS = true;
const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(defaultOrderRemove);
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

expect(logger.info).toHaveBeenCalledWith(expect.objectContaining({
at: 'orderRemoveHandler#handleOrderRemoval',
message: 'Unable to find order',
orderId: defaultOrderRemove.removedOrderId,
}));

// Subaccounts message is sent
const subaccountContents: SubaccountMessageContents = {
orders: [
{
id: OrderTable.orderIdToUuid(redisTestConstants.defaultOrderId),
subaccountId: testConstants.defaultSubaccountId,
clientId: redisTestConstants.defaultOrderId.clientId.toString(),
clobPairId: testConstants.defaultPerpetualMarket.clobPairId,
status: OrderStatus.CANCELED,
orderFlags: redisTestConstants.defaultOrderId.orderFlags.toString(),
ticker: redisTestConstants.defaultRedisOrder.ticker,
removalReason: OrderRemovalReason[defaultOrderRemove.reason],
},
],
};
expectWebsocketMessagesSent(
producerSendSpy,
SubaccountMessage.fromPartial({
contents: JSON.stringify(subaccountContents),
subaccountId: redisTestConstants.defaultSubaccountId,
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
}),
);

expect(logger.error).not.toHaveBeenCalled();
expectTimingStats();
});

it('successfully sends subaccount websocket message with db order fields if unable to find order in redis',
async () => {
config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS = true;
await OrderTable.create(testConstants.defaultOrder);
const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(defaultOrderRemove);
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

expect(logger.info).toHaveBeenCalledWith(expect.objectContaining({
at: 'orderRemoveHandler#handleOrderRemoval',
message: 'Unable to find order',
orderId: defaultOrderRemove.removedOrderId,
}));

// Subaccounts message is sent
const subaccountContents: SubaccountMessageContents = {
orders: [
{
id: OrderTable.orderIdToUuid(redisTestConstants.defaultOrderId),
subaccountId: testConstants.defaultSubaccountId,
clientId: redisTestConstants.defaultOrderId.clientId.toString(),
clobPairId: testConstants.defaultPerpetualMarket.clobPairId,
status: OrderStatus.CANCELED,
orderFlags: redisTestConstants.defaultOrderId.orderFlags.toString(),
ticker: redisTestConstants.defaultRedisOrder.ticker,
removalReason: OrderRemovalReason[defaultOrderRemove.reason],
updatedAt: testConstants.defaultOrder.updatedAt,
updatedAtHeight: testConstants.defaultOrder.updatedAtHeight,
price: testConstants.defaultOrder.price,
size: testConstants.defaultOrder.size,
clientMetadata: testConstants.defaultOrder.clientMetadata,
side: testConstants.defaultOrder.side,
timeInForce: apiTranslations.orderTIFToAPITIF(testConstants.defaultOrder.timeInForce),
totalFilled: testConstants.defaultOrder.totalFilled,
goodTilBlock: testConstants.defaultOrder.goodTilBlock,
type: testConstants.defaultOrder.type,
},
],
};
expectWebsocketMessagesSent(
producerSendSpy,
SubaccountMessage.fromPartial({
contents: JSON.stringify(subaccountContents),
subaccountId: redisTestConstants.defaultSubaccountId,
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
}),
);

expect(logger.error).not.toHaveBeenCalled();
expectTimingStats();
});

it('successfully returns early if unable to find perpetualMarket', async () => {
await Promise.all([
dbHelpers.clearData(),
Expand All @@ -217,10 +321,10 @@ describe('OrderRemoveHandler', () => {
const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);

const ticker: string = testConstants.defaultPerpetualMarket.ticker;
const clobPairId: string = testConstants.defaultPerpetualMarket.clobPairId;
expect(logger.error).toHaveBeenCalledWith({
at: 'orderRemoveHandler#handle',
message: `Unable to find perpetual market with ticker: ${ticker}`,
message: `Unable to find perpetual market with clobPairId: ${clobPairId}`,
});
expectTimingStats();
});
Expand Down
3 changes: 3 additions & 0 deletions indexer/services/vulcan/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ export const configSchema = {
SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_STATEFUL_ORDERS: parseBoolean({
default: true,
}),
SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS: parseBoolean({
default: false,
}),
};

export default parseSchema(configSchema);
116 changes: 106 additions & 10 deletions indexer/services/vulcan/src/handlers/order-remove-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import {
apiTranslations,
TimeInForce,
IsoString,
OrderSide,
APITimeInForce,
OrderType,
} from '@dydxprotocol-indexer/postgres';
import {
OpenOrdersCache,
Expand Down Expand Up @@ -260,29 +263,51 @@ export class OrderRemoveHandler extends Handler {
orderRemove: OrderRemoveV1,
removeOrderResult: RemoveOrderResult,
): Promise<void> {
const perpetualMarket: PerpetualMarketFromDatabase | undefined = perpetualMarketRefresher
.getPerpetualMarketFromClobPairId(orderRemove.removedOrderId!.clobPairId.toString());
if (perpetualMarket === undefined) {
const clobPairId: string = orderRemove.removedOrderId!.clobPairId.toString();
logger.error({
at: 'orderRemoveHandler#handle',
message: `Unable to find perpetual market with clobPairId: ${clobPairId}`,
});
return;
}
// This can happen for short term orders if the order place message was not received.
if (!removeOrderResult.removed) {
logger.info({
at: 'orderRemoveHandler#handleOrderRemoval',
message: 'Unable to find order',
orderId: orderRemove.removedOrderId,
orderRemove,
});
if (config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS) {
const canceledOrder: OrderFromDatabase | undefined = await runFuncWithTimingStat(
OrderTable.findById(OrderTable.orderIdToUuid(orderRemove.removedOrderId!)),
this.generateTimingStatsOptions('find_order'),
);
const subaccountMessage: Message = {
value: this.createSubaccountWebsocketMessageFromOrderRemoveMessage(
canceledOrder,
orderRemove,
perpetualMarket.ticker,
),
headers,
};
const reason: OrderRemovalReason = orderRemove.reason;
if (!(
reason === OrderRemovalReason.ORDER_REMOVAL_REASON_INDEXER_EXPIRED ||
reason === OrderRemovalReason.ORDER_REMOVAL_REASON_FULLY_FILLED
)) {
sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);
}
}
return;
}

const stateRemainingQuantums: Big = await getStateRemainingQuantums(
removeOrderResult.removedOrder!,
);
const perpetualMarket: PerpetualMarketFromDatabase | undefined = perpetualMarketRefresher
.getPerpetualMarketFromTicker(removeOrderResult.removedOrder!.ticker);
if (perpetualMarket === undefined) {
const ticker: string = removeOrderResult.removedOrder!.ticker;
logger.error({
at: 'orderRemoveHandler#handle',
message: `Unable to find perpetual market with ticker: ${ticker}`,
});
return;
}

// If the remaining amount of the order in state is <= 0, the order is filled and
// does not need to have it's status updated
Expand Down Expand Up @@ -525,6 +550,77 @@ export class OrderRemoveHandler extends Handler {
return sizeDelta.toFixed();
}

/**
* Should be called when an OrderRemove message is received for a non-existent order.
* This can happen when the order was not found in redis because the initial order
* placement message wasn't received.
*
* @param canceledOrder
* @param orderRemove
* @param perpetualMarket
* @protected
*/
protected createSubaccountWebsocketMessageFromOrderRemoveMessage(
canceledOrder: OrderFromDatabase | undefined,
orderRemove: OrderRemoveV1,
ticker: string,
): Buffer {
const createdAtHeight: string | undefined = canceledOrder?.createdAtHeight;
const updatedAt: IsoString | undefined = canceledOrder?.updatedAt;
const updatedAtHeight: string | undefined = canceledOrder?.updatedAtHeight;
const price: string | undefined = canceledOrder?.price;
const size: string | undefined = canceledOrder?.size;
const clientMetadata: string | undefined = canceledOrder?.clientMetadata;
const reduceOnly: boolean | undefined = canceledOrder?.reduceOnly;
const side: OrderSide | undefined = canceledOrder?.side;
const timeInForce: APITimeInForce | undefined = canceledOrder
? apiTranslations.orderTIFToAPITIF(canceledOrder.timeInForce) : undefined;
const totalFilled: string | undefined = canceledOrder?.totalFilled;
const goodTilBlock: string | undefined = canceledOrder?.goodTilBlock;
const goodTilBlockTime: string | undefined = canceledOrder?.goodTilBlockTime;
const triggerPrice: string | undefined = canceledOrder?.triggerPrice;
const type: OrderType | undefined = canceledOrder?.type;

const contents: SubaccountMessageContents = {
orders: [
{
id: OrderTable.orderIdToUuid(orderRemove.removedOrderId!),
subaccountId: SubaccountTable.subaccountIdToUuid(
orderRemove.removedOrderId!.subaccountId!,
),
clientId: orderRemove.removedOrderId!.clientId.toString(),
clobPairId: orderRemove.removedOrderId!.clobPairId.toString(),
status: this.orderRemovalStatusToOrderStatus(orderRemove.removalStatus),
orderFlags: orderRemove.removedOrderId!.orderFlags.toString(),
ticker,
removalReason: OrderRemovalReason[orderRemove.reason],
...(createdAtHeight && { createdAtHeight }),
...(updatedAt && { updatedAt }),
...(updatedAtHeight && { updatedAtHeight }),
...(price && { price }),
...(size && { size }),
...(clientMetadata && { clientMetadata }),
...(reduceOnly && { reduceOnly }),
...(side && { side }),
...(timeInForce && { timeInForce }),
...(totalFilled && { totalFilled }),
...(goodTilBlock && { goodTilBlock }),
...(goodTilBlockTime && { goodTilBlockTime }),
...(triggerPrice && { triggerPrice }),
...(type && { type }),
},
],
};

const subaccountMessage: SubaccountMessage = SubaccountMessage.fromPartial({
contents: JSON.stringify(contents),
subaccountId: orderRemove.removedOrderId!.subaccountId!,
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
});

return Buffer.from(Uint8Array.from(SubaccountMessage.encode(subaccountMessage).finish()));
}

protected createSubaccountWebsocketMessageFromRemoveOrderResult(
removeOrderResult: RemoveOrderResult,
canceledOrder: OrderFromDatabase | undefined,
Expand Down

0 comments on commit f7b673e

Please sign in to comment.