Skip to content

Commit

Permalink
[IND-472] Update ender liquidity tier upsert to execute updates via a…
Browse files Browse the repository at this point in the history
… SQL function. (#761)
  • Loading branch information
lcwik authored Nov 6, 2023
1 parent 6c6dcde commit f8bb1fb
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 61 deletions.
2 changes: 2 additions & 0 deletions indexer/packages/postgres/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import config from './config';
import AssetModel from './models/asset-model';
import AssetPositionModel from './models/asset-position-model';
import FillModel from './models/fill-model';
import LiquidityTiersModel from './models/liquidity-tiers-model';
import MarketModel from './models/market-model';
import OraclePriceModel from './models/oracle-price-model';
import OrderModel from './models/order-model';
Expand Down Expand Up @@ -87,6 +88,7 @@ export const SQL_TO_JSON_DEFINED_MODELS = [
AssetModel,
AssetPositionModel,
FillModel,
LiquidityTiersModel,
MarketModel,
OraclePriceModel,
OrderModel,
Expand Down
1 change: 1 addition & 0 deletions indexer/packages/postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export { postgresConfigSchema } from './config';
export { default as AssetModel } from './models/asset-model';
export { default as AssetPositionModel } from './models/asset-position-model';
export { default as FillModel } from './models/fill-model';
export { default as LiquidityTiersModel } from './models/liquidity-tiers-model';
export { default as MarketModel } from './models/market-model';
export { default as OraclePriceModel } from './models/oracle-price-model';
export { default as OrderModel } from './models/order-model';
Expand Down
16 changes: 16 additions & 0 deletions indexer/packages/postgres/src/models/liquidity-tiers-model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ export default class LiquidityTiersModel extends BaseModel {
};
}

/**
* A mapping from column name to JSON conversion expected.
* See getSqlConversionForDydxModelTypes for valid conversions.
*
* TODO(IND-239): Ensure that jsonSchema() / sqlToJsonConversions() / model fields match.
*/
static get sqlToJsonConversions() {
return {
id: 'integer',
name: 'string',
initialMarginPpm: 'string',
maintenanceFractionPpm: 'string',
basePositionNotional: 'string',
};
}

id!: number;

QueryBuilderType!: UpsertQueryBuilder<this>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import { updateBlockCache } from '../../src/caches/block-cache';
import { defaultLiquidityTier } from '@dydxprotocol-indexer/postgres/build/__tests__/helpers/constants';
import _ from 'lodash';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';
import config from '../../src/config';

describe('liquidityTierHandler', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -103,73 +104,107 @@ describe('liquidityTierHandler', () => {
});
});

