From 5b2cd3cb72239fbab33b96179f36abc9c6d993f7 Mon Sep 17 00:00:00 2001 From: Lukasz Cwik <126621805+lcwik@users.noreply.github.com> Date: Fri, 3 Nov 2023 19:30:37 -0700 Subject: [PATCH] [IND-475] Update ender `update clob pair` handler to execute via a SQL function. (#752) * [IND-475] Update ender `update clob pair` handler to execute via a SQL function. --- .../handlers/update-clob-pair-handler.test.ts | 72 +++++++++++-------- .../ender/__tests__/scripts/scripts.test.ts | 9 +++ indexer/services/ender/src/config.ts | 3 + .../src/handlers/update-clob-pair-handler.ts | 48 ++++++++++++- .../helpers/postgres/postgres-functions.ts | 2 + ...dydx_clob_pair_status_to_market_status.sql | 21 ++++++ .../scripts/dydx_update_clob_pair_handler.sql | 38 ++++++++++ 7 files changed, 162 insertions(+), 31 deletions(-) create mode 100644 indexer/services/ender/src/scripts/dydx_clob_pair_status_to_market_status.sql create mode 100644 indexer/services/ender/src/scripts/dydx_update_clob_pair_handler.sql diff --git a/indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts b/indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts index f0041ac215..1d5e5b25e3 100644 --- a/indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts @@ -5,7 +5,6 @@ import { dbHelpers, liquidityTierRefresher, perpetualMarketRefresher, - protocolTranslations, testMocks, } from '@dydxprotocol-indexer/postgres'; import { updateBlockCache } from '../../src/caches/block-cache'; @@ -33,6 +32,7 @@ import { createKafkaMessage, producer } from '@dydxprotocol-indexer/kafka'; import { KafkaMessage } from 'kafkajs'; import { onMessage } from '../../src/lib/on-message'; import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions'; +import config from '../../src/config'; describe('update-clob-pair-handler', () => { beforeAll(async () => { @@ -91,35 +91,49 @@ describe('update-clob-pair-handler', () => { }); }); - it('updates an existing perpetual market', async () => { - const transactionIndex: number = 0; - const kafkaMessage: KafkaMessage = createKafkaMessageFromUpdateClobPairEvent({ - updatePerpetualEvent: defaultUpdateClobPairEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'updates an existing perpetual market (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; + const kafkaMessage: KafkaMessage = createKafkaMessageFromUpdateClobPairEvent({ + updatePerpetualEvent: defaultUpdateClobPairEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); + await onMessage(kafkaMessage); + + const perpetualMarketId: string = perpetualMarketRefresher.getPerpetualMarketFromClobPairId( + defaultUpdateClobPairEvent.clobPairId.toString(), + )!.id; + const perpetualMarket: + PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById( + perpetualMarketId, + ); + + expect(perpetualMarket).toEqual( + perpetualMarketRefresher.getPerpetualMarketFromId(perpetualMarketId)); + + if (!useSqlFunction) { + expectTimingStats(); + } + expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]); }); - const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); - await onMessage(kafkaMessage); - - const perpetualMarketId: string = perpetualMarketRefresher.getPerpetualMarketFromClobPairId( - defaultUpdateClobPairEvent.clobPairId.toString(), - )!.id; - const perpetualMarket: - PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById( - perpetualMarketId, - ); - expect(perpetualMarket).toEqual(expect.objectContaining({ - clobPairId: defaultUpdateClobPairEvent.clobPairId.toString(), - status: protocolTranslations.clobStatusToMarketStatus(defaultUpdateClobPairEvent.status), - quantumConversionExponent: defaultUpdateClobPairEvent.quantumConversionExponent, - subticksPerTick: defaultUpdateClobPairEvent.subticksPerTick, - stepBaseQuantums: defaultUpdateClobPairEvent.stepBaseQuantums.toNumber(), - })); - expectTimingStats(); - expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]); - }); }); function expectTimingStats() { diff --git a/indexer/services/ender/__tests__/scripts/scripts.test.ts b/indexer/services/ender/__tests__/scripts/scripts.test.ts index 932435dac4..d0d4b51c75 100644 --- a/indexer/services/ender/__tests__/scripts/scripts.test.ts +++ b/indexer/services/ender/__tests__/scripts/scripts.test.ts @@ -11,6 +11,7 @@ import { } from '@dydxprotocol-indexer/v4-protos'; import { BUFFER_ENCODING_UTF_8, + CLOB_STATUS_TO_MARKET_STATUS, dbHelpers, AssetPositionTable, PerpetualPositionTable, @@ -377,6 +378,14 @@ describe('SQL Function Tests', () => { ); }); + it('dydx_clob_pair_status_to_market_status should convert all statuses', async () => { + for (const [key, value] of Object.entries(CLOB_STATUS_TO_MARKET_STATUS)) { + const result = await getSingleRawQueryResultRow( + `SELECT dydx_clob_pair_status_to_market_status('${key}') AS result`); + expect(result).toEqual(value); + } + }); + it('dydx_create_transaction.sql should insert a transaction and return correct jsonb', async () => { const transactionHash: string = 'txnhash'; const blockHeight: string = '1'; diff --git a/indexer/services/ender/src/config.ts b/indexer/services/ender/src/config.ts index ce74d46858..485611ab0a 100644 --- a/indexer/services/ender/src/config.ts +++ b/indexer/services/ender/src/config.ts @@ -44,6 +44,9 @@ export const configSchema = { USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({ default: true, }), + USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION: parseBoolean({ + default: true, + }), USE_SQL_FUNCTION_TO_CREATE_INITIAL_ROWS: parseBoolean({ default: true, }), diff --git a/indexer/services/ender/src/handlers/update-clob-pair-handler.ts b/indexer/services/ender/src/handlers/update-clob-pair-handler.ts index 95c28a7d7b..90662029f0 100644 --- a/indexer/services/ender/src/handlers/update-clob-pair-handler.ts +++ b/indexer/services/ender/src/handlers/update-clob-pair-handler.ts @@ -1,10 +1,18 @@ import assert from 'assert'; +import { logger } from '@dydxprotocol-indexer/base'; import { - PerpetualMarketFromDatabase, PerpetualMarketTable, perpetualMarketRefresher, protocolTranslations, + PerpetualMarketFromDatabase, + PerpetualMarketModel, + PerpetualMarketTable, + perpetualMarketRefresher, + protocolTranslations, + storeHelpers, } from '@dydxprotocol-indexer/postgres'; import { UpdateClobPairEventV1 } from '@dydxprotocol-indexer/v4-protos'; +import * as pg from 'pg'; +import config from '../config'; import { generatePerpetualMarketMessage } from '../helpers/kafka-helper'; import { ConsolidatedKafkaEvent } from '../lib/types'; import { Handler } from './handler'; @@ -18,6 +26,42 @@ export class UpdateClobPairHandler extends Handler { // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(): Promise { + if (config.USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION) { + return this.handleViaSqlFunction(); + } + return this.handleViaKnex(); + } + + private async handleViaSqlFunction(): Promise { + const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes; + const result: pg.QueryResult = await storeHelpers.rawQuery( + `SELECT dydx_update_clob_pair_handler( + '${JSON.stringify(UpdateClobPairEventV1.decode(eventDataBinary))}' + ) AS result;`, + { txId: this.txId }, + ).catch((error: Error) => { + logger.error({ + at: 'UpdateClobPairHandler#handleViaSqlFunction', + message: 'Failed to handle UpdateClobPairEventV1', + error, + }); + + throw error; + }); + + const perpetualMarket: PerpetualMarketFromDatabase = PerpetualMarketModel.fromJson( + result.rows[0].result.perpetual_market) as PerpetualMarketFromDatabase; + + perpetualMarketRefresher.upsertPerpetualMarket(perpetualMarket); + + return [ + this.generateConsolidatedMarketKafkaEvent( + JSON.stringify(generatePerpetualMarketMessage([perpetualMarket])), + ), + ]; + } + + private async handleViaKnex(): Promise { const perpetualMarket: PerpetualMarketFromDatabase = await this.runFuncWithTimingStatAndErrorLogging( this.updateClobPair(), @@ -47,7 +91,7 @@ export class UpdateClobPairHandler extends Handler { if (perpetualMarket === undefined) { this.logAndThrowParseMessageError( - 'Could not find perpetual market with corresponding updatePerpetualEvent.id', + 'Could not find perpetual market with corresponding clobPairId', { event: this.event }, ); // This assert should never be hit because a ParseMessageError should be thrown above. diff --git a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts index 83cb99fe2e..3ca32a7e1a 100644 --- a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts +++ b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts @@ -30,6 +30,7 @@ const scripts: string[] = [ 'create_extension_pg_stat_statements.sql', 'create_extension_uuid_ossp.sql', 'dydx_asset_create_handler.sql', + 'dydx_clob_pair_status_to_market_status.sql', 'dydx_market_create_handler.sql', 'dydx_market_modify_handler.sql', 'dydx_market_price_update_handler.sql', @@ -48,6 +49,7 @@ const scripts: string[] = [ 'dydx_perpetual_position_and_order_side_matching.sql', 'dydx_subaccount_update_handler.sql', 'dydx_trim_scale.sql', + 'dydx_update_clob_pair_handler.sql', 'dydx_uuid.sql', 'dydx_uuid_from_asset_position_parts.sql', 'dydx_uuid_from_fill_event_parts.sql', diff --git a/indexer/services/ender/src/scripts/dydx_clob_pair_status_to_market_status.sql b/indexer/services/ender/src/scripts/dydx_clob_pair_status_to_market_status.sql new file mode 100644 index 0000000000..6d69b9dd7f --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_clob_pair_status_to_market_status.sql @@ -0,0 +1,21 @@ +/** + Returns the market status (https://github.com/dydxprotocol/v4-chain/blob/ea4f6895a73627aaa9bc5e21eed1ba51313b1ce4/indexer/packages/postgres/src/types/perpetual-market-types.ts#L60) + from the clob pair status (https://github.com/dydxprotocol/v4-chain/blob/ea4f6895a73627aaa9bc5e21eed1ba51313b1ce4/proto/dydxprotocol/indexer/protocol/v1/clob.proto#L157). + The conversion is equivalent to https://github.com/dydxprotocol/v4-chain/blob/ea4f6895a73627aaa9bc5e21eed1ba51313b1ce4/indexer/packages/postgres/src/lib/protocol-translations.ts#L351. + + Parameters: + - status: the ClobPairStatus (https://github.com/dydxprotocol/v4-chain/blob/ea4f6895a73627aaa9bc5e21eed1ba51313b1ce4/proto/dydxprotocol/indexer/protocol/v1/clob.proto#L157) +*/ +CREATE OR REPLACE FUNCTION dydx_clob_pair_status_to_market_status(status jsonb) + RETURNS text AS $$ +BEGIN + CASE status + WHEN '1'::jsonb THEN RETURN 'ACTIVE'; /** CLOB_PAIR_STATUS_ACTIVE */ + WHEN '2'::jsonb THEN RETURN 'PAUSED'; /** CLOB_PAIR_STATUS_PAUSED */ + WHEN '3'::jsonb THEN RETURN 'CANCEL_ONLY'; /** CLOB_PAIR_STATUS_CANCEL_ONLY */ + WHEN '4'::jsonb THEN RETURN 'POST_ONLY'; /** CLOB_PAIR_STATUS_POST_ONLY */ + WHEN '5'::jsonb THEN RETURN 'INITIALIZING'; /** CLOB_PAIR_STATUS_INITIALIZING */ + ELSE RAISE EXCEPTION 'Invalid clob pair status: %', status; + END CASE; +END; +$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; \ No newline at end of file diff --git a/indexer/services/ender/src/scripts/dydx_update_clob_pair_handler.sql b/indexer/services/ender/src/scripts/dydx_update_clob_pair_handler.sql new file mode 100644 index 0000000000..9bbd5b670e --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_update_clob_pair_handler.sql @@ -0,0 +1,38 @@ +/** + Parameters: + - event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25) + converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify. + Returns: JSON object containing fields: + - perpetual_market: The updated perpetual market in perpetual-market-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/perpetual-market-model.ts). +*/ +CREATE OR REPLACE FUNCTION dydx_update_clob_pair_handler(event_data jsonb) RETURNS jsonb AS $$ +DECLARE + row_count integer; + clob_pair_id bigint; + perpetual_market_record perpetual_markets%ROWTYPE; +BEGIN + clob_pair_id = (event_data->'clobPairId')::bigint; + perpetual_market_record."status" = dydx_clob_pair_status_to_market_status(event_data->'status'); + perpetual_market_record."quantumConversionExponent" = (event_data->'quantumConversionExponent')::integer; + perpetual_market_record."subticksPerTick" = (event_data->'subticksPerTick')::integer; + perpetual_market_record."stepBaseQuantums" = dydx_from_jsonlib_long(event_data->'stepBaseQuantums'); + + UPDATE perpetual_markets + SET + "status" = perpetual_market_record."status", + "quantumConversionExponent" = perpetual_market_record."quantumConversionExponent", + "subticksPerTick" = perpetual_market_record."subticksPerTick", + "stepBaseQuantums" = perpetual_market_record."stepBaseQuantums" + WHERE "clobPairId" = clob_pair_id + RETURNING * INTO perpetual_market_record; + + IF NOT FOUND THEN + RAISE EXCEPTION 'Could not find perpetual market with corresponding clobPairId %', event_data; + END IF; + + RETURN jsonb_build_object( + 'perpetual_market', + dydx_to_jsonb(perpetual_market_record) + ); +END; +$$ LANGUAGE plpgsql; \ No newline at end of file