From 5e0c4e02a4f9ec67d99534e821f056fb99b3d0ed Mon Sep 17 00:00:00 2001 From: Lukasz Cwik <126621805+lcwik@users.noreply.github.com> Date: Fri, 3 Nov 2023 08:25:03 -0700 Subject: [PATCH] [IND-469] Update `ender` market price update handler to execute updates via a SQL function. (#746) --- indexer/packages/postgres/src/constants.ts | 2 + indexer/packages/postgres/src/index.ts | 1 + .../postgres/src/models/oracle-price-model.ts | 16 ++ .../market-price-update-handler.test.ts | 246 +++++++++++------- .../ender/__tests__/scripts/scripts.test.ts | 29 ++- indexer/services/ender/src/config.ts | 3 + .../markets/market-price-update-handler.ts | 55 +++- .../helpers/postgres/postgres-functions.ts | 2 + .../dydx_market_price_update_handler.sql | 51 ++++ .../dydx_uuid_from_oracle_price_parts.sql | 8 + 10 files changed, 305 insertions(+), 108 deletions(-) create mode 100644 indexer/services/ender/src/scripts/dydx_market_price_update_handler.sql create mode 100644 indexer/services/ender/src/scripts/dydx_uuid_from_oracle_price_parts.sql diff --git a/indexer/packages/postgres/src/constants.ts b/indexer/packages/postgres/src/constants.ts index e716af81bb..0f0423409f 100644 --- a/indexer/packages/postgres/src/constants.ts +++ b/indexer/packages/postgres/src/constants.ts @@ -4,6 +4,7 @@ import config from './config'; import AssetPositionModel from './models/asset-position-model'; import FillModel from './models/fill-model'; import MarketModel from './models/market-model'; +import OraclePriceModel from './models/oracle-price-model'; import OrderModel from './models/order-model'; import PerpetualMarketModel from './models/perpetual-market-model'; import PerpetualPositionModel from './models/perpetual-position-model'; @@ -85,6 +86,7 @@ export const SQL_TO_JSON_DEFINED_MODELS = [ AssetPositionModel, FillModel, MarketModel, + OraclePriceModel, OrderModel, PerpetualMarketModel, PerpetualPositionModel, diff --git a/indexer/packages/postgres/src/index.ts b/indexer/packages/postgres/src/index.ts index 03c004276d..ea27819a06 100644 --- a/indexer/packages/postgres/src/index.ts +++ b/indexer/packages/postgres/src/index.ts @@ -6,6 +6,7 @@ export { postgresConfigSchema } from './config'; export { default as AssetPositionModel } from './models/asset-position-model'; export { default as FillModel } from './models/fill-model'; export { default as MarketModel } from './models/market-model'; +export { default as OraclePriceModel } from './models/oracle-price-model'; export { default as OrderModel } from './models/order-model'; export { default as PerpetualMarketModel } from './models/perpetual-market-model'; export { default as PerpetualPositionModel } from './models/perpetual-position-model'; diff --git a/indexer/packages/postgres/src/models/oracle-price-model.ts b/indexer/packages/postgres/src/models/oracle-price-model.ts index 8da221df9e..6b0480c8b9 100644 --- a/indexer/packages/postgres/src/models/oracle-price-model.ts +++ b/indexer/packages/postgres/src/models/oracle-price-model.ts @@ -53,6 +53,22 @@ export default class OraclePriceModel extends Model { }; } + /** + * A mapping from column name to JSON conversion expected. + * See getSqlConversionForDydxModelTypes for valid conversions. + * + * TODO(IND-239): Ensure that jsonSchema() / sqlToJsonConversions() / model fields match. + */ + static get sqlToJsonConversions() { + return { + id: 'string', + marketId: 'integer', + price: 'string', + effectiveAt: 'date-time', + effectiveAtHeight: 'string', + }; + } + id!: string; marketId!: number; diff --git a/indexer/services/ender/__tests__/handlers/markets/market-price-update-handler.test.ts b/indexer/services/ender/__tests__/handlers/markets/market-price-update-handler.test.ts index 9a56111afd..05edfd648d 100644 --- a/indexer/services/ender/__tests__/handlers/markets/market-price-update-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/markets/market-price-update-handler.test.ts @@ -33,6 +33,7 @@ import { MarketPriceUpdateHandler } from '../../../src/handlers/markets/market-p import Long from 'long'; import { getPrice } from '../../../src/caches/price-cache'; import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions'; +import config from '../../../src/config'; describe('marketPriceUpdateHandler', () => { beforeAll(async () => { @@ -95,125 +96,170 @@ describe('marketPriceUpdateHandler', () => { }); }); - it('fails when no market exists', async () => { - const transactionIndex: number = 0; - const marketPriceUpdate: MarketEventV1 = { - marketId: 5, - priceUpdate: { - priceWithExponent: Long.fromValue(50000000, true), - }, - }; - const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({ - marketEvents: [marketPriceUpdate], - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, - }); - - await expect(onMessage(kafkaMessage)).rejects.toThrowError( - new ParseMessageError('MarketPriceUpdateEvent contains a non-existent market id'), - ); - - expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({ - at: 'MarketPriceUpdateHandler#logAndThrowParseMessageError', - message: 'MarketPriceUpdateEvent contains a non-existent market id', - })); - expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({ - at: 'onMessage#onMessage', - message: 'Error: Unable to parse message, this must be due to a bug in V4 node', - })); - expect(producerSendMock.mock.calls.length).toEqual(0); - }); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'fails when no market exists (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_MARKET_PRICE_UPDATE_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; + const marketPriceUpdate: MarketEventV1 = { + marketId: 5, + priceUpdate: { + priceWithExponent: Long.fromValue(50000000, true), + }, + }; + const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({ + marketEvents: [marketPriceUpdate], + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); - it('successfully inserts new oracle price for existing market', async () => { - const transactionIndex: number = 0; + await expect(onMessage(kafkaMessage)).rejects.toThrowError( + new ParseMessageError('MarketPriceUpdateEvent contains a non-existent market id'), + ); - const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({ - marketEvents: [defaultMarketPriceUpdate], - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, + expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({ + at: 'MarketPriceUpdateHandler#logAndThrowParseMessageError', + message: 'MarketPriceUpdateEvent contains a non-existent market id', + })); + expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({ + at: 'onMessage#onMessage', + message: 'Error: Unable to parse message, this must be due to a bug in V4 node', + })); + expect(producerSendMock.mock.calls.length).toEqual(0); }); - await onMessage(kafkaMessage); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'successfully inserts new oracle price for existing market (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_MARKET_PRICE_UPDATE_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; - const { market, oraclePrice } = await getDbState(defaultMarketPriceUpdate); + const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({ + marketEvents: [defaultMarketPriceUpdate], + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); - expectOraclePriceMatchesEvent( - defaultMarketPriceUpdate as MarketPriceUpdateEventMessage, - oraclePrice, - market, - defaultHeight, - ); + await onMessage(kafkaMessage); - expect(getPrice(oraclePrice.marketId)).toEqual(oraclePrice.price); + const { market, oraclePrice } = await getDbState(defaultMarketPriceUpdate); - const contents: MarketMessageContents = generateOraclePriceContents( - oraclePrice, - market.pair, - ); + expectOraclePriceMatchesEvent( + defaultMarketPriceUpdate as MarketPriceUpdateEventMessage, + oraclePrice, + market, + defaultHeight, + ); + + expect(getPrice(oraclePrice.marketId)).toEqual(oraclePrice.price); - expectMarketKafkaMessage({ - producerSendMock, - contents: JSON.stringify(contents), + const contents: MarketMessageContents = generateOraclePriceContents( + oraclePrice, + market.pair, + ); + + expectMarketKafkaMessage({ + producerSendMock, + contents: JSON.stringify(contents), + }); }); - }); - it('successfully inserts new oracle price for market created in same block', async () => { - const transactionIndex: number = 0; - const newMarketId: number = 3000; - - // Include an event to create the market - const marketCreate: MarketEventV1 = { - marketId: newMarketId, - marketCreate: { - base: { - pair: 'NEWTOKEN-USD', - minPriceChangePpm: 500, + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'successfully inserts new oracle price for market created in same block (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_MARKET_PRICE_UPDATE_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; + const newMarketId: number = 3000; + + // Include an event to create the market + const marketCreate: MarketEventV1 = { + marketId: newMarketId, + marketCreate: { + base: { + pair: 'NEWTOKEN-USD', + minPriceChangePpm: 500, + }, + exponent: -5, }, - exponent: -5, - }, - }; - const marketPriceUpdate: MarketEventV1 = { - marketId: newMarketId, - priceUpdate: { - priceWithExponent: Long.fromValue(50000000), - }, - }; - - const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({ - marketEvents: [marketCreate, marketPriceUpdate], - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, - }); + }; + const marketPriceUpdate: MarketEventV1 = { + marketId: newMarketId, + priceUpdate: { + priceWithExponent: Long.fromValue(50000000), + }, + }; + + const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({ + marketEvents: [marketCreate, marketPriceUpdate], + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); - await onMessage(kafkaMessage); + await onMessage(kafkaMessage); - const { market, oraclePrice } = await getDbState(marketPriceUpdate); + const { market, oraclePrice } = await getDbState(marketPriceUpdate); - expectOraclePriceMatchesEvent( - marketPriceUpdate as MarketPriceUpdateEventMessage, - oraclePrice, - market, - defaultHeight, - ); + expectOraclePriceMatchesEvent( + marketPriceUpdate as MarketPriceUpdateEventMessage, + oraclePrice, + market, + defaultHeight, + ); - expect(getPrice(oraclePrice.marketId)).toEqual(oraclePrice.price); + expect(getPrice(oraclePrice.marketId)).toEqual(oraclePrice.price); - const contents: MarketMessageContents = generateOraclePriceContents( - oraclePrice, - market.pair, - ); + const contents: MarketMessageContents = generateOraclePriceContents( + oraclePrice, + market.pair, + ); - expectMarketKafkaMessage({ - producerSendMock, - contents: JSON.stringify(contents), + expectMarketKafkaMessage({ + producerSendMock, + contents: JSON.stringify(contents), + }); }); - }); }); async function getDbState(marketPriceUpdate: MarketEventV1): Promise { diff --git a/indexer/services/ender/__tests__/scripts/scripts.test.ts b/indexer/services/ender/__tests__/scripts/scripts.test.ts index 3d33e0c6a7..932435dac4 100644 --- a/indexer/services/ender/__tests__/scripts/scripts.test.ts +++ b/indexer/services/ender/__tests__/scripts/scripts.test.ts @@ -19,6 +19,7 @@ import { PositionSide, TendermintEventTable, FillTable, + OraclePriceTable, OrderTable, protocolTranslations, SubaccountTable, @@ -350,15 +351,29 @@ describe('SQL Function Tests', () => { } }); - it('dydx_uuid_from_transaction_parts (%s)', async () => { - const transactionParts = { - blockHeight: '123456', - transactionIndex: 123, - }; + it.each([ + [ + '123456', + 123, + ], + ])('dydx_uuid_from_transaction_parts (%s, %s)', async (blockHeight: string, transactionIndex: number) => { + const result = await getSingleRawQueryResultRow( + `SELECT dydx_uuid_from_transaction_parts('${blockHeight}', '${transactionIndex}') AS result`); + expect(result).toEqual( + TransactionTable.uuid(blockHeight, transactionIndex), + ); + }); + + it.each([ + [ + 123, + '123456', + ], + ])('dydx_uuid_from_oracle_price_parts (%s, %s)', async (marketId: number, blockHeight: string) => { const result = await getSingleRawQueryResultRow( - `SELECT dydx_uuid_from_transaction_parts('${transactionParts.blockHeight}', '${transactionParts.transactionIndex}') AS result`); + `SELECT dydx_uuid_from_oracle_price_parts('${marketId}', '${blockHeight}') AS result`); expect(result).toEqual( - TransactionTable.uuid(transactionParts.blockHeight, transactionParts.transactionIndex), + OraclePriceTable.uuid(marketId, blockHeight), ); }); diff --git a/indexer/services/ender/src/config.ts b/indexer/services/ender/src/config.ts index 979d38a566..cf9899aeb4 100644 --- a/indexer/services/ender/src/config.ts +++ b/indexer/services/ender/src/config.ts @@ -35,6 +35,9 @@ export const configSchema = { USE_MARKET_MODIFY_HANDLER_SQL_FUNCTION: parseBoolean({ default: true, }), + USE_MARKET_PRICE_UPDATE_HANDLER_SQL_FUNCTION: parseBoolean({ + default: true, + }), USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({ default: true, }), diff --git a/indexer/services/ender/src/handlers/markets/market-price-update-handler.ts b/indexer/services/ender/src/handlers/markets/market-price-update-handler.ts index 7a28e2c8a4..4efdbfc6b7 100644 --- a/indexer/services/ender/src/handlers/markets/market-price-update-handler.ts +++ b/indexer/services/ender/src/handlers/markets/market-price-update-handler.ts @@ -5,13 +5,16 @@ import { MarketTable, OraclePriceCreateObject, OraclePriceFromDatabase, + OraclePriceModel, OraclePriceTable, protocolTranslations, - MarketMessageContents, + MarketMessageContents, storeHelpers, MarketModel, marketRefresher, } from '@dydxprotocol-indexer/postgres'; import { MarketEventV1 } from '@dydxprotocol-indexer/v4-protos'; +import * as pg from 'pg'; import { updatePriceCacheWithPrice } from '../../caches/price-cache'; +import config from '../../config'; import { generateOraclePriceContents } from '../../helpers/kafka-helper'; import { ConsolidatedKafkaEvent, @@ -38,6 +41,13 @@ export class MarketPriceUpdateHandler extends Handler { message: 'Received MarketEvent with MarketPriceUpdate.', event: this.event, }); + if (config.USE_MARKET_MODIFY_HANDLER_SQL_FUNCTION) { + return this.handleViaSqlFunction(); + } + return this.handleViaKnexQueries(); + } + + private async handleViaKnexQueries(): Promise { // MarketHandler already makes sure the event has 'priceUpdate' as the oneofKind. const castedMarketPriceUpdateMessage: MarketPriceUpdateEventMessage = this.event as MarketPriceUpdateEventMessage; @@ -54,6 +64,49 @@ export class MarketPriceUpdateHandler extends Handler { ]; } + private async handleViaSqlFunction(): Promise { + const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes; + const result: pg.QueryResult = await storeHelpers.rawQuery( + `SELECT dydx_market_price_update_handler( + ${this.block.height}, + '${this.block.time?.toISOString()}', + '${JSON.stringify(MarketEventV1.decode(eventDataBinary))}' + ) AS result;`, + { txId: this.txId }, + ).catch((error: Error) => { + logger.error({ + at: 'MarketPriceUpdateHandler#handleViaSqlFunction', + message: 'Failed to handle MarketEventV1', + error, + }); + + if (error.message.includes('MarketPriceUpdateEvent contains a non-existent market id')) { + const castedMarketPriceUpdateMessage: + MarketPriceUpdateEventMessage = this.event as MarketPriceUpdateEventMessage; + this.logAndThrowParseMessageError( + 'MarketPriceUpdateEvent contains a non-existent market id', + { castedMarketPriceUpdateMessage }, + ); + } + + throw error; + }); + + const market: MarketFromDatabase = MarketModel.fromJson( + result.rows[0].result.market) as MarketFromDatabase; + const oraclePrice: OraclePriceFromDatabase = OraclePriceModel.fromJson( + result.rows[0].result.oracle_price) as OraclePriceFromDatabase; + + marketRefresher.updateMarket(market); + updatePriceCacheWithPrice(oraclePrice); + + return [ + this.generateKafkaEvent( + oraclePrice, market.pair, + ), + ]; + } + protected async updateMarketFromEvent( castedMarketPriceUpdateMessage: MarketPriceUpdateEventMessage, humanPrice: string, diff --git a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts index faad7bb04c..718b40a603 100644 --- a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts +++ b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts @@ -31,6 +31,7 @@ const scripts: string[] = [ 'create_extension_uuid_ossp.sql', 'dydx_market_create_handler.sql', 'dydx_market_modify_handler.sql', + 'dydx_market_price_update_handler.sql', 'dydx_event_id_from_parts.sql', 'dydx_event_to_transaction_index.sql', 'dydx_from_jsonlib_long.sql', @@ -49,6 +50,7 @@ const scripts: string[] = [ 'dydx_uuid.sql', 'dydx_uuid_from_asset_position_parts.sql', 'dydx_uuid_from_fill_event_parts.sql', + 'dydx_uuid_from_oracle_price_parts.sql', 'dydx_uuid_from_order_id.sql', 'dydx_uuid_from_order_id_parts.sql', 'dydx_uuid_from_perpetual_position_parts.sql', diff --git a/indexer/services/ender/src/scripts/dydx_market_price_update_handler.sql b/indexer/services/ender/src/scripts/dydx_market_price_update_handler.sql new file mode 100644 index 0000000000..afbabce7eb --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_market_price_update_handler.sql @@ -0,0 +1,51 @@ +/** + Parameters: + - block_height: the height of the block being processing. + - block_time: the time of the block being processed. + - event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25) + converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify. + Returns: JSON object containing fields: + - market: The updated market in market-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/market-model.ts). + - oracle_price: The created oracle price in oracle-price-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/oracle-price-model.ts). +*/ +CREATE OR REPLACE FUNCTION dydx_market_price_update_handler(block_height int, block_time timestamp, event_data jsonb) RETURNS jsonb AS $$ +DECLARE + market_record_id integer; + market_record markets%ROWTYPE; + oracle_price numeric; + oracle_price_record oracle_prices%ROWTYPE; +BEGIN + market_record_id = (event_data->'marketId')::integer; + SELECT * INTO market_record FROM markets WHERE "id" = market_record_id; + + IF NOT FOUND THEN + RAISE EXCEPTION 'MarketPriceUpdateEvent contains a non-existent market id. Id: %', market_record_id; + END IF; + + oracle_price = dydx_trim_scale( + dydx_from_jsonlib_long(event_data->'priceUpdate'->'priceWithExponent') * + power(10, market_record.exponent::numeric)); + + market_record."oraclePrice" = oracle_price; + + UPDATE markets + SET + "oraclePrice" = market_record."oraclePrice" + WHERE id = market_record."id"; + + oracle_price_record."id" = dydx_uuid_from_oracle_price_parts(market_record_id, block_height); + oracle_price_record."effectiveAt" = block_time; + oracle_price_record."effectiveAtHeight" = block_height; + oracle_price_record."marketId" = market_record_id; + oracle_price_record."price" = oracle_price; + + INSERT INTO oracle_prices VALUES (oracle_price_record.*); + + RETURN jsonb_build_object( + 'market', + dydx_to_jsonb(market_record), + 'oracle_price', + dydx_to_jsonb(oracle_price_record) + ); +END; +$$ LANGUAGE plpgsql; \ No newline at end of file diff --git a/indexer/services/ender/src/scripts/dydx_uuid_from_oracle_price_parts.sql b/indexer/services/ender/src/scripts/dydx_uuid_from_oracle_price_parts.sql new file mode 100644 index 0000000000..52ed712276 --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_uuid_from_oracle_price_parts.sql @@ -0,0 +1,8 @@ +/** + Returns a UUID using the parts of an OraclePrice (https://github.com/dydxprotocol/v4-chain/blob/755b0b928be793072d19eb3a1608e7a2503f396a/indexer/packages/postgres/src/stores/oracle-price-table.ts#L24). +*/ +CREATE OR REPLACE FUNCTION dydx_uuid_from_oracle_price_parts(market_id int, block_height int) RETURNS uuid AS $$ +BEGIN + return dydx_uuid(concat(market_id, '-', block_height)); +END; +$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; \ No newline at end of file