From 4da05e4ffadcd2c4e75e0a1146ec2713087034d3 Mon Sep 17 00:00:00 2001 From: Lukasz Cwik <126621805+lcwik@users.noreply.github.com> Date: Tue, 7 Nov 2023 15:33:38 -0800 Subject: [PATCH] [IND-471] Update ender funding to execute updates via a SQL function. (#770) --- .../handlers/funding-handler.test.ts | 421 ++++++++++-------- .../ender/__tests__/scripts/scripts.test.ts | 11 + indexer/services/ender/src/config.ts | 3 + .../ender/src/handlers/funding-handler.ts | 103 ++++- .../helpers/postgres/postgres-functions.ts | 2 + .../src/scripts/dydx_funding_handler.sql | 93 ++++ ...x_uuid_from_funding_index_update_parts.sql | 8 + 7 files changed, 464 insertions(+), 177 deletions(-) create mode 100644 indexer/services/ender/src/scripts/dydx_funding_handler.sql create mode 100644 indexer/services/ender/src/scripts/dydx_uuid_from_funding_index_update_parts.sql diff --git a/indexer/services/ender/__tests__/handlers/funding-handler.test.ts b/indexer/services/ender/__tests__/handlers/funding-handler.test.ts index 6b9b8ec117f..5ad391ead26 100644 --- a/indexer/services/ender/__tests__/handlers/funding-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/funding-handler.test.ts @@ -42,6 +42,7 @@ import { redisClient } from '../../src/helpers/redis/redis-controller'; import { bigIntToBytes } from '@dydxprotocol-indexer/v4-proto-parser'; import { startPriceCache } from '../../src/caches/price-cache'; import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions'; +import config from '../../src/config'; describe('fundingHandler', () => { beforeAll(async () => { @@ -114,201 +115,273 @@ describe('fundingHandler', () => { }); }); - it('successfully processes single premium sample event', async () => { - const kafkaMessage: KafkaMessage = createKafkaMessageFromFundingEvents({ - fundingEvents: [defaultFundingUpdateSampleEvent], - height: defaultHeight, - time: defaultTime, + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'successfully processes single premium sample event (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_FUNDING_HANDLER_SQL_FUNCTION = useSqlFunction; + const kafkaMessage: KafkaMessage = createKafkaMessageFromFundingEvents({ + fundingEvents: [defaultFundingUpdateSampleEvent], + height: defaultHeight, + time: defaultTime, + }); + + await onMessage(kafkaMessage); + + await expectNextFundingRate( + 'BTC-USD', + new Big(protocolTranslations.funding8HourValuePpmTo1HourRate( + defaultFundingUpdateSampleEvent.updates[0].fundingValuePpm, + )), + ); + if (!useSqlFunction) { + expectTimingStat('handle_premium_sample'); + } }); - await onMessage(kafkaMessage); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'successfully processes multiple premium sample event for different markets (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_FUNDING_HANDLER_SQL_FUNCTION = useSqlFunction; + const fundingUpdateSampleEvent2: FundingEventV1 = { + type: FundingEventV1_Type.TYPE_PREMIUM_SAMPLE, + updates: [ + { + perpetualId: 0, + fundingValuePpm: 100, + fundingIndex: bigIntToBytes(BigInt(0)), + }, + { + perpetualId: 1, + fundingValuePpm: 50, + fundingIndex: bigIntToBytes(BigInt(0)), + }, + ], + }; - await expectNextFundingRate( - 'BTC-USD', - new Big(protocolTranslations.funding8HourValuePpmTo1HourRate( - defaultFundingUpdateSampleEvent.updates[0].fundingValuePpm, - )), - ); - expectTimingStat('handle_premium_sample'); - }); + const kafkaMessage: KafkaMessage = createKafkaMessageFromFundingEvents({ + fundingEvents: [defaultFundingUpdateSampleEvent, fundingUpdateSampleEvent2], + height: defaultHeight, + time: defaultTime, + }); - it('successfully processes multiple premium sample event for different markets', async () => { - const fundingUpdateSampleEvent2: FundingEventV1 = { - type: FundingEventV1_Type.TYPE_PREMIUM_SAMPLE, - updates: [ - { - perpetualId: 0, - fundingValuePpm: 100, - fundingIndex: bigIntToBytes(BigInt(0)), - }, - { - perpetualId: 1, - fundingValuePpm: 50, - fundingIndex: bigIntToBytes(BigInt(0)), - }, - ], - }; + await onMessage(kafkaMessage); - const kafkaMessage: KafkaMessage = createKafkaMessageFromFundingEvents({ - fundingEvents: [defaultFundingUpdateSampleEvent, fundingUpdateSampleEvent2], - height: defaultHeight, - time: defaultTime, + await expectNextFundingRate( + 'BTC-USD', + new Big('0.000006875'), + ); + await expectNextFundingRate( + 'ETH-USD', + new Big(protocolTranslations.funding8HourValuePpmTo1HourRate( + fundingUpdateSampleEvent2.updates[1].fundingValuePpm, + )), + ); + if (!useSqlFunction) { + expectTimingStat('handle_premium_sample'); + } }); - await onMessage(kafkaMessage); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'successfully processes and clears cache for a new funding rate (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_FUNDING_HANDLER_SQL_FUNCTION = useSqlFunction; + const kafkaMessage: KafkaMessage = createKafkaMessageFromFundingEvents({ + fundingEvents: [defaultFundingUpdateSampleEvent], + height: defaultHeight, + time: defaultTime, + }); - await expectNextFundingRate( - 'BTC-USD', - new Big('0.000006875'), - ); - await expectNextFundingRate( - 'ETH-USD', - new Big(protocolTranslations.funding8HourValuePpmTo1HourRate( - fundingUpdateSampleEvent2.updates[1].fundingValuePpm, - )), - ); - expectTimingStat('handle_premium_sample'); - }); + await onMessage(kafkaMessage); - it('successfully processes and clears cache for a new funding rate', async () => { - const kafkaMessage: KafkaMessage = createKafkaMessageFromFundingEvents({ - fundingEvents: [defaultFundingUpdateSampleEvent], - height: defaultHeight, - time: defaultTime, - }); + await expectNextFundingRate( + 'BTC-USD', + new Big(protocolTranslations.funding8HourValuePpmTo1HourRate( + defaultFundingUpdateSampleEvent.updates[0].fundingValuePpm, + )), + ); + if (!useSqlFunction) { + expectTimingStat('handle_premium_sample'); + } - await onMessage(kafkaMessage); + const kafkaMessage2: KafkaMessage = createKafkaMessageFromFundingEvents({ + fundingEvents: [defaultFundingRateEvent], + height: 4, + time: defaultTime, + }); - await expectNextFundingRate( - 'BTC-USD', - new Big(protocolTranslations.funding8HourValuePpmTo1HourRate( - defaultFundingUpdateSampleEvent.updates[0].fundingValuePpm, - )), - ); - expectTimingStat('handle_premium_sample'); + await onMessage(kafkaMessage2); + await expectNextFundingRate( + 'BTC-USD', + undefined, + ); + const fundingIndices: FundingIndexUpdatesFromDatabase[] = await + FundingIndexUpdatesTable.findAll({}, [], {}); - const kafkaMessage2: KafkaMessage = createKafkaMessageFromFundingEvents({ - fundingEvents: [defaultFundingRateEvent], - height: 4, - time: defaultTime, + expect(fundingIndices.length).toEqual(1); + expect(fundingIndices[0]).toEqual(expect.objectContaining({ + perpetualId: '0', + rate: '0.00000125', + oraclePrice: '10000', + fundingIndex: '0.1', + })); + if (!useSqlFunction) { + expectTimingStat('handle_funding_rate'); + } }); - await onMessage(kafkaMessage2); - await expectNextFundingRate( - 'BTC-USD', - undefined, - ); - const fundingIndices: FundingIndexUpdatesFromDatabase[] = await - FundingIndexUpdatesTable.findAll({}, [], {}); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'successfully processes and clears cache for multiple new funding rates (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_FUNDING_HANDLER_SQL_FUNCTION = useSqlFunction; + const fundingSampleEvent: FundingEventV1 = { + type: FundingEventV1_Type.TYPE_PREMIUM_SAMPLE, + updates: [ + { + perpetualId: 0, + fundingValuePpm: 100, + fundingIndex: bigIntToBytes(BigInt(0)), + }, + { + perpetualId: 1, + fundingValuePpm: 50, + fundingIndex: bigIntToBytes(BigInt(0)), + }, + ], + }; + const kafkaMessage: KafkaMessage = createKafkaMessageFromFundingEvents({ + fundingEvents: [fundingSampleEvent], + height: defaultHeight, + time: defaultTime, + }); - expect(fundingIndices.length).toEqual(1); - expect(fundingIndices[0]).toEqual(expect.objectContaining({ - perpetualId: '0', - rate: '0.00000125', - oraclePrice: '10000', - fundingIndex: '0.1', - })); - expectTimingStat('handle_funding_rate'); - }); + await onMessage(kafkaMessage); - it('successfully processes and clears cache for multiple new funding rates', async () => { - const fundingSampleEvent: FundingEventV1 = { - type: FundingEventV1_Type.TYPE_PREMIUM_SAMPLE, - updates: [ - { - perpetualId: 0, - fundingValuePpm: 100, - fundingIndex: bigIntToBytes(BigInt(0)), - }, - { - perpetualId: 1, - fundingValuePpm: 50, - fundingIndex: bigIntToBytes(BigInt(0)), - }, - ], - }; - const kafkaMessage: KafkaMessage = createKafkaMessageFromFundingEvents({ - fundingEvents: [fundingSampleEvent], - height: defaultHeight, - time: defaultTime, - }); + await Promise.all([ + expectNextFundingRate( + 'BTC-USD', + new Big(protocolTranslations.funding8HourValuePpmTo1HourRate( + fundingSampleEvent.updates[0].fundingValuePpm, + )), + ), + expectNextFundingRate( + 'ETH-USD', + new Big(protocolTranslations.funding8HourValuePpmTo1HourRate( + fundingSampleEvent.updates[1].fundingValuePpm, + )), + ), + ]); + if (!useSqlFunction) { + expectTimingStat('handle_premium_sample'); + } - await onMessage(kafkaMessage); + const fundingRateEvent: FundingEventMessage = { + type: FundingEventV1_Type.TYPE_FUNDING_RATE_AND_INDEX, + updates: [ + { + perpetualId: 0, + fundingValuePpm: 10, + fundingIndex: bigIntToBytes(BigInt(10)), + }, + { + perpetualId: 1, + fundingValuePpm: 100, + fundingIndex: bigIntToBytes(BigInt(100)), + }, + ], + }; + const kafkaMessage2: KafkaMessage = createKafkaMessageFromFundingEvents({ + fundingEvents: [fundingRateEvent], + height: 4, + time: defaultTime, + }); - await Promise.all([ - expectNextFundingRate( - 'BTC-USD', - new Big(protocolTranslations.funding8HourValuePpmTo1HourRate( - fundingSampleEvent.updates[0].fundingValuePpm, - )), - ), - expectNextFundingRate( - 'ETH-USD', - new Big(protocolTranslations.funding8HourValuePpmTo1HourRate( - fundingSampleEvent.updates[1].fundingValuePpm, - )), - ), - ]); - expectTimingStat('handle_premium_sample'); - - const fundingRateEvent: FundingEventMessage = { - type: FundingEventV1_Type.TYPE_FUNDING_RATE_AND_INDEX, - updates: [ - { - perpetualId: 0, - fundingValuePpm: 10, - fundingIndex: bigIntToBytes(BigInt(10)), - }, + await onMessage(kafkaMessage2); + await Promise.all([ + expectNextFundingRate( + 'BTC-USD', + undefined, + ), + expectNextFundingRate( + 'ETH-USD', + undefined, + ), + ]); + const fundingIndices: FundingIndexUpdatesFromDatabase[] = await + FundingIndexUpdatesTable.findAll( + {}, + [], { - perpetualId: 1, - fundingValuePpm: 100, - fundingIndex: bigIntToBytes(BigInt(100)), + orderBy: [[FundingIndexUpdatesColumns.perpetualId, Ordering.ASC]], }, - ], - }; - const kafkaMessage2: KafkaMessage = createKafkaMessageFromFundingEvents({ - fundingEvents: [fundingRateEvent], - height: 4, - time: defaultTime, - }); - - await onMessage(kafkaMessage2); - await Promise.all([ - expectNextFundingRate( - 'BTC-USD', - undefined, - ), - expectNextFundingRate( - 'ETH-USD', - undefined, - ), - ]); - const fundingIndices: FundingIndexUpdatesFromDatabase[] = await - FundingIndexUpdatesTable.findAll( - {}, - [], - { - orderBy: [[FundingIndexUpdatesColumns.perpetualId, Ordering.ASC]], - }, - ); + ); - expect(fundingIndices.length).toEqual(2); - expect(fundingIndices[0]).toEqual(expect.objectContaining({ - perpetualId: '0', - rate: '0.00000125', - oraclePrice: '10000', - // 1e1 * 1e-6 * 1e-6 / 1e-10 = 1e-1 - fundingIndex: '0.1', - })); - expect(fundingIndices[1]).toEqual(expect.objectContaining({ - perpetualId: '1', - rate: '0.0000125', - oraclePrice: '500', - // 1e2 * 1e-6 * 1e-6 / 1e-18 = 1e8 - fundingIndex: '100000000', - })); - expectTimingStat('handle_funding_rate'); - }); + expect(fundingIndices.length).toEqual(2); + expect(fundingIndices[0]).toEqual(expect.objectContaining({ + perpetualId: '0', + rate: '0.00000125', + oraclePrice: '10000', + // 1e1 * 1e-6 * 1e-6 / 1e-10 = 1e-1 + fundingIndex: '0.1', + })); + expect(fundingIndices[1]).toEqual(expect.objectContaining({ + perpetualId: '1', + rate: '0.0000125', + oraclePrice: '500', + // 1e2 * 1e-6 * 1e-6 / 1e-18 = 1e8 + fundingIndex: '100000000', + })); + if (!useSqlFunction) { + expectTimingStat('handle_funding_rate'); + } + }); }); function expectTimingStat(fnName: string) { diff --git a/indexer/services/ender/__tests__/scripts/scripts.test.ts b/indexer/services/ender/__tests__/scripts/scripts.test.ts index 3c14458953c..359273343ce 100644 --- a/indexer/services/ender/__tests__/scripts/scripts.test.ts +++ b/indexer/services/ender/__tests__/scripts/scripts.test.ts @@ -20,6 +20,7 @@ import { PositionSide, TendermintEventTable, FillTable, + FundingIndexUpdatesTable, OraclePriceTable, OrderTable, protocolTranslations, @@ -256,6 +257,16 @@ describe('SQL Function Tests', () => { expect(result).toEqual(FillTable.uuid(eventId, liquidity)); }); + it.each([ + [1, 2, 3, 4], + [5, 6, 7, 8], + ])('dydx_uuid_from_funding_index_update_parts (%s, %s, %s, %s)', async (blockHeight: number, transactionIndex: number, eventIndex: number, perpetualId: number) => { + const eventId = TendermintEventTable.createEventId(`${blockHeight}`, transactionIndex, eventIndex); + const result = await getSingleRawQueryResultRow( + `SELECT dydx_uuid_from_funding_index_update_parts('${blockHeight}', '\\x${eventId.toString('hex')}'::bytea, '${perpetualId}') AS result`); + expect(result).toEqual(FundingIndexUpdatesTable.uuid(`${blockHeight}`, eventId, `${perpetualId}`)); + }); + it.each([ { subaccountId: { diff --git a/indexer/services/ender/src/config.ts b/indexer/services/ender/src/config.ts index e6fd60d9f35..33ea3ddf2ff 100644 --- a/indexer/services/ender/src/config.ts +++ b/indexer/services/ender/src/config.ts @@ -26,6 +26,9 @@ export const configSchema = { USE_ASSET_CREATE_HANDLER_SQL_FUNCTION: parseBoolean({ default: true, }), + USE_FUNDING_HANDLER_SQL_FUNCTION: parseBoolean({ + default: true, + }), USE_LIQUIDATION_HANDLER_SQL_FUNCTION: parseBoolean({ default: true, }), diff --git a/indexer/services/ender/src/handlers/funding-handler.ts b/indexer/services/ender/src/handlers/funding-handler.ts index 676cfd9976e..28baf619eca 100644 --- a/indexer/services/ender/src/handlers/funding-handler.ts +++ b/indexer/services/ender/src/handlers/funding-handler.ts @@ -7,14 +7,22 @@ import { FundingIndexUpdatesCreateObject, FundingIndexUpdatesFromDatabase, protocolTranslations, + storeHelpers, + PerpetualMarketModel, } from '@dydxprotocol-indexer/postgres'; import { NextFundingCache } from '@dydxprotocol-indexer/redis'; import { bytesToBigInt } from '@dydxprotocol-indexer/v4-proto-parser'; -import { FundingEventV1_Type, FundingUpdateV1 } from '@dydxprotocol-indexer/v4-protos'; +import { + FundingEventV1, + FundingEventV1_Type, + FundingUpdateV1, +} from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; import _ from 'lodash'; +import * as pg from 'pg'; import { getPrice } from '../caches/price-cache'; +import config from '../config'; import { redisClient } from '../helpers/redis/redis-controller'; import { indexerTendermintEventToTransactionIndex } from '../lib/helper'; import { ConsolidatedKafkaEvent, FundingEventMessage } from '../lib/types'; @@ -48,6 +56,95 @@ export class FundingHandler extends Handler { // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(): Promise { + if (config.USE_FUNDING_HANDLER_SQL_FUNCTION) { + return this.handleViaSqlFunction(); + } + return this.handleViaKnex(); + } + + private async handleViaSqlFunction(): Promise { + const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes; + const transactionIndex: number = indexerTendermintEventToTransactionIndex( + this.indexerTendermintEvent, + ); + const result: pg.QueryResult = await storeHelpers.rawQuery( + `SELECT dydx_funding_handler( + ${this.block.height}, + '${this.block.time?.toISOString()}', + '${JSON.stringify(FundingEventV1.decode(eventDataBinary))}', + ${this.indexerTendermintEvent.eventIndex}, + ${transactionIndex} + ) AS result;`, + { txId: this.txId }, + ).catch((error: Error) => { + logger.error({ + at: 'FundingHandler#handleViaSqlFunction', + message: 'Failed to handle FundingEventV1', + error, + }); + + throw error; + }); + + const perpetualMarkets: + Map = new Map(); + for (const [key, perpetualMarket] of Object.entries(result.rows[0].result.perpetual_markets)) { + perpetualMarkets.set( + key, + PerpetualMarketModel.fromJson(perpetualMarket as object) as PerpetualMarketFromDatabase, + ); + } + + const promises: Promise[] = new Array>(this.event.updates.length); + + for (let i: number = 0; i < this.event.updates.length; i++) { + const update: FundingUpdateV1 = this.event.updates[i]; + if (result.rows[0].result.errors[i] != null) { + logger.error({ + at: 'FundingHandler#handleFundingSample', + message: result.rows[0].result.errors[i], + update, + }); + continue; + } + + const perpetualMarket: + PerpetualMarketFromDatabase | undefined = perpetualMarkets.get(update.perpetualId.toString()); + if (perpetualMarket === undefined) { + logger.error({ + at: 'FundingHandler#handleFundingSample', + message: 'Received FundingUpdate with unknown perpetualId.', + update, + }); + continue; + } + + switch (this.event.type) { + case FundingEventV1_Type.TYPE_PREMIUM_SAMPLE: + promises[i] = NextFundingCache.addFundingSample( + perpetualMarket.ticker, + new Big(protocolTranslations.funding8HourValuePpmTo1HourRate(update.fundingValuePpm)), + redisClient, + ); + break; + case FundingEventV1_Type.TYPE_FUNDING_RATE_AND_INDEX: + // clear the cache for the predicted next funding rate + promises[i] = NextFundingCache.clearFundingSamples(perpetualMarket.ticker, redisClient); + break; + default: + logger.error({ + at: 'FundingHandler#handle', + message: 'Received unknown FundingEvent type.', + event: this.event, + }); + } + } + + await Promise.all(promises); + return []; + } + + private async handleViaKnex(): Promise { logger.info({ at: 'FundingHandler#handle', message: 'Received FundingEvent.', @@ -77,7 +174,7 @@ export class FundingHandler extends Handler { return []; } - public async handleFundingSample(samples: FundingUpdateV1[]): Promise { + private async handleFundingSample(samples: FundingUpdateV1[]): Promise { await Promise.all( _.map(samples, (sample: FundingUpdateV1) => { const perpetualMarket: @@ -101,7 +198,7 @@ export class FundingHandler extends Handler { ); } - public async handleFundingRate(updates: FundingUpdateV1[]): Promise { + private async handleFundingRate(updates: FundingUpdateV1[]): Promise { // clear the cache for the predicted next funding rate await Promise.all( _.map(updates, (update: FundingUpdateV1) => { diff --git a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts index 6a5b2b2f189..6193321c190 100644 --- a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts +++ b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts @@ -40,6 +40,7 @@ const scripts: string[] = [ 'dydx_from_protocol_order_side.sql', 'dydx_from_protocol_time_in_force.sql', 'dydx_from_serializable_int.sql', + 'dydx_funding_handler.sql', 'dydx_get_fee_from_liquidity.sql', 'dydx_get_order_status.sql', 'dydx_get_total_filled_from_liquidity.sql', @@ -57,6 +58,7 @@ const scripts: string[] = [ 'dydx_uuid.sql', 'dydx_uuid_from_asset_position_parts.sql', 'dydx_uuid_from_fill_event_parts.sql', + 'dydx_uuid_from_funding_index_update_parts.sql', 'dydx_uuid_from_oracle_price_parts.sql', 'dydx_uuid_from_order_id.sql', 'dydx_uuid_from_order_id_parts.sql', diff --git a/indexer/services/ender/src/scripts/dydx_funding_handler.sql b/indexer/services/ender/src/scripts/dydx_funding_handler.sql new file mode 100644 index 00000000000..d9c057cbdac --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_funding_handler.sql @@ -0,0 +1,93 @@ +/** + Parameters: + - block_height: the height of the block being processing. + - block_time: the time of the block being processed. + - event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25) + converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify. + - event_index: The 'event_index' of the IndexerTendermintEvent. + - transaction_index: The transaction_index of the IndexerTendermintEvent after the conversion that takes into + account the block_event (https://github.com/dydxprotocol/indexer/blob/cc70982/services/ender/src/lib/helper.ts#L33) + Returns: JSON object containing fields: + - perpetual_markets: A mapping from perpetual market id to the associated perpetual market in perpetual-market-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/perpetual-market-model.ts). + - errors: An array containing an error string (or NULL if no error occurred) for each FundingEventUpdate. +*/ +CREATE OR REPLACE FUNCTION dydx_funding_handler( + block_height int, block_time timestamp, event_data jsonb, event_index int, transaction_index int) RETURNS jsonb AS $$ +DECLARE + PPM_EXPONENT constant numeric = -6; + FUNDING_RATE_FROM_PROTOCOL_IN_HOURS constant numeric = 8; + QUOTE_CURRENCY_ATOMIC_RESOLUTION constant numeric = -6; + + TYPE_PREMIUM_SAMPLE constant jsonb = '1'; + TYPE_FUNDING_RATE_AND_INDEX constant jsonb = '2'; + + perpetual_market_id bigint; + perpetual_market_record perpetual_markets%ROWTYPE; + funding_index_updates_record funding_index_updates%ROWTYPE; + oracle_prices_record oracle_prices%ROWTYPE; + + funding_update jsonb; + perpetual_markets_response jsonb = jsonb_build_object(); + errors_response jsonb[]; + event_id bytea; +BEGIN + FOR funding_update IN SELECT * FROM jsonb_array_elements(event_data->'updates') LOOP + perpetual_market_id = (funding_update->'perpetualId')::bigint; + SELECT * INTO perpetual_market_record FROM perpetual_markets WHERE "id" = perpetual_market_id; + IF NOT FOUND THEN + errors_response = array_append(errors_response, 'Received FundingUpdate with unknown perpetualId.'); + END IF; + + perpetual_markets_response = jsonb_set(perpetual_markets_response, ARRAY[(perpetual_market_record."id")::text], dydx_to_jsonb(perpetual_market_record)); + + CASE event_data->'type' + WHEN TYPE_PREMIUM_SAMPLE THEN + /** Here we just need to return the associated perpetual market. */ + WHEN TYPE_FUNDING_RATE_AND_INDEX THEN + /** Returns the latest oracle price <= current block_height. */ + SELECT * INTO oracle_prices_record + FROM oracle_prices + WHERE "marketId" = perpetual_market_record."marketId" AND "effectiveAtHeight" <= block_height + ORDER BY "effectiveAtHeight" + DESC LIMIT 1; + IF NOT FOUND THEN + RAISE EXCEPTION 'price not found for marketId %', perpetual_market_record."marketId"; + END IF; + + event_id = dydx_event_id_from_parts(block_height, transaction_index, event_index); + + funding_index_updates_record."id" = dydx_uuid_from_funding_index_update_parts( + block_height, + event_id, + perpetual_market_record."id"); + funding_index_updates_record."perpetualId" = perpetual_market_id; + funding_index_updates_record."eventId" = event_id; + funding_index_updates_record."effectiveAt" = block_time; + funding_index_updates_record."rate" = dydx_trim_scale( + power(10, PPM_EXPONENT) / + FUNDING_RATE_FROM_PROTOCOL_IN_HOURS * + (funding_update->'fundingValuePpm')::numeric); + funding_index_updates_record."oraclePrice" = oracle_prices_record."price"; + funding_index_updates_record."fundingIndex" = dydx_trim_scale( + dydx_from_serializable_int(funding_update->'fundingIndex') * + power(10, + PPM_EXPONENT + QUOTE_CURRENCY_ATOMIC_RESOLUTION - perpetual_market_record."atomicResolution")); + funding_index_updates_record."effectiveAtHeight" = block_height; + + INSERT INTO funding_index_updates VALUES (funding_index_updates_record.*); + ELSE + errors_response = array_append(errors_response, 'Received unknown FundingEvent type.'); + CONTINUE; + END CASE; + + errors_response = array_append(errors_response, NULL); + END LOOP; + + RETURN jsonb_build_object( + 'perpetual_markets', + perpetual_markets_response, + 'errors', + to_jsonb(errors_response) + ); +END; +$$ LANGUAGE plpgsql; \ No newline at end of file diff --git a/indexer/services/ender/src/scripts/dydx_uuid_from_funding_index_update_parts.sql b/indexer/services/ender/src/scripts/dydx_uuid_from_funding_index_update_parts.sql new file mode 100644 index 00000000000..dfcf4455c09 --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_uuid_from_funding_index_update_parts.sql @@ -0,0 +1,8 @@ +/** + Returns a UUID using the parts of a funding index update. +*/ +CREATE OR REPLACE FUNCTION dydx_uuid_from_funding_index_update_parts(block_height int, event_id bytea, perpetual_id bigint) RETURNS uuid AS $$ +BEGIN + return dydx_uuid(concat(block_height, '-', encode(event_id, 'hex'), '-', perpetual_id)); +END; +$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;