Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IND-475] Update ender update clob pair handler to execute via a SQL function. #752

Merged
merged 3 commits into from
Nov 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
dbHelpers,
liquidityTierRefresher,
perpetualMarketRefresher,
protocolTranslations,
testMocks,
} from '@dydxprotocol-indexer/postgres';
import { updateBlockCache } from '../../src/caches/block-cache';
Expand Down Expand Up @@ -33,6 +32,7 @@ import { createKafkaMessage, producer } from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';
import { onMessage } from '../../src/lib/on-message';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';
import config from '../../src/config';

describe('update-clob-pair-handler', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -91,35 +91,49 @@ describe('update-clob-pair-handler', () => {
});
});

it('updates an existing perpetual market', async () => {
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromUpdateClobPairEvent({
updatePerpetualEvent: defaultUpdateClobPairEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'updates an existing perpetual market (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromUpdateClobPairEvent({
updatePerpetualEvent: defaultUpdateClobPairEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const perpetualMarketId: string = perpetualMarketRefresher.getPerpetualMarketFromClobPairId(
defaultUpdateClobPairEvent.clobPairId.toString(),
)!.id;
const perpetualMarket:
PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById(
perpetualMarketId,
);

expect(perpetualMarket).toEqual(
perpetualMarketRefresher.getPerpetualMarketFromId(perpetualMarketId));

if (!useSqlFunction) {
expectTimingStats();
}
expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]);
});
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const perpetualMarketId: string = perpetualMarketRefresher.getPerpetualMarketFromClobPairId(
defaultUpdateClobPairEvent.clobPairId.toString(),
)!.id;
const perpetualMarket:
PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById(
perpetualMarketId,
);
expect(perpetualMarket).toEqual(expect.objectContaining({
clobPairId: defaultUpdateClobPairEvent.clobPairId.toString(),
status: protocolTranslations.clobStatusToMarketStatus(defaultUpdateClobPairEvent.status),
quantumConversionExponent: defaultUpdateClobPairEvent.quantumConversionExponent,
subticksPerTick: defaultUpdateClobPairEvent.subticksPerTick,
stepBaseQuantums: defaultUpdateClobPairEvent.stepBaseQuantums.toNumber(),
}));
expectTimingStats();
expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]);
});
});

function expectTimingStats() {
Expand Down
9 changes: 9 additions & 0 deletions indexer/services/ender/__tests__/scripts/scripts.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
} from '@dydxprotocol-indexer/v4-protos';
import {
BUFFER_ENCODING_UTF_8,
CLOB_STATUS_TO_MARKET_STATUS,
dbHelpers,
AssetPositionTable,
PerpetualPositionTable,
Expand Down Expand Up @@ -377,6 +378,14 @@ describe('SQL Function Tests', () => {
);
});

it('dydx_clob_pair_status_to_market_status should convert all statuses', async () => {
for (const [key, value] of Object.entries(CLOB_STATUS_TO_MARKET_STATUS)) {
const result = await getSingleRawQueryResultRow(
`SELECT dydx_clob_pair_status_to_market_status('${key}') AS result`);
expect(result).toEqual(value);
}
});

