diff --git a/indexer/packages/postgres/src/constants.ts b/indexer/packages/postgres/src/constants.ts index 0cd5eaf83f..a636204313 100644 --- a/indexer/packages/postgres/src/constants.ts +++ b/indexer/packages/postgres/src/constants.ts @@ -4,6 +4,7 @@ import config from './config'; import AssetModel from './models/asset-model'; import AssetPositionModel from './models/asset-position-model'; import FillModel from './models/fill-model'; +import LiquidityTiersModel from './models/liquidity-tiers-model'; import MarketModel from './models/market-model'; import OraclePriceModel from './models/oracle-price-model'; import OrderModel from './models/order-model'; @@ -87,6 +88,7 @@ export const SQL_TO_JSON_DEFINED_MODELS = [ AssetModel, AssetPositionModel, FillModel, + LiquidityTiersModel, MarketModel, OraclePriceModel, OrderModel, diff --git a/indexer/packages/postgres/src/index.ts b/indexer/packages/postgres/src/index.ts index f5c40fe28d..69e941c779 100644 --- a/indexer/packages/postgres/src/index.ts +++ b/indexer/packages/postgres/src/index.ts @@ -6,6 +6,7 @@ export { postgresConfigSchema } from './config'; export { default as AssetModel } from './models/asset-model'; export { default as AssetPositionModel } from './models/asset-position-model'; export { default as FillModel } from './models/fill-model'; +export { default as LiquidityTiersModel } from './models/liquidity-tiers-model'; export { default as MarketModel } from './models/market-model'; export { default as OraclePriceModel } from './models/oracle-price-model'; export { default as OrderModel } from './models/order-model'; diff --git a/indexer/packages/postgres/src/models/liquidity-tiers-model.ts b/indexer/packages/postgres/src/models/liquidity-tiers-model.ts index 470d9f94e7..97e595c837 100644 --- a/indexer/packages/postgres/src/models/liquidity-tiers-model.ts +++ b/indexer/packages/postgres/src/models/liquidity-tiers-model.ts @@ -33,6 +33,22 @@ export default class LiquidityTiersModel extends BaseModel { }; } + /** + * A mapping from column name to JSON conversion expected. + * See getSqlConversionForDydxModelTypes for valid conversions. + * + * TODO(IND-239): Ensure that jsonSchema() / sqlToJsonConversions() / model fields match. + */ + static get sqlToJsonConversions() { + return { + id: 'integer', + name: 'string', + initialMarginPpm: 'string', + maintenanceFractionPpm: 'string', + basePositionNotional: 'string', + }; + } + id!: number; QueryBuilderType!: UpsertQueryBuilder; diff --git a/indexer/services/ender/__tests__/handlers/liquidity-tier-handler.test.ts b/indexer/services/ender/__tests__/handlers/liquidity-tier-handler.test.ts index f00ac7faa3..bdeeae8322 100644 --- a/indexer/services/ender/__tests__/handlers/liquidity-tier-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/liquidity-tier-handler.test.ts @@ -39,6 +39,7 @@ import { updateBlockCache } from '../../src/caches/block-cache'; import { defaultLiquidityTier } from '@dydxprotocol-indexer/postgres/build/__tests__/helpers/constants'; import _ from 'lodash'; import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions'; +import config from '../../src/config'; describe('liquidityTierHandler', () => { beforeAll(async () => { @@ -103,73 +104,107 @@ describe('liquidityTierHandler', () => { }); }); - it('creates new liquidity tier', async () => { - const transactionIndex: number = 0; - const liquidityTierEvent: LiquidityTierUpsertEventV1 = defaultLiquidityTierUpsertEvent; - const kafkaMessage: KafkaMessage = createKafkaMessageFromLiquidityTiersEvent({ - liquidityTierEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, - }); - // Confirm there is no existing liquidity tier - await expectNoExistingLiquidityTiers(); - await perpetualMarketRefresher.updatePerpetualMarkets(); - - const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); - await onMessage(kafkaMessage); - - const newLiquidityTiers: LiquidityTiersFromDatabase[] = await LiquidityTiersTable.findAll( - {}, - [], { - orderBy: [[LiquidityTiersColumns.id, Ordering.ASC]], + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'creates new liquidity tier (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_LIQUIDITY_TIER_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; + const liquidityTierEvent: LiquidityTierUpsertEventV1 = defaultLiquidityTierUpsertEvent; + const kafkaMessage: KafkaMessage = createKafkaMessageFromLiquidityTiersEvent({ + liquidityTierEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, }); - expect(newLiquidityTiers.length).toEqual(1); - expectLiquidityTier(newLiquidityTiers[0], liquidityTierEvent); - expectTimingStats(); - validateLiquidityTierRefresher(defaultLiquidityTierUpsertEvent); - expectKafkaMessages(producerSendMock, liquidityTierEvent, 0); - }); + // Confirm there is no existing liquidity tier + await expectNoExistingLiquidityTiers(); + await perpetualMarketRefresher.updatePerpetualMarkets(); - it('updates existing liquidity tier', async () => { - const transactionIndex: number = 0; - const liquidityTierEvent: LiquidityTierUpsertEventV1 = defaultLiquidityTierUpsertEvent; - const kafkaMessage: KafkaMessage = createKafkaMessageFromLiquidityTiersEvent({ - liquidityTierEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); + await onMessage(kafkaMessage); + + const newLiquidityTiers: LiquidityTiersFromDatabase[] = await LiquidityTiersTable.findAll( + {}, + [], { + orderBy: [[LiquidityTiersColumns.id, Ordering.ASC]], + }); + expect(newLiquidityTiers.length).toEqual(1); + expectLiquidityTier(newLiquidityTiers[0], liquidityTierEvent); + if (!useSqlFunction) { + expectTimingStats(); + } + validateLiquidityTierRefresher(defaultLiquidityTierUpsertEvent); + expectKafkaMessages(producerSendMock, liquidityTierEvent, 0); }); - // Create existing liquidity tier - await LiquidityTiersTable.upsert(defaultLiquidityTier); - // create perpetual market with existing liquidity tier to test websockets - await Promise.all([ - MarketTable.create(testConstants.defaultMarket), - MarketTable.create(testConstants.defaultMarket2), - ]); - await Promise.all([ - PerpetualMarketTable.create(testConstants.defaultPerpetualMarket), - PerpetualMarketTable.create(testConstants.defaultPerpetualMarket2), - ]); - await perpetualMarketRefresher.updatePerpetualMarkets(); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'updates existing liquidity tier (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_LIQUIDITY_TIER_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; + const liquidityTierEvent: LiquidityTierUpsertEventV1 = defaultLiquidityTierUpsertEvent; + const kafkaMessage: KafkaMessage = createKafkaMessageFromLiquidityTiersEvent({ + liquidityTierEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); + // Create existing liquidity tier + await LiquidityTiersTable.upsert(defaultLiquidityTier); - const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); - await onMessage(kafkaMessage); + // create perpetual market with existing liquidity tier to test websockets + await Promise.all([ + MarketTable.create(testConstants.defaultMarket), + MarketTable.create(testConstants.defaultMarket2), + ]); + await Promise.all([ + PerpetualMarketTable.create(testConstants.defaultPerpetualMarket), + PerpetualMarketTable.create(testConstants.defaultPerpetualMarket2), + ]); + await perpetualMarketRefresher.updatePerpetualMarkets(); - const newLiquidityTiers: LiquidityTiersFromDatabase[] = await LiquidityTiersTable.findAll( - {}, - [], { - orderBy: [[LiquidityTiersColumns.id, Ordering.ASC]], - }); - expect(newLiquidityTiers.length).toEqual(1); - expectLiquidityTier(newLiquidityTiers[0], liquidityTierEvent); - expectTimingStats(); - validateLiquidityTierRefresher(defaultLiquidityTierUpsertEvent); - expectKafkaMessages(producerSendMock, liquidityTierEvent, 2); - }); + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); + await onMessage(kafkaMessage); + + const newLiquidityTiers: LiquidityTiersFromDatabase[] = await LiquidityTiersTable.findAll( + {}, + [], { + orderBy: [[LiquidityTiersColumns.id, Ordering.ASC]], + }); + expect(newLiquidityTiers.length).toEqual(1); + expectLiquidityTier(newLiquidityTiers[0], liquidityTierEvent); + if (!useSqlFunction) { + expectTimingStats(); + } + validateLiquidityTierRefresher(defaultLiquidityTierUpsertEvent); + expectKafkaMessages(producerSendMock, liquidityTierEvent, 2); + }); }); function expectTimingStats() { diff --git a/indexer/services/ender/src/config.ts b/indexer/services/ender/src/config.ts index e9e6f9e868..ecc09c4ac6 100644 --- a/indexer/services/ender/src/config.ts +++ b/indexer/services/ender/src/config.ts @@ -29,6 +29,9 @@ export const configSchema = { USE_LIQUIDATION_HANDLER_SQL_FUNCTION: parseBoolean({ default: true, }), + USE_LIQUIDITY_TIER_HANDLER_SQL_FUNCTION: parseBoolean({ + default: true, + }), USE_MARKET_CREATE_HANDLER_SQL_FUNCTION: parseBoolean({ default: true, }), diff --git a/indexer/services/ender/src/handlers/liquidity-tier-handler.ts b/indexer/services/ender/src/handlers/liquidity-tier-handler.ts index 4a3743d319..708440023b 100644 --- a/indexer/services/ender/src/handlers/liquidity-tier-handler.ts +++ b/indexer/services/ender/src/handlers/liquidity-tier-handler.ts @@ -1,15 +1,20 @@ +import { logger } from '@dydxprotocol-indexer/base'; import { LiquidityTiersCreateObject, LiquidityTiersFromDatabase, + LiquidityTiersModel, LiquidityTiersTable, PerpetualMarketFromDatabase, liquidityTierRefresher, perpetualMarketRefresher, protocolTranslations, + storeHelpers, } from '@dydxprotocol-indexer/postgres'; import { LiquidityTierUpsertEventV1 } from '@dydxprotocol-indexer/v4-protos'; import _ from 'lodash'; +import * as pg from 'pg'; +import config from '../config'; import { QUOTE_CURRENCY_ATOMIC_RESOLUTION } from '../constants'; import { generatePerpetualMarketMessage } from '../helpers/kafka-helper'; import { ConsolidatedKafkaEvent } from '../lib/types'; @@ -24,6 +29,36 @@ export class LiquidityTierHandler extends Handler { // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(): Promise { + if (config.USE_LIQUIDITY_TIER_HANDLER_SQL_FUNCTION) { + return this.handleViaSqlFunction(); + } + return this.handleViaKnex(); + } + + private async handleViaSqlFunction(): Promise { + const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes; + const result: pg.QueryResult = await storeHelpers.rawQuery( + `SELECT dydx_liquidity_tier_handler( + '${JSON.stringify(LiquidityTierUpsertEventV1.decode(eventDataBinary))}' + ) AS result;`, + { txId: this.txId }, + ).catch((error: Error) => { + logger.error({ + at: 'LiquidityTierHandler#handleViaSqlFunction', + message: 'Failed to handle LiquidityTierUpsertEventV1', + error, + }); + + throw error; + }); + + const liquidityTier: LiquidityTiersFromDatabase = LiquidityTiersModel.fromJson( + result.rows[0].result.liquidity_tier) as LiquidityTiersFromDatabase; + liquidityTierRefresher.upsertLiquidityTier(liquidityTier); + return this.generateWebsocketEventsForLiquidityTier(liquidityTier); + } + + private async handleViaKnex(): Promise { const liquidityTier: LiquidityTiersFromDatabase = await this.runFuncWithTimingStatAndErrorLogging( this.upsertLiquidityTier(), diff --git a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts index b24585a7f4..ec43a44afb 100644 --- a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts +++ b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts @@ -45,6 +45,7 @@ const scripts: string[] = [ 'dydx_get_total_filled_from_liquidity.sql', 'dydx_get_weighted_average.sql', 'dydx_liquidation_fill_handler_per_order.sql', + 'dydx_liquidity_tier_handler.sql', 'dydx_order_fill_handler_per_order.sql', 'dydx_perpetual_market_handler.sql', 'dydx_perpetual_position_and_order_side_matching.sql', diff --git a/indexer/services/ender/src/scripts/dydx_liquidity_tier_handler.sql b/indexer/services/ender/src/scripts/dydx_liquidity_tier_handler.sql new file mode 100644 index 0000000000..b32c4212ba --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_liquidity_tier_handler.sql @@ -0,0 +1,34 @@ +/** + Parameters: + - event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25) + converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify. + Returns: JSON object containing fields: + - liquidy_tier: The upserted liquidity tier in liquidity-tiers-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/liquidity-tiers-model.ts). +*/ +CREATE OR REPLACE FUNCTION dydx_liquidity_tier_handler(event_data jsonb) RETURNS jsonb AS $$ +DECLARE + liquidity_tier_record liquidity_tiers%ROWTYPE; +BEGIN + liquidity_tier_record."id" = (event_data->'id')::integer; + liquidity_tier_record."name" = event_data->>'name'; + liquidity_tier_record."initialMarginPpm" = (event_data->'initialMarginPpm')::bigint; + liquidity_tier_record."maintenanceFractionPpm" = (event_data->'maintenanceFractionPpm')::bigint; + liquidity_tier_record."basePositionNotional" = power(10, -6) * dydx_from_jsonlib_long(event_data->'basePositionNotional'); + + INSERT INTO liquidity_tiers + VALUES (liquidity_tier_record.*) + ON CONFLICT ("id") DO + UPDATE + SET + "name" = liquidity_tier_record."name", + "initialMarginPpm" = liquidity_tier_record."initialMarginPpm", + "maintenanceFractionPpm" = liquidity_tier_record."maintenanceFractionPpm", + "basePositionNotional" = liquidity_tier_record."basePositionNotional" + RETURNING * INTO liquidity_tier_record; + + RETURN jsonb_build_object( + 'liquidity_tier', + dydx_to_jsonb(liquidity_tier_record) + ); +END; +$$ LANGUAGE plpgsql; \ No newline at end of file