-
Notifications
You must be signed in to change notification settings - Fork 117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[IND-471] Update ender funding to execute updates via a SQL function. #770
Conversation
IND-471 Ender funding handler SQL pushdown
Update |
WalkthroughThe changes introduce a new configuration option and modify the Changes
TipsChat with CodeRabbit Bot (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 2
Configuration used: CodeRabbit UI
Files selected for processing (7)
- indexer/services/ender/tests/handlers/funding-handler.test.ts (2 hunks)
- indexer/services/ender/tests/scripts/scripts.test.ts (2 hunks)
- indexer/services/ender/src/config.ts (1 hunks)
- indexer/services/ender/src/handlers/funding-handler.ts (4 hunks)
- indexer/services/ender/src/helpers/postgres/postgres-functions.ts (2 hunks)
- indexer/services/ender/src/scripts/dydx_funding_handler.sql (1 hunks)
- indexer/services/ender/src/scripts/dydx_uuid_from_funding_index_update_parts.sql (1 hunks)
Files skipped from review due to trivial changes (1)
- indexer/services/ender/src/config.ts
Additional comments: 12
indexer/services/ender/__tests__/scripts/scripts.test.ts (1)
- 257-267: The new test case for the
dydx_uuid_from_funding_index_update_parts
function is correctly implemented. It checks the function's behavior by passing different input parameters and asserting the expected output. The test case is parameterized, which is a good practice for testing different scenarios.indexer/services/ender/src/scripts/dydx_uuid_from_funding_index_update_parts.sql (1)
- 1-8: The function
dydx_uuid_from_funding_index_update_parts
is well defined and seems to be logically correct. However, ensure that thedydx_uuid
function that is being called in line 6 is already defined and available in the database. Also, verify that theencode
function is correctly convertingevent_id
to hexadecimal format.indexer/services/ender/src/helpers/postgres/postgres-functions.ts (2)
40-46: The addition of 'dydx_funding_handler.sql' to the scripts array introduces a new SQL function. Ensure that this function is correctly implemented and tested.
58-64: The addition of 'dydx_uuid_from_funding_index_update_parts.sql' to the scripts array introduces a new SQL function. Ensure that this function is correctly implemented and tested.
indexer/services/ender/__tests__/handlers/funding-handler.test.ts (4)
118-152: Parameterized tests have been introduced to test the processing of a single premium sample event via both knex and SQL function. The configuration is set based on the test parameters and the expected next funding rate is checked. If the SQL function is not used, the timing stat for 'handle_premium_sample' is expected.
153-206: Parameterized tests have been introduced to test the processing of multiple premium sample events for different markets via both knex and SQL function. The configuration is set based on the test parameters and the expected next funding rate for both 'BTC-USD' and 'ETH-USD' is checked. If the SQL function is not used, the timing stat for 'handle_premium_sample' is expected.
208-266: Parameterized tests have been introduced to test the processing and cache clearing for a new funding rate via both knex and SQL function. The configuration is set based on the test parameters and the expected next funding rate is checked. If the SQL function is not used, the timing stat for 'handle_funding_rate' is expected. The funding indices are also checked.
268-384: Parameterized tests have been introduced to test the processing and cache clearing for multiple new funding rates via both knex and SQL function. The configuration is set based on the test parameters and the expected next funding rate for both 'BTC-USD' and 'ETH-USD' is checked. If the SQL function is not used, the timing stat for 'handle_funding_rate' is expected. The funding indices are also checked.
indexer/services/ender/src/handlers/funding-handler.ts (3)
56-63: The
internalHandle
method now checks theUSE_FUNDING_HANDLER_SQL_FUNCTION
configuration and calls the appropriate handling method. Ensure that the configuration is set correctly in all environments where this code will be deployed.174-180: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [177-199]
The
handleFundingSample
method has been changed to private. Ensure that this method is not being called from outside this class in the codebase.
- 201-204: The
handleFundingRate
method has been changed to private. Ensure that this method is not being called from outside this class in the codebase.indexer/services/ender/src/scripts/dydx_funding_handler.sql (1)
- 14-93: The function
dydx_funding_handler
is well-structured and follows good practices for SQL functions. However, there are a few areas that could be improved:
Error handling: The function currently appends error messages to an array and continues processing. This approach might lead to silent failures where the function continues to execute despite encountering errors. Consider raising exceptions instead of appending to an error array to halt execution when an error occurs.
Use of constants: The function uses several constants defined at the beginning. While this is a good practice, it would be beneficial to document what these constants represent and why they are needed.
Query optimization: The function performs a SELECT query inside a loop (line 36). This could potentially lead to performance issues if the number of iterations is large. Consider fetching all necessary data before the loop and storing it in a data structure for quick access.
Code readability: The function contains complex calculations (lines 67-74). Consider breaking these calculations into smaller, more readable parts or adding comments to explain what they do.
Here are the proposed changes:
14: CREATE OR REPLACE FUNCTION dydx_funding_handler( 15: block_height int, block_time timestamp, event_data jsonb, event_index int, transaction_index int) RETURNS jsonb AS $$ 16: DECLARE 17: PPM_EXPONENT constant numeric = -6; -- Add explanation here 18: FUNDING_RATE_FROM_PROTOCOL_IN_HOURS constant numeric = 8; -- Add explanation here 19: QUOTE_CURRENCY_ATOMIC_RESOLUTION constant numeric = -6; -- Add explanation here 20: 21: TYPE_PREMIUM_SAMPLE constant jsonb = '1'; 22: TYPE_FUNDING_RATE_AND_INDEX constant jsonb = '2'; 23: 24: perpetual_market_id bigint; 25: perpetual_market_record perpetual_markets%ROWTYPE; 26: funding_index_updates_record funding_index_updates%ROWTYPE; 27: oracle_prices_record oracle_prices%ROWTYPE; 28: 29: funding_update jsonb; 30: perpetual_markets_response jsonb = jsonb_build_object(); 31: errors_response jsonb[]; 32: event_id bytea; 33: BEGIN 34: -- Fetch all necessary data here and store in a data structure for quick access 35: FOR funding_update IN SELECT * FROM jsonb_array_elements(event_data->'updates') LOOP 36: perpetual_market_id = (funding_update->'perpetualId')::bigint; 37: SELECT * INTO perpetual_market_record FROM perpetual_markets WHERE "id" = perpetual_market_id; 38: IF NOT FOUND THEN 39: RAISE EXCEPTION 'Received FundingUpdate with unknown perpetualId.'; -- Raise exception instead of appending to error array 40: END IF; 41: 42: perpetual_markets_response = jsonb_set(perpetual_markets_response, ARRAY[(perpetual_market_record."id")::text], dydx_to_jsonb(perpetual_market_record)); 43: 44: CASE event_data->'type' 45: WHEN TYPE_PREMIUM_SAMPLE THEN 46: /** Here we just need to return the associated perpetual market. */ 47: WHEN TYPE_FUNDING_RATE_AND_INDEX THEN 48: /** Returns the latest oracle price <= current block_height. */ 49: SELECT * INTO oracle_prices_record 50: FROM oracle_prices 51: WHERE "marketId" = perpetual_market_record."marketId" AND "effectiveAtHeight" <= block_height 52: ORDER BY "effectiveAtHeight" 53: DESC LIMIT 1; 54: IF NOT FOUND THEN 55: RAISE EXCEPTION 'price not found for marketId %', perpetual_market_record."marketId"; -- Raise exception instead of appending to error array 56: END IF; 57: 58: event_id = dydx_event_id_from_parts(block_height, transaction_index, event_index); 59: 60: funding_index_updates_record."id" = dydx_uuid_from_funding_index_update_parts( 61: block_height, 62: event_id, 63: perpetual_market_record."id"); 64: funding_index_updates_record."perpetualId" = perpetual_market_id; 65: funding_index_updates_record."eventId" = event_id; 66: funding_index_updates_record."effectiveAt" = block_time; 67: -- Break complex calculation into smaller, more readable parts or add comments to explain what it does 68: funding_index_updates_record."rate" = dydx_trim_scale( 69: power(10, PPM_EXPONENT) / 70: FUNDING_RATE_FROM_PROTOCOL_IN_HOURS * 71: (funding_update->'fundingValuePpm')::numeric); 72: funding_index_updates_record."oraclePrice" = oracle_prices_record."price"; 73: -- Break complex calculation into smaller, more readable parts or add comments </blockquote></details></blockquote></details> </details>
private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> { | ||
const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes; | ||
const transactionIndex: number = indexerTendermintEventToTransactionIndex( | ||
this.indexerTendermintEvent, | ||
); | ||
const result: pg.QueryResult = await storeHelpers.rawQuery( | ||
`SELECT dydx_funding_handler( | ||
${this.block.height}, | ||
'${this.block.time?.toISOString()}', | ||
'${JSON.stringify(FundingEventV1.decode(eventDataBinary))}', | ||
${this.indexerTendermintEvent.eventIndex}, | ||
${transactionIndex} | ||
) AS result;`, | ||
{ txId: this.txId }, | ||
).catch((error: Error) => { | ||
logger.error({ | ||
at: 'FundingHandler#handleViaSqlFunction', | ||
message: 'Failed to handle FundingEventV1', | ||
error, | ||
}); | ||
|
||
throw error; | ||
}); | ||
|
||
const perpetualMarkets: | ||
Map<string, PerpetualMarketFromDatabase> = new Map<string, PerpetualMarketFromDatabase>(); | ||
for (const [key, perpetualMarket] of Object.entries(result.rows[0].result.perpetual_markets)) { | ||
perpetualMarkets.set( | ||
key, | ||
PerpetualMarketModel.fromJson(perpetualMarket as object) as PerpetualMarketFromDatabase, | ||
); | ||
} | ||
|
||
const promises: Promise<number>[] = new Array<Promise<number>>(this.event.updates.length); | ||
|
||
for (let i: number = 0; i < this.event.updates.length; i++) { | ||
const update: FundingUpdateV1 = this.event.updates[i]; | ||
if (result.rows[0].result.errors[i] != null) { | ||
logger.error({ | ||
at: 'FundingHandler#handleFundingSample', | ||
message: result.rows[0].result.errors[i], | ||
update, | ||
}); | ||
continue; | ||
} | ||
|
||
const perpetualMarket: | ||
PerpetualMarketFromDatabase | undefined = perpetualMarkets.get(update.perpetualId.toString()); | ||
if (perpetualMarket === undefined) { | ||
logger.error({ | ||
at: 'FundingHandler#handleFundingSample', | ||
message: 'Received FundingUpdate with unknown perpetualId.', | ||
update, | ||
}); | ||
continue; | ||
} | ||
|
||
switch (this.event.type) { | ||
case FundingEventV1_Type.TYPE_PREMIUM_SAMPLE: | ||
promises[i] = NextFundingCache.addFundingSample( | ||
perpetualMarket.ticker, | ||
new Big(protocolTranslations.funding8HourValuePpmTo1HourRate(update.fundingValuePpm)), | ||
redisClient, | ||
); | ||
break; | ||
case FundingEventV1_Type.TYPE_FUNDING_RATE_AND_INDEX: | ||
// clear the cache for the predicted next funding rate | ||
promises[i] = NextFundingCache.clearFundingSamples(perpetualMarket.ticker, redisClient); | ||
break; | ||
default: | ||
logger.error({ | ||
at: 'FundingHandler#handle', | ||
message: 'Received unknown FundingEvent type.', | ||
event: this.event, | ||
}); | ||
} | ||
} | ||
|
||
await Promise.all(promises); | ||
return []; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new handleViaSqlFunction
method executes a raw SQL query using the dydx_funding_handler
function and handles the result. It's important to ensure that the SQL function dydx_funding_handler
is correctly implemented and tested, as SQL errors could lead to unexpected behavior or data corruption. Also, consider handling SQL injection attacks by parameterizing the query instead of using string interpolation.
- `SELECT dydx_funding_handler(
- ${this.block.height},
- '${this.block.time?.toISOString()}',
- '${JSON.stringify(FundingEventV1.decode(eventDataBinary))}',
- ${this.indexerTendermintEvent.eventIndex},
- ${transactionIndex}
- ) AS result;`
+ `SELECT dydx_funding_handler(
+ $1,
+ $2,
+ $3,
+ $4,
+ $5
+ ) AS result;`,
+ [this.block.height, this.block.time?.toISOString(), JSON.stringify(FundingEventV1.decode(eventDataBinary)), this.indexerTendermintEvent.eventIndex, transactionIndex]
Commitable suggestion
[!IMPORTANT]
Carefully review the code before committing. Make sure it correctly replaces the highlighted code, has no missing lines and indentaion issues.
private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> { | |
const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes; | |
const transactionIndex: number = indexerTendermintEventToTransactionIndex( | |
this.indexerTendermintEvent, | |
); | |
const result: pg.QueryResult = await storeHelpers.rawQuery( | |
`SELECT dydx_funding_handler( | |
${this.block.height}, | |
'${this.block.time?.toISOString()}', | |
'${JSON.stringify(FundingEventV1.decode(eventDataBinary))}', | |
${this.indexerTendermintEvent.eventIndex}, | |
${transactionIndex} | |
) AS result;`, | |
{ txId: this.txId }, | |
).catch((error: Error) => { | |
logger.error({ | |
at: 'FundingHandler#handleViaSqlFunction', | |
message: 'Failed to handle FundingEventV1', | |
error, | |
}); | |
throw error; | |
}); | |
const perpetualMarkets: | |
Map<string, PerpetualMarketFromDatabase> = new Map<string, PerpetualMarketFromDatabase>(); | |
for (const [key, perpetualMarket] of Object.entries(result.rows[0].result.perpetual_markets)) { | |
perpetualMarkets.set( | |
key, | |
PerpetualMarketModel.fromJson(perpetualMarket as object) as PerpetualMarketFromDatabase, | |
); | |
} | |
const promises: Promise<number>[] = new Array<Promise<number>>(this.event.updates.length); | |
for (let i: number = 0; i < this.event.updates.length; i++) { | |
const update: FundingUpdateV1 = this.event.updates[i]; | |
if (result.rows[0].result.errors[i] != null) { | |
logger.error({ | |
at: 'FundingHandler#handleFundingSample', | |
message: result.rows[0].result.errors[i], | |
update, | |
}); | |
continue; | |
} | |
const perpetualMarket: | |
PerpetualMarketFromDatabase | undefined = perpetualMarkets.get(update.perpetualId.toString()); | |
if (perpetualMarket === undefined) { | |
logger.error({ | |
at: 'FundingHandler#handleFundingSample', | |
message: 'Received FundingUpdate with unknown perpetualId.', | |
update, | |
}); | |
continue; | |
} | |
switch (this.event.type) { | |
case FundingEventV1_Type.TYPE_PREMIUM_SAMPLE: | |
promises[i] = NextFundingCache.addFundingSample( | |
perpetualMarket.ticker, | |
new Big(protocolTranslations.funding8HourValuePpmTo1HourRate(update.fundingValuePpm)), | |
redisClient, | |
); | |
break; | |
case FundingEventV1_Type.TYPE_FUNDING_RATE_AND_INDEX: | |
// clear the cache for the predicted next funding rate | |
promises[i] = NextFundingCache.clearFundingSamples(perpetualMarket.ticker, redisClient); | |
break; | |
default: | |
logger.error({ | |
at: 'FundingHandler#handle', | |
message: 'Received unknown FundingEvent type.', | |
event: this.event, | |
}); | |
} | |
} | |
await Promise.all(promises); | |
return []; | |
} | |
private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> { | |
const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes; | |
const transactionIndex: number = indexerTendermintEventToTransactionIndex( | |
this.indexerTendermintEvent, | |
); | |
const result: pg.QueryResult = await storeHelpers.rawQuery( | |
`SELECT dydx_funding_handler( | |
$1, | |
$2, | |
$3, | |
$4, | |
$5 | |
) AS result;`, | |
[this.block.height, this.block.time?.toISOString(), JSON.stringify(FundingEventV1.decode(eventDataBinary)), this.indexerTendermintEvent.eventIndex, transactionIndex], | |
{ txId: this.txId }, | |
).catch((error: Error) => { | |
logger.error({ | |
at: 'FundingHandler#handleViaSqlFunction', | |
message: 'Failed to handle FundingEventV1', | |
error, | |
}); | |
throw error; | |
}); | |
const perpetualMarkets: | |
Map<string, PerpetualMarketFromDatabase> = new Map<string, PerpetualMarketFromDatabase>(); | |
for (const [key, perpetualMarket] of Object.entries(result.rows[0].result.perpetual_markets)) { | |
perpetualMarkets.set( | |
key, | |
PerpetualMarketModel.fromJson(perpetualMarket as object) as PerpetualMarketFromDatabase, | |
); | |
} | |
const promises: Promise<number>[] = new Array<Promise<number>>(this.event.updates.length); | |
for (let i: number = 0; i < this.event.updates.length; i++) { | |
const update: FundingUpdateV1 = this.event.updates[i]; | |
if (result.rows[0].result.errors[i] != null) { | |
logger.error({ | |
at: 'FundingHandler#handleFundingSample', | |
message: result.rows[0].result.errors[i], | |
update, | |
}); | |
continue; | |
} | |
const perpetualMarket: | |
PerpetualMarketFromDatabase | undefined = perpetualMarkets.get(update.perpetualId.toString()); | |
if (perpetualMarket === undefined) { | |
logger.error({ | |
at: 'FundingHandler#handleFundingSample', | |
message: 'Received FundingUpdate with unknown perpetualId.', | |
update, | |
}); | |
continue; | |
} | |
switch (this.event.type) { | |
case FundingEventV1_Type.TYPE_PREMIUM_SAMPLE: | |
promises[i] = NextFundingCache.addFundingSample( | |
perpetualMarket.ticker, | |
new Big(protocolTranslations.funding8HourValuePpmTo1HourRate(update.fundingValuePpm)), | |
redisClient, | |
); | |
break; | |
case FundingEventV1_Type.TYPE_FUNDING_RATE_AND_INDEX: | |
// clear the cache for the predicted next funding rate | |
promises[i] = NextFundingCache.clearFundingSamples(perpetualMarket.ticker, redisClient); | |
break; | |
default: | |
logger.error({ | |
at: 'FundingHandler#handle', | |
message: 'Received unknown FundingEvent type.', | |
event: this.event, | |
}); | |
} | |
} | |
await Promise.all(promises); | |
return []; | |
} |
/** | ||
Parameters: | ||
- block_height: the height of the block being processing. | ||
- block_time: the time of the block being processed. | ||
- event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25) | ||
converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify. | ||
- event_index: The 'event_index' of the IndexerTendermintEvent. | ||
- transaction_index: The transaction_index of the IndexerTendermintEvent after the conversion that takes into | ||
account the block_event (https://github.com/dydxprotocol/indexer/blob/cc70982/services/ender/src/lib/helper.ts#L33) | ||
Returns: JSON object containing fields: | ||
- perpetual_markets: A mapping from perpetual market id to the associated perpetual market in perpetual-market-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/perpetual-market-model.ts). | ||
- errors: An array containing an error string (or NULL if no error occurred) for each FundingEventUpdate. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function documentation is clear and provides a good overview of the function's purpose, parameters, and return value. However, it would be beneficial to include a brief explanation of the function's logic and any assumptions or constraints.
Changelist
[IND-471] Update ender funding to execute updates via a SQL function.
Test Plan
Updated existing tests and added new tests.
Author/Reviewer Checklist
state-breaking
label.PrepareProposal
orProcessProposal
, manually add the labelproposal-breaking
.feature:[feature-name]
.backport/[branch-name]
.refactor
,chore
,bug
.