it('dydx_create_transaction.sql should insert a transaction and return correct jsonb', async () => {
const transactionHash: string = 'txnhash';
const blockHeight: string = '1';
Expand Down
3 changes: 3 additions & 0 deletions indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ export const configSchema = {
USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_SQL_FUNCTION_TO_CREATE_INITIAL_ROWS: parseBoolean({
default: true,
}),
Expand Down
48 changes: 46 additions & 2 deletions indexer/services/ender/src/handlers/update-clob-pair-handler.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import assert from 'assert';

import { logger } from '@dydxprotocol-indexer/base';
import {
PerpetualMarketFromDatabase, PerpetualMarketTable, perpetualMarketRefresher, protocolTranslations,
PerpetualMarketFromDatabase,
PerpetualMarketModel,
PerpetualMarketTable,
perpetualMarketRefresher,
protocolTranslations,
storeHelpers,
} from '@dydxprotocol-indexer/postgres';
import { UpdateClobPairEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../config';
import { generatePerpetualMarketMessage } from '../helpers/kafka-helper';
import { ConsolidatedKafkaEvent } from '../lib/types';
import { Handler } from './handler';
Expand All @@ -18,6 +26,42 @@ export class UpdateClobPairHandler extends Handler<UpdateClobPairEventV1> {

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
if (config.USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION) {
return this.handleViaSqlFunction();
}
return this.handleViaKnex();
}

private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> {
const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes;
const result: pg.QueryResult = await storeHelpers.rawQuery(
`SELECT dydx_update_clob_pair_handler(
'${JSON.stringify(UpdateClobPairEventV1.decode(eventDataBinary))}'
) AS result;`,
{ txId: this.txId },
).catch((error: Error) => {
logger.error({
at: 'UpdateClobPairHandler#handleViaSqlFunction',
message: 'Failed to handle UpdateClobPairEventV1',
error,
});

throw error;
});

const perpetualMarket: PerpetualMarketFromDatabase = PerpetualMarketModel.fromJson(
result.rows[0].result.perpetual_market) as PerpetualMarketFromDatabase;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to update the perpetualMarketRefresher in-memory cache here? Looks like we never added tests for this in the knex handler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expanded the test and fixed the code.

perpetualMarketRefresher.upsertPerpetualMarket(perpetualMarket);

return [
this.generateConsolidatedMarketKafkaEvent(
JSON.stringify(generatePerpetualMarketMessage([perpetualMarket])),
),
];
}

private async handleViaKnex(): Promise<ConsolidatedKafkaEvent[]> {
const perpetualMarket:
PerpetualMarketFromDatabase = await this.runFuncWithTimingStatAndErrorLogging(
this.updateClobPair(),
Expand Down Expand Up @@ -47,7 +91,7 @@ export class UpdateClobPairHandler extends Handler<UpdateClobPairEventV1> {

if (perpetualMarket === undefined) {
this.logAndThrowParseMessageError(
'Could not find perpetual market with corresponding updatePerpetualEvent.id',
'Could not find perpetual market with corresponding clobPairId',
{ event: this.event },
);
// This assert should never be hit because a ParseMessageError should be thrown above.
Comment on lines 91 to 97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message in the logAndThrowParseMessageError() method has been updated. It now includes the event data in the error message. This will help with debugging if an error occurs.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const scripts: string[] = [
'create_extension_pg_stat_statements.sql',
'create_extension_uuid_ossp.sql',
'dydx_asset_create_handler.sql',
'dydx_clob_pair_status_to_market_status.sql',
'dydx_market_create_handler.sql',
'dydx_market_modify_handler.sql',
'dydx_market_price_update_handler.sql',
Expand All @@ -48,6 +49,7 @@ const scripts: string[] = [
'dydx_perpetual_position_and_order_side_matching.sql',
'dydx_subaccount_update_handler.sql',
'dydx_trim_scale.sql',
'dydx_update_clob_pair_handler.sql',
'dydx_uuid.sql',
'dydx_uuid_from_asset_position_parts.sql',
'dydx_uuid_from_fill_event_parts.sql',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
Returns the market status (https://github.com/dydxprotocol/v4-chain/blob/ea4f6895a73627aaa9bc5e21eed1ba51313b1ce4/indexer/packages/postgres/src/types/perpetual-market-types.ts#L60)
from the clob pair status (https://github.com/dydxprotocol/v4-chain/blob/ea4f6895a73627aaa9bc5e21eed1ba51313b1ce4/proto/dydxprotocol/indexer/protocol/v1/clob.proto#L157).
The conversion is equivalent to https://github.com/dydxprotocol/v4-chain/blob/ea4f6895a73627aaa9bc5e21eed1ba51313b1ce4/indexer/packages/postgres/src/lib/protocol-translations.ts#L351.

Parameters:
- status: the ClobPairStatus (https://github.com/dydxprotocol/v4-chain/blob/ea4f6895a73627aaa9bc5e21eed1ba51313b1ce4/proto/dydxprotocol/indexer/protocol/v1/clob.proto#L157)
*/
CREATE OR REPLACE FUNCTION dydx_clob_pair_status_to_market_status(status jsonb)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Add a comment with a link to the enum definition in the proto.

RETURNS text AS $$
BEGIN
CASE status
WHEN '1'::jsonb THEN RETURN 'ACTIVE'; /** CLOB_PAIR_STATUS_ACTIVE */
WHEN '2'::jsonb THEN RETURN 'PAUSED'; /** CLOB_PAIR_STATUS_PAUSED */
WHEN '3'::jsonb THEN RETURN 'CANCEL_ONLY'; /** CLOB_PAIR_STATUS_CANCEL_ONLY */
WHEN '4'::jsonb THEN RETURN 'POST_ONLY'; /** CLOB_PAIR_STATUS_POST_ONLY */
WHEN '5'::jsonb THEN RETURN 'INITIALIZING'; /** CLOB_PAIR_STATUS_INITIALIZING */
ELSE RAISE EXCEPTION 'Invalid clob pair status: %', status;
END CASE;
END;
$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
Parameters:
- event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25)
converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify.
Returns: JSON object containing fields:
- perpetual_market: The updated perpetual market in perpetual-market-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/perpetual-market-model.ts).
*/
CREATE OR REPLACE FUNCTION dydx_update_clob_pair_handler(event_data jsonb) RETURNS jsonb AS $$
DECLARE
row_count integer;
clob_pair_id bigint;
perpetual_market_record perpetual_markets%ROWTYPE;
BEGIN
clob_pair_id = (event_data->'clobPairId')::bigint;
perpetual_market_record."status" = dydx_clob_pair_status_to_market_status(event_data->'status');
perpetual_market_record."quantumConversionExponent" = (event_data->'quantumConversionExponent')::integer;
perpetual_market_record."subticksPerTick" = (event_data->'subticksPerTick')::integer;
perpetual_market_record."stepBaseQuantums" = dydx_from_jsonlib_long(event_data->'stepBaseQuantums');

UPDATE perpetual_markets
SET
"status" = perpetual_market_record."status",
"quantumConversionExponent" = perpetual_market_record."quantumConversionExponent",
"subticksPerTick" = perpetual_market_record."subticksPerTick",
"stepBaseQuantums" = perpetual_market_record."stepBaseQuantums"
WHERE "clobPairId" = clob_pair_id
RETURNING * INTO perpetual_market_record;

IF NOT FOUND THEN
RAISE EXCEPTION 'Could not find perpetual market with corresponding clobPairId %', event_data;
END IF;

RETURN jsonb_build_object(
'perpetual_market',
dydx_to_jsonb(perpetual_market_record)
);
END;
$$ LANGUAGE plpgsql;