Skip to content

Commit

Permalink
[IND-470] Update asset create handler to use SQL function. (#748)
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwik authored Nov 3, 2023
1 parent 5e0c4e0 commit 4bd3c34
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 42 deletions.
2 changes: 2 additions & 0 deletions indexer/packages/postgres/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { CandleMessage_Resolution, ClobPairStatus } from '@dydxprotocol-indexer/v4-protos';

import config from './config';
import AssetModel from './models/asset-model';
import AssetPositionModel from './models/asset-position-model';
import FillModel from './models/fill-model';
import MarketModel from './models/market-model';
Expand Down Expand Up @@ -83,6 +84,7 @@ export const TIME_IN_FORCE_TO_API_TIME_IN_FORCE: Record<TimeInForce, APITimeInFo

// A list of models that have sqlToJsonConversions defined.
export const SQL_TO_JSON_DEFINED_MODELS = [
AssetModel,
AssetPositionModel,
FillModel,
MarketModel,
Expand Down
1 change: 1 addition & 0 deletions indexer/packages/postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export * from './constants';

export { default as Transaction } from './helpers/transaction';
export { postgresConfigSchema } from './config';
export { default as AssetModel } from './models/asset-model';
export { default as AssetPositionModel } from './models/asset-position-model';
export { default as FillModel } from './models/fill-model';
export { default as MarketModel } from './models/market-model';
Expand Down
16 changes: 16 additions & 0 deletions indexer/packages/postgres/src/models/asset-model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,22 @@ export default class AssetModel extends Model {
};
}

/**
* A mapping from column name to JSON conversion expected.
* See getSqlConversionForDydxModelTypes for valid conversions.
*
* TODO(IND-239): Ensure that jsonSchema() / sqlToJsonConversions() / model fields match.
*/
static get sqlToJsonConversions() {
return {
id: 'string',
symbol: 'string',
atomicResolution: 'integer',
hasMarket: 'boolean',
marketId: 'integer',
};
}

id!: string;

symbol!: string;
Expand Down
113 changes: 73 additions & 40 deletions indexer/services/ender/__tests__/handlers/asset-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
} from '../helpers/constants';
import { updateBlockCache } from '../../src/caches/block-cache';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';
import config from '../../src/config';

describe('assetHandler', () => {
beforeAll(async () => {
Expand All @@ -61,6 +62,7 @@ describe('assetHandler', () => {

afterEach(async () => {
await dbHelpers.clearData();
assetRefresher.clear();
jest.clearAllMocks();
});

Expand Down Expand Up @@ -98,51 +100,82 @@ describe('assetHandler', () => {
});
});

it('fails when market doesnt exist for asset', async () => {
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent: defaultAssetCreateEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});

const message: string = 'Unable to find market with id: 0';
await expect(onMessage(kafkaMessage)).rejects.toThrowError(
new Error(message),
);
});
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'fails when market doesnt exist for asset (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent: defaultAssetCreateEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});

it('creates new asset', async () => {
await MarketTable.create(testConstants.defaultMarket);
await marketRefresher.updateMarkets();
const transactionIndex: number = 0;

const assetEvent: AssetCreateEventV1 = defaultAssetCreateEvent;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
await expect(onMessage(kafkaMessage)).rejects.toThrowError(
'Unable to find market with id: 0',
);
});
// Confirm there is no existing asset to or from the sender subaccount
await expectNoExistingAssets();

await onMessage(kafkaMessage);
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'creates new asset (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction;
await MarketTable.create(testConstants.defaultMarket);
await marketRefresher.updateMarkets();
const transactionIndex: number = 0;

const newAssets: AssetFromDatabase[] = await AssetTable.findAll(
{},
[], {
orderBy: [[AssetColumns.id, Ordering.ASC]],
const assetEvent: AssetCreateEventV1 = defaultAssetCreateEvent;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
expect(newAssets.length).toEqual(1);
expectAssetMatchesEvent(assetEvent, newAssets[0]);
expectTimingStats();
const asset: AssetFromDatabase = assetRefresher.getAssetFromId('0');
expect(asset).toBeDefined();
});
// Confirm there is no existing asset to or from the sender subaccount
await expectNoExistingAssets();

await onMessage(kafkaMessage);

const newAssets: AssetFromDatabase[] = await AssetTable.findAll(
{},
[], {
orderBy: [[AssetColumns.id, Ordering.ASC]],
});
expect(newAssets.length).toEqual(1);
expectAssetMatchesEvent(assetEvent, newAssets[0]);
if (!useSqlFunction) {
expectTimingStats();
}
const asset: AssetFromDatabase = assetRefresher.getAssetFromId('0');
expect(asset).toBeDefined();
});
});

