Skip to content

Commit

Permalink
[IND-473] Update ender perpetual market create to execute updates via…
Browse files Browse the repository at this point in the history
… a SQL function.
  • Loading branch information
lcwik committed Nov 6, 2023
1 parent 78614e2 commit 6e7aac0
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import {
} from '../helpers/constants';
import { updateBlockCache } from '../../src/caches/block-cache';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';
import config from '../../src/config';

describe('perpetualMarketHandler', () => {
beforeAll(async () => {
Expand All @@ -65,6 +66,7 @@ describe('perpetualMarketHandler', () => {

afterEach(async () => {
await dbHelpers.clearData();
perpetualMarketRefresher.clear();
jest.clearAllMocks();
liquidityTierRefresher.clear();
});
Expand Down Expand Up @@ -103,71 +105,118 @@ describe('perpetualMarketHandler', () => {
});
});

it('fails when market doesnt exist for perpetual market', async () => {
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromPerpetualMarketEvent({
perpetualMarketEvent: defaultPerpetualMarketCreateEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'fails when market doesnt exist for perpetual market (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromPerpetualMarketEvent({
perpetualMarketEvent: defaultPerpetualMarketCreateEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});

await expect(onMessage(kafkaMessage)).rejects.toThrowError();
});

await expect(onMessage(kafkaMessage)).rejects.toThrowError();
});
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'fails when liquidity tier doesnt exist for perpetual market (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION = useSqlFunction;
await MarketTable.create(testConstants.defaultMarket);
await marketRefresher.updateMarkets();
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromPerpetualMarketEvent({
perpetualMarketEvent: defaultPerpetualMarketCreateEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});

it('fails when liquidity tier doesnt exist for perpetual market', async () => {
await MarketTable.create(testConstants.defaultMarket);
await marketRefresher.updateMarkets();
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromPerpetualMarketEvent({
perpetualMarketEvent: defaultPerpetualMarketCreateEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
await expect(onMessage(kafkaMessage)).rejects.toThrowError();
});

await expect(onMessage(kafkaMessage)).rejects.toThrowError();
});

it('creates new perpetual market', async () => {
await Promise.all([
MarketTable.create(testConstants.defaultMarket),
LiquidityTiersTable.create(testConstants.defaultLiquidityTier),
]);
await liquidityTierRefresher.updateLiquidityTiers();
await marketRefresher.updateMarkets();
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'creates new perpetual market (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION = useSqlFunction;
await Promise.all([
MarketTable.create(testConstants.defaultMarket),
LiquidityTiersTable.create(testConstants.defaultLiquidityTier),
]);
await liquidityTierRefresher.updateLiquidityTiers();
await marketRefresher.updateMarkets();

const transactionIndex: number = 0;
const transactionIndex: number = 0;

const perpetualMarketEvent: PerpetualMarketCreateEventV1 = defaultPerpetualMarketCreateEvent;
const kafkaMessage: KafkaMessage = createKafkaMessageFromPerpetualMarketEvent({
perpetualMarketEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
// Confirm there is no existing perpetualMarket.
await expectNoExistingPerpetualMarkets();
const perpetualMarketEvent: PerpetualMarketCreateEventV1 = defaultPerpetualMarketCreateEvent;
const kafkaMessage: KafkaMessage = createKafkaMessageFromPerpetualMarketEvent({
perpetualMarketEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
// Confirm there is no existing perpetualMarket.
await expectNoExistingPerpetualMarkets();

const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const newPerpetualMarkets: PerpetualMarketFromDatabase[] = await PerpetualMarketTable.findAll(
{},
[], {
orderBy: [[PerpetualMarketColumns.id, Ordering.ASC]],
});
expect(newPerpetualMarkets.length).toEqual(1);
expectPerpetualMarketMatchesEvent(perpetualMarketEvent, newPerpetualMarkets[0]);
expectTimingStats();
const perpetualMarket: PerpetualMarketFromDatabase | undefined = perpetualMarketRefresher.getPerpetualMarketFromId('0');
expect(perpetualMarket).toBeDefined();
expectPerpetualMarket(perpetualMarket!, perpetualMarketEvent);
expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]);
});
const newPerpetualMarkets: PerpetualMarketFromDatabase[] = await PerpetualMarketTable.findAll(
{},
[], {
orderBy: [[PerpetualMarketColumns.id, Ordering.ASC]],
});
expect(newPerpetualMarkets.length).toEqual(1);
expectPerpetualMarketMatchesEvent(perpetualMarketEvent, newPerpetualMarkets[0]);
if (!useSqlFunction) {
expectTimingStats();
}
const perpetualMarket: PerpetualMarketFromDatabase | undefined = perpetualMarketRefresher.getPerpetualMarketFromId('0');
expect(perpetualMarket).toBeDefined();
expectPerpetualMarket(perpetualMarket!, perpetualMarketEvent);
expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]);
});
});

