Skip to content

Commit

Permalink
[IND-475] Update ender update clob pair handler to execute via a SQ…
Browse files Browse the repository at this point in the history
…L function. (#752)

* [IND-475] Update ender `update clob pair` handler to execute via a SQL function.
  • Loading branch information
lcwik authored and vincentwschau committed Nov 9, 2023
1 parent b205524 commit e4b45de
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
dbHelpers,
liquidityTierRefresher,
perpetualMarketRefresher,
protocolTranslations,
testMocks,
} from '@dydxprotocol-indexer/postgres';
import { updateBlockCache } from '../../src/caches/block-cache';
Expand Down Expand Up @@ -33,6 +32,7 @@ import { createKafkaMessage, producer } from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';
import { onMessage } from '../../src/lib/on-message';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';
import config from '../../src/config';

describe('update-clob-pair-handler', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -91,35 +91,49 @@ describe('update-clob-pair-handler', () => {
});
});

it('updates an existing perpetual market', async () => {
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromUpdateClobPairEvent({
updatePerpetualEvent: defaultUpdateClobPairEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'updates an existing perpetual market (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromUpdateClobPairEvent({
updatePerpetualEvent: defaultUpdateClobPairEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const perpetualMarketId: string = perpetualMarketRefresher.getPerpetualMarketFromClobPairId(
defaultUpdateClobPairEvent.clobPairId.toString(),
)!.id;
const perpetualMarket:
PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById(
perpetualMarketId,
);

expect(perpetualMarket).toEqual(
perpetualMarketRefresher.getPerpetualMarketFromId(perpetualMarketId));

if (!useSqlFunction) {
expectTimingStats();
}
expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]);
});
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const perpetualMarketId: string = perpetualMarketRefresher.getPerpetualMarketFromClobPairId(
defaultUpdateClobPairEvent.clobPairId.toString(),
)!.id;
const perpetualMarket:
PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById(
perpetualMarketId,
);
expect(perpetualMarket).toEqual(expect.objectContaining({
clobPairId: defaultUpdateClobPairEvent.clobPairId.toString(),
status: protocolTranslations.clobStatusToMarketStatus(defaultUpdateClobPairEvent.status),
quantumConversionExponent: defaultUpdateClobPairEvent.quantumConversionExponent,
subticksPerTick: defaultUpdateClobPairEvent.subticksPerTick,
stepBaseQuantums: defaultUpdateClobPairEvent.stepBaseQuantums.toNumber(),
}));
expectTimingStats();
expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]);
});
});

function expectTimingStats() {
Expand Down
9 changes: 9 additions & 0 deletions indexer/services/ender/__tests__/scripts/scripts.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
} from '@dydxprotocol-indexer/v4-protos';
import {
BUFFER_ENCODING_UTF_8,
CLOB_STATUS_TO_MARKET_STATUS,
dbHelpers,
AssetPositionTable,
PerpetualPositionTable,
Expand Down Expand Up @@ -377,6 +378,14 @@ describe('SQL Function Tests', () => {
);
});

it('dydx_clob_pair_status_to_market_status should convert all statuses', async () => {
for (const [key, value] of Object.entries(CLOB_STATUS_TO_MARKET_STATUS)) {
const result = await getSingleRawQueryResultRow(
`SELECT dydx_clob_pair_status_to_market_status('${key}') AS result`);
expect(result).toEqual(value);
}
});

