From ca5bcb454cc64f1d3f3c5d479f0a299263414e9d Mon Sep 17 00:00:00 2001 From: vincentwschau <99756290+vincentwschau@users.noreply.github.com> Date: Mon, 23 Oct 2023 13:08:13 -0400 Subject: [PATCH] [IND-402] Cache and send order updates for stateful orders. (#683) --- .../stateful-order-placement-handler.test.ts | 62 ++++++++- .../stateful-order-placement-handler.ts | 29 +++- .../tasks/remove-old-order-updates.test.ts | 125 ++++++++++++++++++ indexer/services/roundtable/src/config.ts | 7 + indexer/services/roundtable/src/index.ts | 9 ++ .../src/tasks/remove-old-order-updates.ts | 68 ++++++++++ .../handlers/order-update-handler.test.ts | 43 +++++- .../src/handlers/order-update-handler.ts | 33 +++++ 8 files changed, 368 insertions(+), 8 deletions(-) create mode 100644 indexer/services/roundtable/__tests__/tasks/remove-old-order-updates.test.ts create mode 100644 indexer/services/roundtable/src/tasks/remove-old-order-updates.ts diff --git a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts index cea37d7762..a0d79c6219 100644 --- a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts @@ -19,6 +19,7 @@ import { IndexerOrder, OrderPlaceV1_OrderPlacementStatus, StatefulOrderEventV1, + OrderUpdateV1, } from '@dydxprotocol-indexer/v4-protos'; import { KafkaMessage } from 'kafkajs'; import { onMessage } from '../../../src/lib/on-message'; @@ -45,6 +46,10 @@ import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants'; import { producer } from '@dydxprotocol-indexer/kafka'; import { ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser'; import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions'; +import { + redis, redisTestConstants, StatefulOrderUpdateInfo, StatefulOrderUpdatesCache, +} from '@dydxprotocol-indexer/redis'; +import { redisClient } from '../../../src/helpers/redis/redis-controller'; describe('statefulOrderPlacementHandler', () => { beforeAll(async () => { @@ -64,6 +69,7 @@ describe('statefulOrderPlacementHandler', () => { afterEach(async () => { await dbHelpers.clearData(); + await redis.deleteAllAsync(redisClient); jest.clearAllMocks(); }); @@ -138,15 +144,48 @@ describe('statefulOrderPlacementHandler', () => { it.each([ // TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent - ['stateful order placement', defaultStatefulOrderEvent], - ['stateful long term order placement', defaultStatefulOrderLongTermEvent], + [ + 'stateful order placement and no cached update', + defaultStatefulOrderEvent, + undefined, + ], + [ + 'stateful long term order placement and no cached update', + defaultStatefulOrderLongTermEvent, + undefined, + ], + [ + 'stateful order placement and cached update', + defaultStatefulOrderEvent, + { + ...redisTestConstants.orderUpdate.orderUpdate, + orderId: defaultOrder.orderId, + }, + ], + [ + 'stateful long term order placement and cached update', + defaultStatefulOrderLongTermEvent, + { + ...redisTestConstants.orderUpdate.orderUpdate, + orderId: defaultOrder.orderId, + }, + ], ])('successfully places order with %s', async ( _name: string, statefulOrderEvent: StatefulOrderEventV1, + cachedOrderUpdate: OrderUpdateV1 | undefined, ) => { const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent( statefulOrderEvent, ); + if (cachedOrderUpdate !== undefined) { + await StatefulOrderUpdatesCache.addStatefulOrderUpdate( + orderId, + cachedOrderUpdate, + Date.now(), + redisClient, + ); + } await onMessage(kafkaMessage); const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId); @@ -185,6 +224,25 @@ describe('statefulOrderPlacementHandler', () => { orderId: defaultOrder.orderId!, offchainUpdate: expectedOffchainUpdate, }); + + // If there was a cached order update, expect the cache to be empty and a corresponding + // off-chain update to have been sent to the Kafka producer + if (cachedOrderUpdate !== undefined) { + const orderUpdates: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache + .getOldOrderUpdates( + Date.now(), + redisClient, + ); + expect(orderUpdates).toHaveLength(0); + + expectVulcanKafkaMessage({ + producerSendMock, + orderId: defaultOrder.orderId!, + offchainUpdate: { + orderUpdate: cachedOrderUpdate, + }, + }); + } }); it.each([ diff --git a/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts b/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts index 12331c22fe..36994c6eeb 100644 --- a/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts +++ b/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts @@ -6,14 +6,17 @@ import { perpetualMarketRefresher, OrderStatus, } from '@dydxprotocol-indexer/postgres'; +import { StatefulOrderUpdatesCache } from '@dydxprotocol-indexer/redis'; import { getOrderIdHash } from '@dydxprotocol-indexer/v4-proto-parser'; import { OrderPlaceV1_OrderPlacementStatus, OffChainUpdateV1, IndexerOrder, StatefulOrderEventV1, + OrderUpdateV1, } from '@dydxprotocol-indexer/v4-protos'; +import { redisClient } from '../../helpers/redis/redis-controller'; import { ConsolidatedKafkaEvent } from '../../lib/types'; import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler'; @@ -60,18 +63,34 @@ export class StatefulOrderPlacementHandler extends this.generateTimingStatsOptions('upsert_order'), ); + const kafakEvents: ConsolidatedKafkaEvent[] = []; + const offChainUpdate: OffChainUpdateV1 = OffChainUpdateV1.fromPartial({ orderPlace: { order, placementStatus: OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_OPENED, }, }); + kafakEvents.push(this.generateConsolidatedVulcanKafkaEvent( + getOrderIdHash(order.orderId!), + offChainUpdate, + )); - return [ - this.generateConsolidatedVulcanKafkaEvent( + const pendingOrderUpdate: OrderUpdateV1 | undefined = await StatefulOrderUpdatesCache + .removeStatefulOrderUpdate( + OrderTable.orderIdToUuid(order.orderId!), + Date.now(), + redisClient, + ); + if (pendingOrderUpdate !== undefined) { + kafakEvents.push(this.generateConsolidatedVulcanKafkaEvent( getOrderIdHash(order.orderId!), - offChainUpdate, - ), - ]; + OffChainUpdateV1.fromPartial({ + orderUpdate: pendingOrderUpdate, + }), + )); + } + + return kafakEvents; } } diff --git a/indexer/services/roundtable/__tests__/tasks/remove-old-order-updates.test.ts b/indexer/services/roundtable/__tests__/tasks/remove-old-order-updates.test.ts new file mode 100644 index 0000000000..3bda397764 --- /dev/null +++ b/indexer/services/roundtable/__tests__/tasks/remove-old-order-updates.test.ts @@ -0,0 +1,125 @@ +import { stats } from '@dydxprotocol-indexer/base'; +import { + StatefulOrderUpdateInfo, + StatefulOrderUpdatesCache, + redis, + redisTestConstants, +} from '@dydxprotocol-indexer/redis'; +import config from '../../src/config'; +import removeOldOrderUpdatesTask from '../../src/tasks/remove-old-order-updates'; +import { redisClient } from '../../src/helpers/redis'; + +describe('remove-old-order-updates', () => { + const fakeTime: Date = new Date(2023, 9, 25, 0, 0, 0, 0); + + beforeAll(() => { + jest.useFakeTimers().setSystemTime(fakeTime); + }); + + afterAll(() => { + jest.resetAllMocks(); + jest.useRealTimers(); + }); + + beforeEach(() => { + jest.spyOn(stats, 'gauge'); + jest.clearAllMocks(); + }); + + afterEach(async () => { + await redis.deleteAllAsync(redisClient); + jest.clearAllMocks(); + }); + + it('succeeds with no cached order updates', async () => { + await removeOldOrderUpdatesTask(); + expect(stats.gauge).toHaveBeenCalledWith( + `${config.SERVICE_NAME}.remove_old_order_updates.num_removed`, + 0, + ); + }); + + it('succeeds with no old cached order updates', async () => { + await StatefulOrderUpdatesCache.addStatefulOrderUpdate( + redisTestConstants.defaultOrderUuidGoodTilBlockTime, + redisTestConstants.orderUpdate.orderUpdate, + fakeTime.getTime() - 1, + redisClient, + ); + const existingUpdates: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache + .getOldOrderUpdates( + fakeTime.getTime(), + redisClient, + ); + expect(existingUpdates).toHaveLength(1); + + await removeOldOrderUpdatesTask(); + expect(stats.gauge).toHaveBeenCalledWith( + `${config.SERVICE_NAME}.remove_old_order_updates.num_removed`, + 0, + ); + + const updatesAfterTask: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache + .getOldOrderUpdates( + fakeTime.getTime(), + redisClient, + ); + expect(updatesAfterTask).toEqual(existingUpdates); + }); + + it('succeeds with no old cached order updates', async () => { + await StatefulOrderUpdatesCache.addStatefulOrderUpdate( + redisTestConstants.defaultOrderUuidGoodTilBlockTime, + redisTestConstants.orderUpdate.orderUpdate, + fakeTime.getTime() - 1, + redisClient, + ); + const existingUpdates: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache + .getOldOrderUpdates( + fakeTime.getTime(), + redisClient, + ); + expect(existingUpdates).toHaveLength(1); + + await removeOldOrderUpdatesTask(); + expect(stats.gauge).toHaveBeenCalledWith( + `${config.SERVICE_NAME}.remove_old_order_updates.num_removed`, + 0, + ); + + const updatesAfterTask: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache + .getOldOrderUpdates( + fakeTime.getTime(), + redisClient, + ); + expect(updatesAfterTask).toEqual(existingUpdates); + }); + + it('succeeds removing old cached order update', async () => { + await StatefulOrderUpdatesCache.addStatefulOrderUpdate( + redisTestConstants.defaultOrderUuidGoodTilBlockTime, + redisTestConstants.orderUpdate.orderUpdate, + fakeTime.getTime() - config.OLD_CACHED_ORDER_UPDATES_WINDOW_MS, + redisClient, + ); + const existingUpdates: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache + .getOldOrderUpdates( + fakeTime.getTime(), + redisClient, + ); + expect(existingUpdates).toHaveLength(1); + + await removeOldOrderUpdatesTask(); + expect(stats.gauge).toHaveBeenCalledWith( + `${config.SERVICE_NAME}.remove_old_order_updates.num_removed`, + 1, + ); + + const updatesAfterTask: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache + .getOldOrderUpdates( + fakeTime.getTime(), + redisClient, + ); + expect(updatesAfterTask).toHaveLength(0); + }); +}); diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index 1ef825af81..b444ffd097 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -41,6 +41,7 @@ export const configSchema = { LOOPS_CANCEL_STALE_ORDERS: parseBoolean({ default: true }), LOOPS_ENABLED_UPDATE_RESEARCH_ENVIRONMENT: parseBoolean({ default: true }), LOOPS_ENABLED_TRACK_LAG: parseBoolean({ default: false }), + LOOPS_ENABLED_REMOVE_OLD_ORDER_UPDATES: parseBoolean({ default: true }), // Loop Timing LOOPS_INTERVAL_MS_MARKET_UPDATER: parseInteger({ @@ -70,6 +71,9 @@ export const configSchema = { LOOPS_INTERVAL_MS_TRACK_LAG: parseInteger({ default: TEN_SECONDS_IN_MILLISECONDS, }), + LOOPS_INTERVAL_MS_REMOVE_OLD_ORDER_UPDATES: parseInteger({ + default: THIRTY_SECONDS_IN_MILLISECONDS, + }), // Start delay START_DELAY_ENABLED: parseBoolean({ default: true }), @@ -119,6 +123,9 @@ export const configSchema = { MAX_COMPLIANCE_DATA_QUERY_PER_LOOP: parseInteger({ default: 100 }), COMPLIANCE_PROVIDER_QUERY_BATCH_SIZE: parseInteger({ default: 100 }), COMPLIANCE_PROVIDER_QUERY_DELAY_MS: parseInteger({ default: ONE_SECOND_IN_MILLISECONDS }), + + // Remove old cached order updates + OLD_CACHED_ORDER_UPDATES_WINDOW_MS: parseInteger({ default: 30 * ONE_SECOND_IN_MILLISECONDS }), }; export default parseSchema(configSchema); diff --git a/indexer/services/roundtable/src/index.ts b/indexer/services/roundtable/src/index.ts index 3486c862f1..fb40d616d8 100644 --- a/indexer/services/roundtable/src/index.ts +++ b/indexer/services/roundtable/src/index.ts @@ -14,6 +14,7 @@ import deleteZeroPriceLevelsTask from './tasks/delete-zero-price-levels'; import marketUpdaterTask from './tasks/market-updater'; import orderbookInstrumentationTask from './tasks/orderbook-instrumentation'; import removeExpiredOrdersTask from './tasks/remove-expired-orders'; +import removeOldOrderUpdatesTask from './tasks/remove-old-order-updates'; import trackLag from './tasks/track-lag'; import updateComplianceDataTask from './tasks/update-compliance-data'; import updateResearchEnvironmentTask from './tasks/update-research-environment'; @@ -112,6 +113,14 @@ async function start(): Promise { ); } + if (config.LOOPS_ENABLED_REMOVE_OLD_ORDER_UPDATES) { + startLoop( + removeOldOrderUpdatesTask, + 'remove_old_order_updates', + config.LOOPS_INTERVAL_MS_REMOVE_OLD_ORDER_UPDATES, + ); + } + logger.info({ at: 'index', message: 'Successfully started', diff --git a/indexer/services/roundtable/src/tasks/remove-old-order-updates.ts b/indexer/services/roundtable/src/tasks/remove-old-order-updates.ts new file mode 100644 index 0000000000..e38a2d0c1e --- /dev/null +++ b/indexer/services/roundtable/src/tasks/remove-old-order-updates.ts @@ -0,0 +1,68 @@ +import { + logger, + stats, +} from '@dydxprotocol-indexer/base'; +import { + StatefulOrderUpdateInfo, + StatefulOrderUpdatesCache, +} from '@dydxprotocol-indexer/redis'; +import { OrderUpdateV1 } from '@dydxprotocol-indexer/v4-protos'; + +import config from '../config'; +import { redisClient } from '../helpers/redis'; + +/** + * This task removes any old cached stateful orer updates from the StatefulOrderUpdates cache + */ +export default async function runTask(): Promise { + const start: number = Date.now(); + + try { + const oldUpdateCutoff: number = Date.now() - config.OLD_CACHED_ORDER_UPDATES_WINDOW_MS; + + const oldUpdateInfo: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache + .getOldOrderUpdates( + oldUpdateCutoff, + redisClient, + ); + const removedUpdates: OrderUpdateV1[] = (await Promise.all( + oldUpdateInfo.map( + (updateInfo: StatefulOrderUpdateInfo): Promise => { + return StatefulOrderUpdatesCache.removeStatefulOrderUpdate( + updateInfo.orderId, + updateInfo.timestamp, + redisClient, + ); + }, + ), + )).filter( + (removedUpdate: OrderUpdateV1 | undefined): removedUpdate is OrderUpdateV1 => { + if (removedUpdate !== undefined) { + logger.info({ + at: 'remove-old-order-updates#runTask', + message: 'Removed old stateful order update', + removedUpdate, + }); + return true; + } + return false; + }, + ); + + stats.gauge( + `${config.SERVICE_NAME}.remove_old_order_updates.num_removed`, + removedUpdates.length, + ); + } catch (error) { + logger.error({ + at: 'remove-old-order-updates#runTas', + message: 'Error occurred in task to remove old stateful order updates', + error, + }); + } finally { + stats.timing( + `${config.SERVICE_NAME}.remove_old_order_updates`, + Date.now() - start, + ); + } +} diff --git a/indexer/services/vulcan/__tests__/handlers/order-update-handler.test.ts b/indexer/services/vulcan/__tests__/handlers/order-update-handler.test.ts index 58c781e333..15a9737f3c 100644 --- a/indexer/services/vulcan/__tests__/handlers/order-update-handler.test.ts +++ b/indexer/services/vulcan/__tests__/handlers/order-update-handler.test.ts @@ -4,6 +4,7 @@ import { OrdersDataCache, redis, redisTestConstants, + StatefulOrderUpdatesCache, } from '@dydxprotocol-indexer/redis'; import { expectOpenOrderIds, @@ -33,6 +34,7 @@ import { IndexerOrderId, OrderPlaceV1_OrderPlacementStatus, RedisOrder, + OrderUpdateV1, } from '@dydxprotocol-indexer/v4-protos'; import * as redisPackage from '@dydxprotocol-indexer/redis'; import { @@ -464,7 +466,7 @@ describe('OrderUpdateHandler', () => { })); }); - it('logs error and does not update OrderbookLevels if order not found', async () => { + it('logs error and does not update OrderbookLevels if short-term order not found', async () => { synchronizeWrapBackgroundTask(wrapBackgroundTask); const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); await handleOrderUpdate(redisTestConstants.orderUpdate); @@ -478,6 +480,45 @@ describe('OrderUpdateHandler', () => { expect(stats.increment).toHaveBeenCalledWith( 'vulcan.order_update_order_does_not_exist', 1, + { + orderFlags: String(redisTestConstants.orderUpdate.orderUpdate.orderId!.orderFlags), + }, + ); + }); + + it('adds order update to stateful order update cache if stateful order not found', async () => { + synchronizeWrapBackgroundTask(wrapBackgroundTask); + const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); + const statefulOrderUpdate: redisTestConstants.OffChainUpdateOrderUpdateUpdateMessage = { + ...redisTestConstants.orderUpdate, + orderUpdate: { + ...redisTestConstants.orderUpdate.orderUpdate, + orderId: redisTestConstants.defaultOrderIdGoodTilBlockTime, + }, + }; + await handleOrderUpdate(statefulOrderUpdate); + + const cachedOrderUpdate: OrderUpdateV1 | undefined = await StatefulOrderUpdatesCache + .removeStatefulOrderUpdate( + redisTestConstants.defaultOrderUuidGoodTilBlockTime, + Date.now(), + client, + ); + expect(cachedOrderUpdate).toBeDefined(); + expect(cachedOrderUpdate).toEqual(statefulOrderUpdate.orderUpdate); + + expect(OrderbookLevelsCache.updatePriceLevel).not.toHaveBeenCalled(); + expect(logger.info).toHaveBeenCalledWith(expect.objectContaining({ + at: 'OrderUpdateHandler#handle', + message: expect.stringMatching('Received order update for order that does not exist, order id '), + })); + expectWebsocketMessagesNotSent(producerSendSpy); + expect(stats.increment).toHaveBeenCalledWith( + 'vulcan.order_update_order_does_not_exist', + 1, + { + orderFlags: String(statefulOrderUpdate.orderUpdate.orderId!.orderFlags), + }, ); }); }); diff --git a/indexer/services/vulcan/src/handlers/order-update-handler.ts b/indexer/services/vulcan/src/handlers/order-update-handler.ts index fcfe290b33..58a66254d1 100644 --- a/indexer/services/vulcan/src/handlers/order-update-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-update-handler.ts @@ -8,13 +8,16 @@ import { PerpetualMarketFromDatabase, protocolTranslations, perpetualMarketRefresher, + OrderTable, } from '@dydxprotocol-indexer/postgres'; import { updateOrder, UpdateOrderResult, OrderbookLevelsCache, OpenOrdersCache, + StatefulOrderUpdatesCache, } from '@dydxprotocol-indexer/redis'; +import { isStatefulOrder } from '@dydxprotocol-indexer/v4-proto-parser'; import { OffChainUpdateV1, OrderUpdateV1, @@ -74,6 +77,36 @@ export class OrderUpdateHandler extends Handler { updateResult, }); + if (updateResult.updated !== true) { + const orderFlags: number = orderUpdate.orderId!.orderFlags; + if (isStatefulOrder(orderFlags)) { + // If the order update was for a stateful order, add it to a cache of order updates + // for stateful orders, so it can be re-sent after `ender` processes the on-chain + // event for the stateful order placement + await StatefulOrderUpdatesCache.addStatefulOrderUpdate( + OrderTable.orderIdToUuid(orderUpdate.orderId!), + orderUpdate, + Date.now(), + redisClient, + ); + } + logger.info({ + at: 'OrderUpdateHandler#handle', + message: 'Received order update for order that does not exist, order id ' + + `${JSON.stringify(orderUpdate.orderId!)}`, + update, + updateResult, + }); + stats.increment( + `${config.SERVICE_NAME}.order_update_order_does_not_exist`, + 1, + { + orderFlags: String(orderFlags), + }, + ); + return; + } + if (updateResult.updated !== true) { logger.info({ at: 'OrderUpdateHandler#handle',