diff --git a/indexer/packages/postgres/src/constants.ts b/indexer/packages/postgres/src/constants.ts index 0f0423409f..0cd5eaf83f 100644 --- a/indexer/packages/postgres/src/constants.ts +++ b/indexer/packages/postgres/src/constants.ts @@ -1,6 +1,7 @@ import { CandleMessage_Resolution, ClobPairStatus } from '@dydxprotocol-indexer/v4-protos'; import config from './config'; +import AssetModel from './models/asset-model'; import AssetPositionModel from './models/asset-position-model'; import FillModel from './models/fill-model'; import MarketModel from './models/market-model'; @@ -83,6 +84,7 @@ export const TIME_IN_FORCE_TO_API_TIME_IN_FORCE: Record { beforeAll(async () => { @@ -61,6 +62,7 @@ describe('assetHandler', () => { afterEach(async () => { await dbHelpers.clearData(); + assetRefresher.clear(); jest.clearAllMocks(); }); @@ -98,51 +100,82 @@ describe('assetHandler', () => { }); }); - it('fails when market doesnt exist for asset', async () => { - const transactionIndex: number = 0; - const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({ - assetEvent: defaultAssetCreateEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, - }); - - const message: string = 'Unable to find market with id: 0'; - await expect(onMessage(kafkaMessage)).rejects.toThrowError( - new Error(message), - ); - }); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'fails when market doesnt exist for asset (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; + const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({ + assetEvent: defaultAssetCreateEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); - it('creates new asset', async () => { - await MarketTable.create(testConstants.defaultMarket); - await marketRefresher.updateMarkets(); - const transactionIndex: number = 0; - - const assetEvent: AssetCreateEventV1 = defaultAssetCreateEvent; - const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({ - assetEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, + await expect(onMessage(kafkaMessage)).rejects.toThrowError( + 'Unable to find market with id: 0', + ); }); - // Confirm there is no existing asset to or from the sender subaccount - await expectNoExistingAssets(); - await onMessage(kafkaMessage); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'creates new asset (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction; + await MarketTable.create(testConstants.defaultMarket); + await marketRefresher.updateMarkets(); + const transactionIndex: number = 0; - const newAssets: AssetFromDatabase[] = await AssetTable.findAll( - {}, - [], { - orderBy: [[AssetColumns.id, Ordering.ASC]], + const assetEvent: AssetCreateEventV1 = defaultAssetCreateEvent; + const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({ + assetEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, }); - expect(newAssets.length).toEqual(1); - expectAssetMatchesEvent(assetEvent, newAssets[0]); - expectTimingStats(); - const asset: AssetFromDatabase = assetRefresher.getAssetFromId('0'); - expect(asset).toBeDefined(); - }); + // Confirm there is no existing asset to or from the sender subaccount + await expectNoExistingAssets(); + + await onMessage(kafkaMessage); + + const newAssets: AssetFromDatabase[] = await AssetTable.findAll( + {}, + [], { + orderBy: [[AssetColumns.id, Ordering.ASC]], + }); + expect(newAssets.length).toEqual(1); + expectAssetMatchesEvent(assetEvent, newAssets[0]); + if (!useSqlFunction) { + expectTimingStats(); + } + const asset: AssetFromDatabase = assetRefresher.getAssetFromId('0'); + expect(asset).toBeDefined(); + }); }); function expectTimingStats() { diff --git a/indexer/services/ender/src/config.ts b/indexer/services/ender/src/config.ts index cf9899aeb4..ce74d46858 100644 --- a/indexer/services/ender/src/config.ts +++ b/indexer/services/ender/src/config.ts @@ -23,7 +23,7 @@ export const configSchema = { SEND_WEBSOCKET_MESSAGES: parseBoolean({ default: true, }), - USE_ORDER_HANDLER_SQL_FUNCTION: parseBoolean({ + USE_ASSET_CREATE_HANDLER_SQL_FUNCTION: parseBoolean({ default: true, }), USE_LIQUIDATION_HANDLER_SQL_FUNCTION: parseBoolean({ @@ -38,6 +38,9 @@ export const configSchema = { USE_MARKET_PRICE_UPDATE_HANDLER_SQL_FUNCTION: parseBoolean({ default: true, }), + USE_ORDER_HANDLER_SQL_FUNCTION: parseBoolean({ + default: true, + }), USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({ default: true, }), diff --git a/indexer/services/ender/src/handlers/asset-handler.ts b/indexer/services/ender/src/handlers/asset-handler.ts index 39bcb6e478..1287952cc7 100644 --- a/indexer/services/ender/src/handlers/asset-handler.ts +++ b/indexer/services/ender/src/handlers/asset-handler.ts @@ -1,8 +1,16 @@ +import { logger } from '@dydxprotocol-indexer/base'; import { - AssetFromDatabase, AssetTable, assetRefresher, marketRefresher, + AssetFromDatabase, + AssetModel, + AssetTable, + assetRefresher, + marketRefresher, + storeHelpers, } from '@dydxprotocol-indexer/postgres'; import { AssetCreateEventV1 } from '@dydxprotocol-indexer/v4-protos'; +import * as pg from 'pg'; +import config from '../config'; import { ConsolidatedKafkaEvent } from '../lib/types'; import { Handler } from './handler'; @@ -15,6 +23,36 @@ export class AssetCreationHandler extends Handler { // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(): Promise { + if (config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION) { + return this.handleViaSqlFunction(); + } + return this.handleViaKnex(); + } + + private async handleViaSqlFunction(): Promise { + const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes; + const result: pg.QueryResult = await storeHelpers.rawQuery( + `SELECT dydx_asset_create_handler( + '${JSON.stringify(AssetCreateEventV1.decode(eventDataBinary))}' + ) AS result;`, + { txId: this.txId }, + ).catch((error: Error) => { + logger.error({ + at: 'AssetCreationHandler#handleViaSqlFunction', + message: 'Failed to handle AssetCreateEventV1', + error, + }); + + throw error; + }); + + const asset: AssetFromDatabase = AssetModel.fromJson( + result.rows[0].result.asset) as AssetFromDatabase; + assetRefresher.addAsset(asset); + return []; + } + + private async handleViaKnex(): Promise { await this.runFuncWithTimingStatAndErrorLogging( this.createAsset(), this.generateTimingStatsOptions('create_asset'), diff --git a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts index 718b40a603..83cb99fe2e 100644 --- a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts +++ b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts @@ -29,6 +29,7 @@ function newScript(name: string, scriptPath: string): PostgresFunction { const scripts: string[] = [ 'create_extension_pg_stat_statements.sql', 'create_extension_uuid_ossp.sql', + 'dydx_asset_create_handler.sql', 'dydx_market_create_handler.sql', 'dydx_market_modify_handler.sql', 'dydx_market_price_update_handler.sql', diff --git a/indexer/services/ender/src/scripts/dydx_asset_create_handler.sql b/indexer/services/ender/src/scripts/dydx_asset_create_handler.sql new file mode 100644 index 0000000000..0e3a9dce8a --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_asset_create_handler.sql @@ -0,0 +1,34 @@ +/** + Parameters: + - event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25) + converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify. + Returns: JSON object containing fields: + - asset: The created asset in asset-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/asset-model.ts). +*/ +CREATE OR REPLACE FUNCTION dydx_asset_create_handler(event_data jsonb) RETURNS jsonb AS $$ +DECLARE + market_record_id integer; + asset_record assets%ROWTYPE; +BEGIN + asset_record."id" = event_data->>'id'; + asset_record."atomicResolution" = (event_data->'atomicResolution')::integer; + asset_record."symbol" = event_data->>'symbol'; + + asset_record."hasMarket" = (event_data->'hasMarket')::bool; + if asset_record."hasMarket" THEN + market_record_id = (event_data->'marketId')::integer; + SELECT "id" INTO asset_record."marketId" FROM markets WHERE "id" = market_record_id; + + IF NOT FOUND THEN + RAISE EXCEPTION 'Unable to find market with id: %', market_record_id; + END IF; + END IF; + + INSERT INTO assets VALUES (asset_record.*); + + RETURN jsonb_build_object( + 'asset', + dydx_to_jsonb(asset_record) + ); +END; +$$ LANGUAGE plpgsql; \ No newline at end of file