diff --git a/indexer/services/ender/__tests__/handlers/perpetual-market-handler.test.ts b/indexer/services/ender/__tests__/handlers/perpetual-market-handler.test.ts index 67bb3cef7c..02235fd182 100644 --- a/indexer/services/ender/__tests__/handlers/perpetual-market-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/perpetual-market-handler.test.ts @@ -40,6 +40,7 @@ import { } from '../helpers/constants'; import { updateBlockCache } from '../../src/caches/block-cache'; import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions'; +import config from '../../src/config'; describe('perpetualMarketHandler', () => { beforeAll(async () => { @@ -65,6 +66,7 @@ describe('perpetualMarketHandler', () => { afterEach(async () => { await dbHelpers.clearData(); + perpetualMarketRefresher.clear(); jest.clearAllMocks(); liquidityTierRefresher.clear(); }); @@ -103,71 +105,118 @@ describe('perpetualMarketHandler', () => { }); }); - it('fails when market doesnt exist for perpetual market', async () => { - const transactionIndex: number = 0; - const kafkaMessage: KafkaMessage = createKafkaMessageFromPerpetualMarketEvent({ - perpetualMarketEvent: defaultPerpetualMarketCreateEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'fails when market doesnt exist for perpetual market (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; + const kafkaMessage: KafkaMessage = createKafkaMessageFromPerpetualMarketEvent({ + perpetualMarketEvent: defaultPerpetualMarketCreateEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); + + await expect(onMessage(kafkaMessage)).rejects.toThrowError(); }); - await expect(onMessage(kafkaMessage)).rejects.toThrowError(); - }); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'fails when liquidity tier doesnt exist for perpetual market (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION = useSqlFunction; + await MarketTable.create(testConstants.defaultMarket); + await marketRefresher.updateMarkets(); + const transactionIndex: number = 0; + const kafkaMessage: KafkaMessage = createKafkaMessageFromPerpetualMarketEvent({ + perpetualMarketEvent: defaultPerpetualMarketCreateEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); - it('fails when liquidity tier doesnt exist for perpetual market', async () => { - await MarketTable.create(testConstants.defaultMarket); - await marketRefresher.updateMarkets(); - const transactionIndex: number = 0; - const kafkaMessage: KafkaMessage = createKafkaMessageFromPerpetualMarketEvent({ - perpetualMarketEvent: defaultPerpetualMarketCreateEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, + await expect(onMessage(kafkaMessage)).rejects.toThrowError(); }); - await expect(onMessage(kafkaMessage)).rejects.toThrowError(); - }); - - it('creates new perpetual market', async () => { - await Promise.all([ - MarketTable.create(testConstants.defaultMarket), - LiquidityTiersTable.create(testConstants.defaultLiquidityTier), - ]); - await liquidityTierRefresher.updateLiquidityTiers(); - await marketRefresher.updateMarkets(); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'creates new perpetual market (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION = useSqlFunction; + await Promise.all([ + MarketTable.create(testConstants.defaultMarket), + LiquidityTiersTable.create(testConstants.defaultLiquidityTier), + ]); + await liquidityTierRefresher.updateLiquidityTiers(); + await marketRefresher.updateMarkets(); - const transactionIndex: number = 0; + const transactionIndex: number = 0; - const perpetualMarketEvent: PerpetualMarketCreateEventV1 = defaultPerpetualMarketCreateEvent; - const kafkaMessage: KafkaMessage = createKafkaMessageFromPerpetualMarketEvent({ - perpetualMarketEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, - }); - // Confirm there is no existing perpetualMarket. - await expectNoExistingPerpetualMarkets(); + const perpetualMarketEvent: PerpetualMarketCreateEventV1 = defaultPerpetualMarketCreateEvent; + const kafkaMessage: KafkaMessage = createKafkaMessageFromPerpetualMarketEvent({ + perpetualMarketEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); + // Confirm there is no existing perpetualMarket. + await expectNoExistingPerpetualMarkets(); - const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); - await onMessage(kafkaMessage); + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); + await onMessage(kafkaMessage); - const newPerpetualMarkets: PerpetualMarketFromDatabase[] = await PerpetualMarketTable.findAll( - {}, - [], { - orderBy: [[PerpetualMarketColumns.id, Ordering.ASC]], - }); - expect(newPerpetualMarkets.length).toEqual(1); - expectPerpetualMarketMatchesEvent(perpetualMarketEvent, newPerpetualMarkets[0]); - expectTimingStats(); - const perpetualMarket: PerpetualMarketFromDatabase | undefined = perpetualMarketRefresher.getPerpetualMarketFromId('0'); - expect(perpetualMarket).toBeDefined(); - expectPerpetualMarket(perpetualMarket!, perpetualMarketEvent); - expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]); - }); + const newPerpetualMarkets: PerpetualMarketFromDatabase[] = await PerpetualMarketTable.findAll( + {}, + [], { + orderBy: [[PerpetualMarketColumns.id, Ordering.ASC]], + }); + expect(newPerpetualMarkets.length).toEqual(1); + expectPerpetualMarketMatchesEvent(perpetualMarketEvent, newPerpetualMarkets[0]); + if (!useSqlFunction) { + expectTimingStats(); + } + const perpetualMarket: PerpetualMarketFromDatabase | undefined = perpetualMarketRefresher.getPerpetualMarketFromId('0'); + expect(perpetualMarket).toBeDefined(); + expectPerpetualMarket(perpetualMarket!, perpetualMarketEvent); + expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]); + }); }); function expectTimingStats() { diff --git a/indexer/services/ender/src/config.ts b/indexer/services/ender/src/config.ts index d18fb3ee96..e9e6f9e868 100644 --- a/indexer/services/ender/src/config.ts +++ b/indexer/services/ender/src/config.ts @@ -44,6 +44,9 @@ export const configSchema = { USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({ default: true, }), + USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION: parseBoolean({ + default: true, + }), USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION: parseBoolean({ default: true, }), diff --git a/indexer/services/ender/src/handlers/perpetual-market-handler.ts b/indexer/services/ender/src/handlers/perpetual-market-handler.ts index 2c3e961607..3d08f78638 100644 --- a/indexer/services/ender/src/handlers/perpetual-market-handler.ts +++ b/indexer/services/ender/src/handlers/perpetual-market-handler.ts @@ -1,12 +1,15 @@ +import { logger } from '@dydxprotocol-indexer/base'; import { PerpetualMarketCreateObject, - PerpetualMarketFromDatabase, + PerpetualMarketFromDatabase, PerpetualMarketModel, perpetualMarketRefresher, PerpetualMarketTable, - protocolTranslations, + protocolTranslations, storeHelpers, } from '@dydxprotocol-indexer/postgres'; import { PerpetualMarketCreateEventV1 } from '@dydxprotocol-indexer/v4-protos'; +import * as pg from 'pg'; +import config from '../config'; import { generatePerpetualMarketMessage } from '../helpers/kafka-helper'; import { ConsolidatedKafkaEvent } from '../lib/types'; import { Handler } from './handler'; @@ -20,6 +23,43 @@ export class PerpetualMarketCreationHandler extends Handler { + if (config.USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION) { + return this.handleViaSqlFunction(); + } + return this.handleViaKnex(); + } + + // eslint-disable-next-line @typescript-eslint/require-await + private async handleViaSqlFunction(): Promise { + const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes; + const result: pg.QueryResult = await storeHelpers.rawQuery( + `SELECT dydx_perpetual_market_handler( + '${JSON.stringify(PerpetualMarketCreateEventV1.decode(eventDataBinary))}' + ) AS result;`, + { txId: this.txId }, + ).catch((error: Error) => { + logger.error({ + at: 'PerpetualMarketCreationHandler#handleViaSqlFunction', + message: 'Failed to handle PerpetualMarketCreateEventV1', + error, + }); + + throw error; + }); + + const perpetualMarket: PerpetualMarketFromDatabase = PerpetualMarketModel.fromJson( + result.rows[0].result.perpetual_market) as PerpetualMarketFromDatabase; + + perpetualMarketRefresher.upsertPerpetualMarket(perpetualMarket); + return [ + this.generateConsolidatedMarketKafkaEvent( + JSON.stringify(generatePerpetualMarketMessage([perpetualMarket])), + ), + ]; + } + + // eslint-disable-next-line @typescript-eslint/require-await + private async handleViaKnex(): Promise { const perpetualMarket: PerpetualMarketFromDatabase = await this.runFuncWithTimingStatAndErrorLogging( this.createPerpetualMarket(), diff --git a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts index 5ebb47045e..b24585a7f4 100644 --- a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts +++ b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts @@ -46,6 +46,7 @@ const scripts: string[] = [ 'dydx_get_weighted_average.sql', 'dydx_liquidation_fill_handler_per_order.sql', 'dydx_order_fill_handler_per_order.sql', + 'dydx_perpetual_market_handler.sql', 'dydx_perpetual_position_and_order_side_matching.sql', 'dydx_subaccount_update_handler.sql', 'dydx_trim_scale.sql', diff --git a/indexer/services/ender/src/scripts/dydx_perpetual_market_handler.sql b/indexer/services/ender/src/scripts/dydx_perpetual_market_handler.sql index e69de29bb2..e34d6867f3 100644 --- a/indexer/services/ender/src/scripts/dydx_perpetual_market_handler.sql +++ b/indexer/services/ender/src/scripts/dydx_perpetual_market_handler.sql @@ -0,0 +1,36 @@ +/** + Parameters: + - event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25) + converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify. + Returns: JSON object containing fields: + - perpetual_market: The updated perpetual market in perpetual-market-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/perpetual-market-model.ts). +*/ +CREATE OR REPLACE FUNCTION dydx_perpetual_market_handler(event_data jsonb) RETURNS jsonb AS $$ +DECLARE + perpetual_market_record perpetual_markets%ROWTYPE; +BEGIN + perpetual_market_record."id" = (event_data->'id')::bigint; + perpetual_market_record."clobPairId" = (event_data->'clobPairId')::bigint; + perpetual_market_record."ticker" = event_data->>'ticker'; + perpetual_market_record."marketId" = (event_data->'marketId')::integer; + perpetual_market_record."status" = dydx_clob_pair_status_to_market_status(event_data->'status'); + perpetual_market_record."lastPrice" = 0; + perpetual_market_record."priceChange24H" = 0; + perpetual_market_record."trades24H" = 0; + perpetual_market_record."volume24H" = 0; + perpetual_market_record."nextFundingRate" = 0; + perpetual_market_record."openInterest"= 0; + perpetual_market_record."quantumConversionExponent" = (event_data->'quantumConversionExponent')::integer; + perpetual_market_record."atomicResolution" = (event_data->'atomicResolution')::integer; + perpetual_market_record."subticksPerTick" = (event_data->'subticksPerTick')::integer; + perpetual_market_record."stepBaseQuantums" = dydx_from_jsonlib_long(event_data->'stepBaseQuantums'); + perpetual_market_record."liquidityTierId" = (event_data->'liquidityTier')::integer; + + INSERT INTO perpetual_markets VALUES (perpetual_market_record.*) RETURNING * INTO perpetual_market_record; + + RETURN jsonb_build_object( + 'perpetual_market', + dydx_to_jsonb(perpetual_market_record) + ); +END; +$$ LANGUAGE plpgsql; \ No newline at end of file