it('dydx_create_transaction.sql should insert a transaction and return correct jsonb', async () => {
const transactionHash: string = 'txnhash';
const blockHeight: string = '1';
Expand Down
3 changes: 3 additions & 0 deletions indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ export const configSchema = {
USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_SQL_FUNCTION_TO_CREATE_INITIAL_ROWS: parseBoolean({
default: true,
}),
Expand Down
48 changes: 46 additions & 2 deletions indexer/services/ender/src/handlers/update-clob-pair-handler.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import assert from 'assert';

import { logger } from '@dydxprotocol-indexer/base';
import {
PerpetualMarketFromDatabase, PerpetualMarketTable, perpetualMarketRefresher, protocolTranslations,
PerpetualMarketFromDatabase,
PerpetualMarketModel,
PerpetualMarketTable,
perpetualMarketRefresher,
protocolTranslations,
storeHelpers,
} from '@dydxprotocol-indexer/postgres';
import { UpdateClobPairEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../config';
import { generatePerpetualMarketMessage } from '../helpers/kafka-helper';
import { ConsolidatedKafkaEvent } from '../lib/types';
import { Handler } from './handler';
Expand All @@ -18,6 +26,42 @@ export class UpdateClobPairHandler extends Handler<UpdateClobPairEventV1> {

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
if (config.USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION) {
return this.handleViaSqlFunction();
}
return this.handleViaKnex();
}

private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> {
const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes;
const result: pg.QueryResult = await storeHelpers.rawQuery(
`SELECT dydx_update_clob_pair_handler(
'${JSON.stringify(UpdateClobPairEventV1.decode(eventDataBinary))}'
) AS result;`,
{ txId: this.txId },
).catch((error: Error) => {
logger.error({
at: 'UpdateClobPairHandler#handleViaSqlFunction',
message: 'Failed to handle UpdateClobPairEventV1',
error,
});

throw error;
});

const perpetualMarket: PerpetualMarketFromDatabase = PerpetualMarketModel.fromJson(
result.rows[0].result.perpetual_market) as PerpetualMarketFromDatabase;

perpetualMarketRefresher.upsertPerpetualMarket(perpetualMarket);

return [
this.generateConsolidatedMarketKafkaEvent(
JSON.stringify(generatePerpetualMarketMessage([perpetualMarket])),
),
];
}

private async handleViaKnex(): Promise<ConsolidatedKafkaEvent[]> {
const perpetualMarket:
PerpetualMarketFromDatabase = await this.runFuncWithTimingStatAndErrorLogging(
this.updateClobPair(),
Expand Down Expand Up @@ -47,7 +91,7 @@ export class UpdateClobPairHandler extends Handler<UpdateClobPairEventV1> {

if (perpetualMarket === undefined) {
this.logAndThrowParseMessageError(
'Could not find perpetual market with corresponding updatePerpetualEvent.id',
'Could not find perpetual market with corresponding clobPairId',
{ event: this.event },
);
// This assert should never be hit because a ParseMessageError should be thrown above.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const scripts: string[] = [
'create_extension_pg_stat_statements.sql',
'create_extension_uuid_ossp.sql',
'dydx_asset_create_handler.sql',
'dydx_clob_pair_status_to_market_status.sql',
'dydx_market_create_handler.sql',
'dydx_market_modify_handler.sql',
'dydx_market_price_update_handler.sql',
Expand All @@ -48,6 +49,7 @@ const scripts: string[] = [
'dydx_perpetual_position_and_order_side_matching.sql',
'dydx_subaccount_update_handler.sql',
'dydx_trim_scale.sql',
'dydx_update_clob_pair_handler.sql',
'dydx_uuid.sql',
'dydx_uuid_from_asset_position_parts.sql',
'dydx_uuid_from_fill_event_parts.sql',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
Returns the market status (https://github.com/dydxprotocol/v4-chain/blob/ea4f6895a73627aaa9bc5e21eed1ba51313b1ce4/indexer/packages/postgres/src/types/perpetual-market-types.ts#L60)
from the clob pair status (https://github.com/dydxprotocol/v4-chain/blob/ea4f6895a73627aaa9bc5e21eed1ba51313b1ce4/proto/dydxprotocol/indexer/protocol/v1/clob.proto#L157).
The conversion is equivalent to https://github.com/dydxprotocol/v4-chain/blob/ea4f6895a73627aaa9bc5e21eed1ba51313b1ce4/indexer/packages/postgres/src/lib/protocol-translations.ts#L351.
Parameters:
- status: the ClobPairStatus (https://github.com/dydxprotocol/v4-chain/blob/ea4f6895a73627aaa9bc5e21eed1ba51313b1ce4/proto/dydxprotocol/indexer/protocol/v1/clob.proto#L157)
*/
CREATE OR REPLACE FUNCTION dydx_clob_pair_status_to_market_status(status jsonb)
RETURNS text AS $$
BEGIN
CASE status
WHEN '1'::jsonb THEN RETURN 'ACTIVE'; /** CLOB_PAIR_STATUS_ACTIVE */
WHEN '2'::jsonb THEN RETURN 'PAUSED'; /** CLOB_PAIR_STATUS_PAUSED */
WHEN '3'::jsonb THEN RETURN 'CANCEL_ONLY'; /** CLOB_PAIR_STATUS_CANCEL_ONLY */
WHEN '4'::jsonb THEN RETURN 'POST_ONLY'; /** CLOB_PAIR_STATUS_POST_ONLY */
WHEN '5'::jsonb THEN RETURN 'INITIALIZING'; /** CLOB_PAIR_STATUS_INITIALIZING */
ELSE RAISE EXCEPTION 'Invalid clob pair status: %', status;
END CASE;
END;
$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
Parameters:
- event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25)
converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify.
Returns: JSON object containing fields:
- perpetual_market: The updated perpetual market in perpetual-market-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/perpetual-market-model.ts).
*/
CREATE OR REPLACE FUNCTION dydx_update_clob_pair_handler(event_data jsonb) RETURNS jsonb AS $$
DECLARE
row_count integer;
clob_pair_id bigint;
perpetual_market_record perpetual_markets%ROWTYPE;
BEGIN
clob_pair_id = (event_data->'clobPairId')::bigint;
perpetual_market_record."status" = dydx_clob_pair_status_to_market_status(event_data->'status');
perpetual_market_record."quantumConversionExponent" = (event_data->'quantumConversionExponent')::integer;
perpetual_market_record."subticksPerTick" = (event_data->'subticksPerTick')::integer;
perpetual_market_record."stepBaseQuantums" = dydx_from_jsonlib_long(event_data->'stepBaseQuantums');

UPDATE perpetual_markets
SET
"status" = perpetual_market_record."status",
"quantumConversionExponent" = perpetual_market_record."quantumConversionExponent",
"subticksPerTick" = perpetual_market_record."subticksPerTick",
"stepBaseQuantums" = perpetual_market_record."stepBaseQuantums"
WHERE "clobPairId" = clob_pair_id
RETURNING * INTO perpetual_market_record;

IF NOT FOUND THEN
RAISE EXCEPTION 'Could not find perpetual market with corresponding clobPairId %', event_data;
END IF;

RETURN jsonb_build_object(
'perpetual_market',
dydx_to_jsonb(perpetual_market_record)
);
END;
$$ LANGUAGE plpgsql;

0 comments on commit e4b45de

Please sign in to comment.