[IND-475] Update ender `update clob pair` handler to execute via a SQL function. #752
Conversation
IND-475 Ender update clob pair handler SQL pushdown
Walkthrough
The changes introduced in this diff primarily focus on enhancing the ender `update clob pair` handler to execute via a SQL function.
Review Status
Actionable comments generated: 1
Configuration used: CodeRabbit UI
Files selected for processing (7)
- indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts (2 hunks)
- indexer/services/ender/__tests__/scripts/scripts.test.ts (2 hunks)
- indexer/services/ender/src/config.ts (1 hunks)
- indexer/services/ender/src/handlers/update-clob-pair-handler.ts (3 hunks)
- indexer/services/ender/src/helpers/postgres/postgres-functions.ts (2 hunks)
- indexer/services/ender/src/scripts/dydx_clob_pair_status_to_market_status.sql (1 hunks)
- indexer/services/ender/src/scripts/dydx_update_clob_pair_handler.sql (1 hunks)
Files skipped from review due to trivial changes (3)
- indexer/services/ender/__tests__/scripts/scripts.test.ts
- indexer/services/ender/src/config.ts
- indexer/services/ender/src/helpers/postgres/postgres-functions.ts
Additional comments: 7
indexer/services/ender/src/scripts/dydx_clob_pair_status_to_market_status.sql (1)
- 1-13: The function `dydx_clob_pair_status_to_market_status` is well written and follows best practices for PostgreSQL functions. It uses a `CASE` statement to map different values of `status` to corresponding market statuses and raises an exception if the `status` value is not recognized. The function is declared as `IMMUTABLE` and `PARALLEL SAFE`, which means it can be used in parallel query execution and its result depends only on its input parameters.
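For readers more familiar with the TypeScript side, the SQL function performs the same kind of translation as a switch-based mapping. The sketch below is illustrative only: the enum members and market-status strings are placeholders, not the actual protocol values.

```typescript
// Illustrative sketch only: the ClobPairStatus members and market-status
// strings below are placeholders, not the real protocol enum values.
enum ClobPairStatus {
  ACTIVE = 1,
  PAUSED = 2,
  CANCEL_ONLY = 3,
}

function clobPairStatusToMarketStatus(status: ClobPairStatus): string {
  switch (status) {
    case ClobPairStatus.ACTIVE:
      return 'ACTIVE';
    case ClobPairStatus.PAUSED:
      return 'PAUSED';
    case ClobPairStatus.CANCEL_ONLY:
      return 'CANCEL_ONLY';
    default:
      // Mirrors the SQL function's behaviour of raising on unrecognized input.
      throw new Error(`Unrecognized clob pair status: ${status}`);
  }
}
```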
indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts (1)
- 33-37: The new imports are correctly added and used in the test suite.
indexer/services/ender/src/handlers/update-clob-pair-handler.ts (4)
- 26-33: The `internalHandle()` method has been updated to conditionally call either `handleViaSqlFunction()` or `handleViaKnex()` based on the configuration option `USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION`. Ensure that this configuration option is properly set in all environments where this code will be deployed.
- 35-60: The `handleViaSqlFunction()` method has been added to handle the event using a SQL function. It executes a raw SQL query that calls the `dydx_update_clob_pair_handler` function with the event data as an argument. If the query fails, it logs an error and rethrows the exception. After the query is successful, it generates a Kafka event with the updated perpetual market data. Ensure that the SQL function `dydx_update_clob_pair_handler` exists in the database and works as expected.
- 62-65: The `handleViaKnex()` method has been added to handle the event using Knex. It calls the `updateClobPair()` method and generates a Kafka event with the updated perpetual market data.
- 89-95: The error message in the `logAndThrowParseMessageError()` method has been updated. Ensure that this new message is more informative and helpful for debugging.
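A minimal sketch of the feature-flag dispatch described in the first comment, assuming the method and config names quoted in this review; the surrounding types are simplified stand-ins rather than the real ender classes.

```typescript
// Simplified stand-in type; the real handler returns richer Kafka event objects.
type ConsolidatedKafkaEvent = { topic: string; message: object };

// Assumed config shape, based on the flag name quoted in this review.
const config = { USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION: true };

abstract class UpdateClobPairHandlerSketch {
  // Dispatch to the SQL-function path or the legacy Knex path based on the flag.
  public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
    if (config.USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION) {
      return this.handleViaSqlFunction();
    }
    return this.handleViaKnex();
  }

  protected abstract handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]>;
  protected abstract handleViaKnex(): Promise<ConsolidatedKafkaEvent[]>;
}
```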
indexer/services/ender/src/scripts/dydx_update_clob_pair_handler.sql (1)
- 1-38: The SQL function `dydx_update_clob_pair_handler` seems to be well written and follows good practices. It uses a transaction to ensure atomicity of the update operation. It also checks if the update operation affected any rows and raises an exception if not, which is a good practice for error handling. The function also returns the updated record in a JSON format, which can be useful for logging or further processing. However, there are a few points that need to be addressed:
  - The function assumes that the `event_data` JSON object will always contain the required fields. If any of these fields are missing, the function will raise an error. It would be better to add some error handling to check if these fields exist in the `event_data` JSON object before trying to access them.
  - The function uses the `dydx_clob_pair_status_to_market_status` and `dydx_from_jsonlib_long` functions, but it's not clear from the provided context whether these functions exist and are correctly implemented. Please verify that these functions exist and work as expected.
  - The function updates the `perpetual_markets` table directly based on the values in the `event_data` JSON object. This could potentially lead to SQL injection attacks if the `event_data` JSON object contains malicious data. It would be better to sanitize the input data before using it in the SQL query.

  Here is a refined version of the function with added error handling and input sanitization:
```sql
CREATE OR REPLACE FUNCTION dydx_update_clob_pair_handler(event_data jsonb) RETURNS jsonb AS $$
DECLARE
    row_count integer;
    clob_pair_id bigint;
    perpetual_market_record perpetual_markets%ROWTYPE;
BEGIN
    IF event_data ? 'clobPairId' AND event_data ? 'status' AND event_data ? 'quantumConversionExponent'
        AND event_data ? 'subticksPerTick' AND event_data ? 'stepBaseQuantums' THEN
        clob_pair_id = (event_data->'clobPairId')::bigint;
        perpetual_market_record."status" = dydx_clob_pair_status_to_market_status(event_data->'status');
        perpetual_market_record."quantumConversionExponent" = (event_data->'quantumConversionExponent')::integer;
        perpetual_market_record."subticksPerTick" = (event_data->'subticksPerTick')::integer;
        perpetual_market_record."stepBaseQuantums" = dydx_from_jsonlib_long(event_data->'stepBaseQuantums');

        UPDATE perpetual_markets
        SET
            "status" = perpetual_market_record."status",
            "quantumConversionExponent" = perpetual_market_record."quantumConversionExponent",
            "subticksPerTick" = perpetual_market_record."subticksPerTick",
            "stepBaseQuantums" = perpetual_market_record."stepBaseQuantums"
        WHERE "clobPairId" = clob_pair_id
        RETURNING * INTO perpetual_market_record;

        IF NOT FOUND THEN
            RAISE EXCEPTION 'Could not find perpetual market with corresponding clobPairId %', event_data;
        END IF;

        RETURN jsonb_build_object(
            'perpetual_market',
            dydx_to_jsonb(perpetual_market_record)
        );
    ELSE
        RAISE EXCEPTION 'Missing required fields in event_data JSON object';
    END IF;
END;
$$ LANGUAGE plpgsql;
```
```typescript
    it.each([
      [
        'via knex',
        false,
      ],
      [
        'via SQL function',
        true,
      ],
    ])(
      'updates an existing perpetual market (%s)',
      async (
        _name: string,
        useSqlFunction: boolean,
      ) => {
        config.USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION = useSqlFunction;
        const transactionIndex: number = 0;
        const kafkaMessage: KafkaMessage = createKafkaMessageFromUpdateClobPairEvent({
          updatePerpetualEvent: defaultUpdateClobPairEvent,
          transactionIndex,
          height: defaultHeight,
          time: defaultTime,
          txHash: defaultTxHash,
        });
        const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
        await onMessage(kafkaMessage);

        const perpetualMarketId: string = perpetualMarketRefresher.getPerpetualMarketFromClobPairId(
          defaultUpdateClobPairEvent.clobPairId.toString(),
        )!.id;
        const perpetualMarket:
        PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById(
          perpetualMarketId,
        );
        expect(perpetualMarket).toEqual(expect.objectContaining({
          clobPairId: defaultUpdateClobPairEvent.clobPairId.toString(),
          status: protocolTranslations.clobStatusToMarketStatus(defaultUpdateClobPairEvent.status),
          quantumConversionExponent: defaultUpdateClobPairEvent.quantumConversionExponent,
          subticksPerTick: defaultUpdateClobPairEvent.subticksPerTick,
          stepBaseQuantums: defaultUpdateClobPairEvent.stepBaseQuantums.toNumber(),
        }));
        if (!useSqlFunction) {
          expectTimingStats();
        }
        expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]);
      });
  });
```
The new test case is well-structured and covers the new functionality. It tests both the Knex and SQL function paths for updating the perpetual market. The test checks the updated perpetual market and the Kafka message sent. However, ensure that the `USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION` configuration is reset to its original value after the test to prevent side effects on other tests.
```diff
+ const originalConfigValue = config.USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION;
  config.USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION = useSqlFunction;
  ...
  // Rest of the test
  ...
+ config.USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION = originalConfigValue;
```
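One way to implement the reset without touching each test body is to capture and restore the flag in Jest hooks. This is only a sketch: the import path to the ender config module is guessed for illustration.

```typescript
import config from '../../src/config'; // assumed path to the ender config module

describe('updateClobPairHandler', () => {
  let originalFlag: boolean;

  beforeEach(() => {
    // Remember the flag so each test can flip it freely.
    originalFlag = config.USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION;
  });

  afterEach(() => {
    // Restore the flag so later tests are unaffected.
    config.USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION = originalFlag;
  });

  // ...tests that set config.USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION...
});
```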
```typescript
    const perpetualMarket: PerpetualMarketFromDatabase = PerpetualMarketModel.fromJson(
      result.rows[0].result.perpetual_market) as PerpetualMarketFromDatabase;
```
I think we need to update the perpetualMarketRefresher
in-memory cache here? Looks like we never added tests for this in the knex handler.
Expanded the test and fixed the code.
@@ -0,0 +1,13 @@
CREATE OR REPLACE FUNCTION dydx_clob_pair_status_to_market_status(status jsonb)
nit: Add a comment with a link to the enum definition in the proto.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (1)
- indexer/services/ender/src/scripts/dydx_clob_pair_status_to_market_status.sql (1 hunks)
Additional comments: 1
indexer/services/ender/src/scripts/dydx_clob_pair_status_to_market_status.sql (1)
- 1-21: The function `dydx_clob_pair_status_to_market_status` is well-documented and the logic is clear. It correctly maps the ClobPairStatus values to their corresponding market status values and raises an exception for unrecognized statuses. The function is also marked as `IMMUTABLE` and `PARALLEL SAFE`, which is appropriate as it does not modify the database and can be safely executed in parallel.
Review Status
Actionable comments generated: 1
Configuration used: CodeRabbit UI
Files selected for processing (2)
- indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts (3 hunks)
- indexer/services/ender/src/handlers/update-clob-pair-handler.ts (3 hunks)
Additional comments: 5
indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts (2)
- 32-35: The import statements have been updated to include the necessary modules for the new functionality. Ensure that these modules are available in the project's dependencies.
- 94-136: The test case has been updated to use a parameterized test with two scenarios: one using `knex` and the other using an SQL function. This is a good practice as it allows for more comprehensive testing of the functionality. The test case now checks if the perpetual market is updated correctly and sends a Kafka message. Ensure that the SQL function and `knex` both perform the same operation and that the test case checks for the correct output in both scenarios.

indexer/services/ender/src/handlers/update-clob-pair-handler.ts (3)
- 26-33: The `internalHandle()` method has been updated to conditionally call either `handleViaSqlFunction()` or `handleViaKnex()`. This is based on the `USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION` configuration option. Ensure that this configuration option is properly set in all environments where this code will be deployed.
- 35-62: The `handleViaSqlFunction()` method has been added. It handles the event using a SQL function. It executes a raw SQL query that calls the `dydx_update_clob_pair_handler` function with the event data. If the query fails, it logs an error and rethrows the error. After the query, it updates the `perpetualMarketRefresher` in-memory cache with the updated perpetual market record. It then returns a `ConsolidatedKafkaEvent` generated from the updated perpetual market record.
- 26-67: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [64-90]
  The `handleViaKnex()` method has been added. It handles the event using Knex. It updates the `perpetual_markets` table with the event data and then updates the `perpetualMarketRefresher` in-memory cache with the updated perpetual market record. It then returns a `ConsolidatedKafkaEvent` generated from the updated perpetual market record.
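As a rough illustration of the SQL-function path described above, the call can be made with a plain parameterized `knex.raw` query. Only `dydx_update_clob_pair_handler` and the `result.rows[0].result.perpetual_market` shape are taken from this PR; the connection setup, function name, and return type below are assumed, and the real ender code uses its own Postgres helpers.

```typescript
import knexLib from 'knex';

// Connection details are placeholders for illustration.
const knex = knexLib({ client: 'pg', connection: process.env.PG_CONNECTION_STRING });

async function callUpdateClobPairHandler(eventDataJson: string): Promise<object> {
  // Invoke the stored function with the event payload bound as jsonb.
  const result = await knex.raw(
    'SELECT dydx_update_clob_pair_handler(?::jsonb) AS result',
    [eventDataJson],
  );
  // The function returns jsonb of the form { "perpetual_market": { ... } }.
  const perpetualMarketJson = result.rows[0].result.perpetual_market;
  // The real handler then refreshes the perpetualMarketRefresher in-memory
  // cache and builds a ConsolidatedKafkaEvent from this record (omitted here).
  return perpetualMarketJson;
}
```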
```diff
     if (perpetualMarket === undefined) {
       this.logAndThrowParseMessageError(
-        'Could not find perpetual market with corresponding updatePerpetualEvent.id',
+        'Could not find perpetual market with corresponding clobPairId',
         { event: this.event },
       );
     // This assert should never be hit because a ParseMessageError should be thrown above.
```
The error message in the logAndThrowParseMessageError()
method has been updated. It now includes the event data in the error message. This will help with debugging if an error occurs.
Changelist
[IND-475] Update ender `update clob pair` handler to execute via a SQL function.

Test Plan
Updated existing tests and added new tests.
Author/Reviewer Checklist
- If this change is state-breaking, add the `state-breaking` label.
- If this change modifies `PrepareProposal` or `ProcessProposal`, manually add the label `proposal-breaking`.
- If this change is part of a feature, add the label `feature:[feature-name]`.
- If this change should be backported, add the label `backport/[branch-name]`.
- Add any other applicable labels: `refactor`, `chore`, `bug`.