function expectTimingStats() {
Expand Down
3 changes: 3 additions & 0 deletions indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ export const configSchema = {
USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
Expand Down
44 changes: 42 additions & 2 deletions indexer/services/ender/src/handlers/perpetual-market-handler.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
PerpetualMarketCreateObject,
PerpetualMarketFromDatabase,
PerpetualMarketFromDatabase, PerpetualMarketModel,
perpetualMarketRefresher,
PerpetualMarketTable,
protocolTranslations,
protocolTranslations, storeHelpers,
} from '@dydxprotocol-indexer/postgres';
import { PerpetualMarketCreateEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../config';
import { generatePerpetualMarketMessage } from '../helpers/kafka-helper';
import { ConsolidatedKafkaEvent } from '../lib/types';
import { Handler } from './handler';
Expand All @@ -20,6 +23,43 @@ export class PerpetualMarketCreationHandler extends Handler<PerpetualMarketCreat

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
if (config.USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION) {
return this.handleViaSqlFunction();
}
return this.handleViaKnex();
}

// eslint-disable-next-line @typescript-eslint/require-await
private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> {
const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes;
const result: pg.QueryResult = await storeHelpers.rawQuery(
`SELECT dydx_perpetual_market_handler(
'${JSON.stringify(PerpetualMarketCreateEventV1.decode(eventDataBinary))}'
) AS result;`,
{ txId: this.txId },
).catch((error: Error) => {
logger.error({
at: 'PerpetualMarketCreationHandler#handleViaSqlFunction',
message: 'Failed to handle PerpetualMarketCreateEventV1',
error,
});

throw error;
});

const perpetualMarket: PerpetualMarketFromDatabase = PerpetualMarketModel.fromJson(
result.rows[0].result.perpetual_market) as PerpetualMarketFromDatabase;

perpetualMarketRefresher.upsertPerpetualMarket(perpetualMarket);
return [
this.generateConsolidatedMarketKafkaEvent(
JSON.stringify(generatePerpetualMarketMessage([perpetualMarket])),
),
];
}

// eslint-disable-next-line @typescript-eslint/require-await
private async handleViaKnex(): Promise<ConsolidatedKafkaEvent[]> {
const perpetualMarket:
PerpetualMarketFromDatabase = await this.runFuncWithTimingStatAndErrorLogging(
this.createPerpetualMarket(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const scripts: string[] = [
'dydx_get_weighted_average.sql',
'dydx_liquidation_fill_handler_per_order.sql',
'dydx_order_fill_handler_per_order.sql',
'dydx_perpetual_market_handler.sql',
'dydx_perpetual_position_and_order_side_matching.sql',
'dydx_subaccount_update_handler.sql',
'dydx_trim_scale.sql',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
Parameters:
- event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25)
converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify.
Returns: JSON object containing fields:
- perpetual_market: The updated perpetual market in perpetual-market-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/perpetual-market-model.ts).
*/
CREATE OR REPLACE FUNCTION dydx_perpetual_market_handler(event_data jsonb) RETURNS jsonb AS $$
DECLARE
perpetual_market_record perpetual_markets%ROWTYPE;
BEGIN
perpetual_market_record."id" = (event_data->'id')::bigint;
perpetual_market_record."clobPairId" = (event_data->'clobPairId')::bigint;
perpetual_market_record."ticker" = event_data->>'ticker';
perpetual_market_record."marketId" = (event_data->'marketId')::integer;
perpetual_market_record."status" = dydx_clob_pair_status_to_market_status(event_data->'status');
perpetual_market_record."lastPrice" = 0;
perpetual_market_record."priceChange24H" = 0;
perpetual_market_record."trades24H" = 0;
perpetual_market_record."volume24H" = 0;
perpetual_market_record."nextFundingRate" = 0;
perpetual_market_record."openInterest"= 0;
perpetual_market_record."quantumConversionExponent" = (event_data->'quantumConversionExponent')::integer;
perpetual_market_record."atomicResolution" = (event_data->'atomicResolution')::integer;
perpetual_market_record."subticksPerTick" = (event_data->'subticksPerTick')::integer;
perpetual_market_record."stepBaseQuantums" = dydx_from_jsonlib_long(event_data->'stepBaseQuantums');
perpetual_market_record."liquidityTierId" = (event_data->'liquidityTier')::integer;

INSERT INTO perpetual_markets VALUES (perpetual_market_record.*) RETURNING * INTO perpetual_market_record;

RETURN jsonb_build_object(
'perpetual_market',
dydx_to_jsonb(perpetual_market_record)
);
END;
$$ LANGUAGE plpgsql;

0 comments on commit 6e7aac0

Please sign in to comment.