it('creates new liquidity tier', async () => {
const transactionIndex: number = 0;
const liquidityTierEvent: LiquidityTierUpsertEventV1 = defaultLiquidityTierUpsertEvent;
const kafkaMessage: KafkaMessage = createKafkaMessageFromLiquidityTiersEvent({
liquidityTierEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
// Confirm there is no existing liquidity tier
await expectNoExistingLiquidityTiers();
await perpetualMarketRefresher.updatePerpetualMarkets();

const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const newLiquidityTiers: LiquidityTiersFromDatabase[] = await LiquidityTiersTable.findAll(
{},
[], {
orderBy: [[LiquidityTiersColumns.id, Ordering.ASC]],
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'creates new liquidity tier (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_LIQUIDITY_TIER_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;
const liquidityTierEvent: LiquidityTierUpsertEventV1 = defaultLiquidityTierUpsertEvent;
const kafkaMessage: KafkaMessage = createKafkaMessageFromLiquidityTiersEvent({
liquidityTierEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
expect(newLiquidityTiers.length).toEqual(1);
expectLiquidityTier(newLiquidityTiers[0], liquidityTierEvent);
expectTimingStats();
validateLiquidityTierRefresher(defaultLiquidityTierUpsertEvent);
expectKafkaMessages(producerSendMock, liquidityTierEvent, 0);
});
// Confirm there is no existing liquidity tier
await expectNoExistingLiquidityTiers();
await perpetualMarketRefresher.updatePerpetualMarkets();

it('updates existing liquidity tier', async () => {
const transactionIndex: number = 0;
const liquidityTierEvent: LiquidityTierUpsertEventV1 = defaultLiquidityTierUpsertEvent;
const kafkaMessage: KafkaMessage = createKafkaMessageFromLiquidityTiersEvent({
liquidityTierEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const newLiquidityTiers: LiquidityTiersFromDatabase[] = await LiquidityTiersTable.findAll(
{},
[], {
orderBy: [[LiquidityTiersColumns.id, Ordering.ASC]],
});
expect(newLiquidityTiers.length).toEqual(1);
expectLiquidityTier(newLiquidityTiers[0], liquidityTierEvent);
if (!useSqlFunction) {
expectTimingStats();
}
validateLiquidityTierRefresher(defaultLiquidityTierUpsertEvent);
expectKafkaMessages(producerSendMock, liquidityTierEvent, 0);
});
// Create existing liquidity tier
await LiquidityTiersTable.upsert(defaultLiquidityTier);

// create perpetual market with existing liquidity tier to test websockets
await Promise.all([
MarketTable.create(testConstants.defaultMarket),
MarketTable.create(testConstants.defaultMarket2),
]);
await Promise.all([
PerpetualMarketTable.create(testConstants.defaultPerpetualMarket),
PerpetualMarketTable.create(testConstants.defaultPerpetualMarket2),
]);
await perpetualMarketRefresher.updatePerpetualMarkets();
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'updates existing liquidity tier (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_LIQUIDITY_TIER_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;
const liquidityTierEvent: LiquidityTierUpsertEventV1 = defaultLiquidityTierUpsertEvent;
const kafkaMessage: KafkaMessage = createKafkaMessageFromLiquidityTiersEvent({
liquidityTierEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
// Create existing liquidity tier
await LiquidityTiersTable.upsert(defaultLiquidityTier);

const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);
// create perpetual market with existing liquidity tier to test websockets
await Promise.all([
MarketTable.create(testConstants.defaultMarket),
MarketTable.create(testConstants.defaultMarket2),
]);
await Promise.all([
PerpetualMarketTable.create(testConstants.defaultPerpetualMarket),
PerpetualMarketTable.create(testConstants.defaultPerpetualMarket2),
]);
await perpetualMarketRefresher.updatePerpetualMarkets();

const newLiquidityTiers: LiquidityTiersFromDatabase[] = await LiquidityTiersTable.findAll(
{},
[], {
orderBy: [[LiquidityTiersColumns.id, Ordering.ASC]],
});
expect(newLiquidityTiers.length).toEqual(1);
expectLiquidityTier(newLiquidityTiers[0], liquidityTierEvent);
expectTimingStats();
validateLiquidityTierRefresher(defaultLiquidityTierUpsertEvent);
expectKafkaMessages(producerSendMock, liquidityTierEvent, 2);
});
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const newLiquidityTiers: LiquidityTiersFromDatabase[] = await LiquidityTiersTable.findAll(
{},
[], {
orderBy: [[LiquidityTiersColumns.id, Ordering.ASC]],
});
expect(newLiquidityTiers.length).toEqual(1);
expectLiquidityTier(newLiquidityTiers[0], liquidityTierEvent);
if (!useSqlFunction) {
expectTimingStats();
}
validateLiquidityTierRefresher(defaultLiquidityTierUpsertEvent);
expectKafkaMessages(producerSendMock, liquidityTierEvent, 2);
});
});

function expectTimingStats() {
Expand Down
3 changes: 3 additions & 0 deletions indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ export const configSchema = {
USE_LIQUIDATION_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_LIQUIDITY_TIER_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_MARKET_CREATE_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
Expand Down
35 changes: 35 additions & 0 deletions indexer/services/ender/src/handlers/liquidity-tier-handler.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
LiquidityTiersCreateObject,
LiquidityTiersFromDatabase,
LiquidityTiersModel,
LiquidityTiersTable,
PerpetualMarketFromDatabase,
liquidityTierRefresher,
perpetualMarketRefresher,
protocolTranslations,
storeHelpers,
} from '@dydxprotocol-indexer/postgres';
import { LiquidityTierUpsertEventV1 } from '@dydxprotocol-indexer/v4-protos';
import _ from 'lodash';
import * as pg from 'pg';

import config from '../config';
import { QUOTE_CURRENCY_ATOMIC_RESOLUTION } from '../constants';
import { generatePerpetualMarketMessage } from '../helpers/kafka-helper';
import { ConsolidatedKafkaEvent } from '../lib/types';
Expand All @@ -24,6 +29,36 @@ export class LiquidityTierHandler extends Handler<LiquidityTierUpsertEventV1> {

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
if (config.USE_LIQUIDITY_TIER_HANDLER_SQL_FUNCTION) {
return this.handleViaSqlFunction();
}
return this.handleViaKnex();
}

private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> {
const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes;
const result: pg.QueryResult = await storeHelpers.rawQuery(
`SELECT dydx_liquidity_tier_handler(
'${JSON.stringify(LiquidityTierUpsertEventV1.decode(eventDataBinary))}'
) AS result;`,
{ txId: this.txId },
).catch((error: Error) => {
logger.error({
at: 'LiquidityTierHandler#handleViaSqlFunction',
message: 'Failed to handle LiquidityTierUpsertEventV1',
error,
});

throw error;
});

const liquidityTier: LiquidityTiersFromDatabase = LiquidityTiersModel.fromJson(
result.rows[0].result.liquidity_tier) as LiquidityTiersFromDatabase;
liquidityTierRefresher.upsertLiquidityTier(liquidityTier);
return this.generateWebsocketEventsForLiquidityTier(liquidityTier);
}

private async handleViaKnex(): Promise<ConsolidatedKafkaEvent[]> {
const liquidityTier:
LiquidityTiersFromDatabase = await this.runFuncWithTimingStatAndErrorLogging(
this.upsertLiquidityTier(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const scripts: string[] = [
'dydx_get_total_filled_from_liquidity.sql',
'dydx_get_weighted_average.sql',
'dydx_liquidation_fill_handler_per_order.sql',
'dydx_liquidity_tier_handler.sql',
'dydx_order_fill_handler_per_order.sql',
'dydx_perpetual_market_handler.sql',
'dydx_perpetual_position_and_order_side_matching.sql',
Expand Down
34 changes: 34 additions & 0 deletions indexer/services/ender/src/scripts/dydx_liquidity_tier_handler.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
Parameters:
- event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25)
converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify.
Returns: JSON object containing fields:
- liquidy_tier: The upserted liquidity tier in liquidity-tiers-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/liquidity-tiers-model.ts).
*/
CREATE OR REPLACE FUNCTION dydx_liquidity_tier_handler(event_data jsonb) RETURNS jsonb AS $$
DECLARE
liquidity_tier_record liquidity_tiers%ROWTYPE;
BEGIN
liquidity_tier_record."id" = (event_data->'id')::integer;
liquidity_tier_record."name" = event_data->>'name';
liquidity_tier_record."initialMarginPpm" = (event_data->'initialMarginPpm')::bigint;
liquidity_tier_record."maintenanceFractionPpm" = (event_data->'maintenanceFractionPpm')::bigint;
liquidity_tier_record."basePositionNotional" = power(10, -6) * dydx_from_jsonlib_long(event_data->'basePositionNotional');

INSERT INTO liquidity_tiers
VALUES (liquidity_tier_record.*)
ON CONFLICT ("id") DO
UPDATE
SET
"name" = liquidity_tier_record."name",
"initialMarginPpm" = liquidity_tier_record."initialMarginPpm",
"maintenanceFractionPpm" = liquidity_tier_record."maintenanceFractionPpm",
"basePositionNotional" = liquidity_tier_record."basePositionNotional"
RETURNING * INTO liquidity_tier_record;

RETURN jsonb_build_object(
'liquidity_tier',
dydx_to_jsonb(liquidity_tier_record)
);
END;
$$ LANGUAGE plpgsql;

0 comments on commit f8bb1fb

Please sign in to comment.