function expectTimingStats() {
Expand Down
5 changes: 4 additions & 1 deletion indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export const configSchema = {
SEND_WEBSOCKET_MESSAGES: parseBoolean({
default: true,
}),
USE_ORDER_HANDLER_SQL_FUNCTION: parseBoolean({
USE_ASSET_CREATE_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_LIQUIDATION_HANDLER_SQL_FUNCTION: parseBoolean({
Expand All @@ -38,6 +38,9 @@ export const configSchema = {
USE_MARKET_PRICE_UPDATE_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_ORDER_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({
default: true,
}),
Expand Down
40 changes: 39 additions & 1 deletion indexer/services/ender/src/handlers/asset-handler.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
AssetFromDatabase, AssetTable, assetRefresher, marketRefresher,
AssetFromDatabase,
AssetModel,
AssetTable,
assetRefresher,
marketRefresher,
storeHelpers,
} from '@dydxprotocol-indexer/postgres';
import { AssetCreateEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../config';
import { ConsolidatedKafkaEvent } from '../lib/types';
import { Handler } from './handler';

Expand All @@ -15,6 +23,36 @@ export class AssetCreationHandler extends Handler<AssetCreateEventV1> {

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
if (config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION) {
return this.handleViaSqlFunction();
}
return this.handleViaKnex();
}

private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> {
const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes;
const result: pg.QueryResult = await storeHelpers.rawQuery(
`SELECT dydx_asset_create_handler(
'${JSON.stringify(AssetCreateEventV1.decode(eventDataBinary))}'
) AS result;`,
{ txId: this.txId },
).catch((error: Error) => {
logger.error({
at: 'AssetCreationHandler#handleViaSqlFunction',
message: 'Failed to handle AssetCreateEventV1',
error,
});

throw error;
});

const asset: AssetFromDatabase = AssetModel.fromJson(
result.rows[0].result.asset) as AssetFromDatabase;
assetRefresher.addAsset(asset);
return [];
}

private async handleViaKnex(): Promise<ConsolidatedKafkaEvent[]> {
await this.runFuncWithTimingStatAndErrorLogging(
this.createAsset(),
this.generateTimingStatsOptions('create_asset'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function newScript(name: string, scriptPath: string): PostgresFunction {
const scripts: string[] = [
'create_extension_pg_stat_statements.sql',
'create_extension_uuid_ossp.sql',
'dydx_asset_create_handler.sql',
'dydx_market_create_handler.sql',
'dydx_market_modify_handler.sql',
'dydx_market_price_update_handler.sql',
Expand Down
34 changes: 34 additions & 0 deletions indexer/services/ender/src/scripts/dydx_asset_create_handler.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
Parameters:
- event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25)
converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify.
Returns: JSON object containing fields:
- asset: The created asset in asset-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/asset-model.ts).
*/
CREATE OR REPLACE FUNCTION dydx_asset_create_handler(event_data jsonb) RETURNS jsonb AS $$
DECLARE
market_record_id integer;
asset_record assets%ROWTYPE;
BEGIN
asset_record."id" = event_data->>'id';
asset_record."atomicResolution" = (event_data->'atomicResolution')::integer;
asset_record."symbol" = event_data->>'symbol';

asset_record."hasMarket" = (event_data->'hasMarket')::bool;
if asset_record."hasMarket" THEN
market_record_id = (event_data->'marketId')::integer;
SELECT "id" INTO asset_record."marketId" FROM markets WHERE "id" = market_record_id;

IF NOT FOUND THEN
RAISE EXCEPTION 'Unable to find market with id: %', market_record_id;
END IF;
END IF;

INSERT INTO assets VALUES (asset_record.*);

RETURN jsonb_build_object(
'asset',
dydx_to_jsonb(asset_record)
);
END;
$$ LANGUAGE plpgsql;

0 comments on commit 4bd3c34

Please sign in to comment.