Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IND-470] Update asset create handler to use SQL function. #748

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions indexer/packages/postgres/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { CandleMessage_Resolution, ClobPairStatus } from '@dydxprotocol-indexer/v4-protos';

import config from './config';
import AssetModel from './models/asset-model';
import AssetPositionModel from './models/asset-position-model';
import FillModel from './models/fill-model';
import MarketModel from './models/market-model';
Expand Down Expand Up @@ -83,6 +84,7 @@ export const TIME_IN_FORCE_TO_API_TIME_IN_FORCE: Record<TimeInForce, APITimeInFo

// A list of models that have sqlToJsonConversions defined.
export const SQL_TO_JSON_DEFINED_MODELS = [
AssetModel,
AssetPositionModel,
FillModel,
MarketModel,
Expand Down
1 change: 1 addition & 0 deletions indexer/packages/postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export * from './constants';

export { default as Transaction } from './helpers/transaction';
export { postgresConfigSchema } from './config';
export { default as AssetModel } from './models/asset-model';
export { default as AssetPositionModel } from './models/asset-position-model';
export { default as FillModel } from './models/fill-model';
export { default as MarketModel } from './models/market-model';
Expand Down
16 changes: 16 additions & 0 deletions indexer/packages/postgres/src/models/asset-model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,22 @@ export default class AssetModel extends Model {
};
}

/**
* A mapping from column name to JSON conversion expected.
* See getSqlConversionForDydxModelTypes for valid conversions.
*
* TODO(IND-239): Ensure that jsonSchema() / sqlToJsonConversions() / model fields match.
*/
static get sqlToJsonConversions() {
return {
id: 'string',
symbol: 'string',
atomicResolution: 'integer',
hasMarket: 'boolean',
marketId: 'integer',
};
}

id!: string;

symbol!: string;
Expand Down
113 changes: 73 additions & 40 deletions indexer/services/ender/__tests__/handlers/asset-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
} from '../helpers/constants';
import { updateBlockCache } from '../../src/caches/block-cache';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';
import config from '../../src/config';

describe('assetHandler', () => {
beforeAll(async () => {
Comment on lines 36 to 42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The import of config and the use of config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION in the tests indicate that the tests are now dependent on the configuration. This could lead to unpredictable test results if the configuration changes. Consider mocking the configuration to ensure consistent test results.

Expand All @@ -61,6 +62,7 @@ describe('assetHandler', () => {

afterEach(async () => {
await dbHelpers.clearData();
assetRefresher.clear();
jest.clearAllMocks();
});

Expand Down Expand Up @@ -98,51 +100,82 @@ describe('assetHandler', () => {
});
});

it('fails when market doesnt exist for asset', async () => {
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent: defaultAssetCreateEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});

const message: string = 'Unable to find market with id: 0';
await expect(onMessage(kafkaMessage)).rejects.toThrowError(
new Error(message),
);
});
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'fails when market doesnt exist for asset (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent: defaultAssetCreateEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});

it('creates new asset', async () => {
await MarketTable.create(testConstants.defaultMarket);
await marketRefresher.updateMarkets();
const transactionIndex: number = 0;

const assetEvent: AssetCreateEventV1 = defaultAssetCreateEvent;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
await expect(onMessage(kafkaMessage)).rejects.toThrowError(
'Unable to find market with id: 0',
);
});
Comment on lines 100 to 131
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameterized tests are a good way to test the same logic with different configurations. However, the configuration is being set directly in the test. This could lead to unpredictable test results if the tests are run in parallel. Consider setting the configuration in a beforeEach block within the describe.each block to ensure that the configuration is set correctly for each test.

- it.each([
-   [
-     'via knex',
-     false,
-   ],
-   [
-     'via SQL function',
-     true,
-   ],
- ])(
-   'fails when market doesnt exist for asset (%s)',
-   async (
-     _name: string,
-     useSqlFunction: boolean,
-   ) => {
-     config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction;
-     // rest of the test
-   });
+ describe.each([
+   [
+     'via knex',
+     false,
+   ],
+   [
+     'via SQL function',
+     true,
+   ],
+ ])('Asset creation %s', (_name: string, useSqlFunction: boolean) => {
+   beforeEach(() => {
+     config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction;
+   });
+
+   it('fails when market doesnt exist for asset', async () => {
+     // rest of the test
+   });
+ });

Commitable suggestion (Beta)
Suggested change
});
});
it('fails when market doesnt exist for asset', async () => {
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent: defaultAssetCreateEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
const message: string = 'Unable to find market with id: 0';
await expect(onMessage(kafkaMessage)).rejects.toThrowError(
new Error(message),
);
});
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'fails when market doesnt exist for asset (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent: defaultAssetCreateEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
it('creates new asset', async () => {
await MarketTable.create(testConstants.defaultMarket);
await marketRefresher.updateMarkets();
const transactionIndex: number = 0;
const assetEvent: AssetCreateEventV1 = defaultAssetCreateEvent;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
await expect(onMessage(kafkaMessage)).rejects.toThrowError(
'Unable to find market with id: 0',
);
});
describe.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])('Asset creation %s', (_name: string, useSqlFunction: boolean) => {
beforeEach(() => {
config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction;
});
it('fails when market doesnt exist for asset', async () => {
// rest of the test
});
});

// Confirm there is no existing asset to or from the sender subaccount
await expectNoExistingAssets();

await onMessage(kafkaMessage);
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'creates new asset (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction;
await MarketTable.create(testConstants.defaultMarket);
await marketRefresher.updateMarkets();
const transactionIndex: number = 0;

const newAssets: AssetFromDatabase[] = await AssetTable.findAll(
{},
[], {
orderBy: [[AssetColumns.id, Ordering.ASC]],
const assetEvent: AssetCreateEventV1 = defaultAssetCreateEvent;
const kafkaMessage: KafkaMessage = createKafkaMessageFromAssetEvent({
assetEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
expect(newAssets.length).toEqual(1);
expectAssetMatchesEvent(assetEvent, newAssets[0]);
expectTimingStats();
const asset: AssetFromDatabase = assetRefresher.getAssetFromId('0');
expect(asset).toBeDefined();
});
// Confirm there is no existing asset to or from the sender subaccount
await expectNoExistingAssets();

await onMessage(kafkaMessage);

const newAssets: AssetFromDatabase[] = await AssetTable.findAll(
{},
[], {
orderBy: [[AssetColumns.id, Ordering.ASC]],
});
expect(newAssets.length).toEqual(1);
expectAssetMatchesEvent(assetEvent, newAssets[0]);
if (!useSqlFunction) {
expectTimingStats();
}
const asset: AssetFromDatabase = assetRefresher.getAssetFromId('0');
expect(asset).toBeDefined();
});
Comment on lines +133 to +178
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same comment applies to these parameterized tests as well.

});

function expectTimingStats() {
Expand Down
5 changes: 4 additions & 1 deletion indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export const configSchema = {
SEND_WEBSOCKET_MESSAGES: parseBoolean({
default: true,
}),
USE_ORDER_HANDLER_SQL_FUNCTION: parseBoolean({
USE_ASSET_CREATE_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_LIQUIDATION_HANDLER_SQL_FUNCTION: parseBoolean({
Expand All @@ -38,6 +38,9 @@ export const configSchema = {
USE_MARKET_PRICE_UPDATE_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_ORDER_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({
default: true,
}),
Expand Down
40 changes: 39 additions & 1 deletion indexer/services/ender/src/handlers/asset-handler.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
AssetFromDatabase, AssetTable, assetRefresher, marketRefresher,
AssetFromDatabase,
AssetModel,
AssetTable,
assetRefresher,
marketRefresher,
storeHelpers,
} from '@dydxprotocol-indexer/postgres';
import { AssetCreateEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../config';
import { ConsolidatedKafkaEvent } from '../lib/types';
import { Handler } from './handler';

Expand All @@ -15,6 +23,36 @@ export class AssetCreationHandler extends Handler<AssetCreateEventV1> {

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
if (config.USE_ASSET_CREATE_HANDLER_SQL_FUNCTION) {
return this.handleViaSqlFunction();
}
return this.handleViaKnex();
}

private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> {
const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes;
const result: pg.QueryResult = await storeHelpers.rawQuery(
`SELECT dydx_asset_create_handler(
'${JSON.stringify(AssetCreateEventV1.decode(eventDataBinary))}'
) AS result;`,
{ txId: this.txId },
).catch((error: Error) => {
logger.error({
at: 'AssetCreationHandler#handleViaSqlFunction',
message: 'Failed to handle AssetCreateEventV1',
error,
});

throw error;
});

const asset: AssetFromDatabase = AssetModel.fromJson(
result.rows[0].result.asset) as AssetFromDatabase;
assetRefresher.addAsset(asset);
return [];
}

private async handleViaKnex(): Promise<ConsolidatedKafkaEvent[]> {
await this.runFuncWithTimingStatAndErrorLogging(
this.createAsset(),
this.generateTimingStatsOptions('create_asset'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function newScript(name: string, scriptPath: string): PostgresFunction {
const scripts: string[] = [
'create_extension_pg_stat_statements.sql',
'create_extension_uuid_ossp.sql',
'dydx_asset_create_handler.sql',
'dydx_market_create_handler.sql',
'dydx_market_modify_handler.sql',
'dydx_market_price_update_handler.sql',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
Parameters:
- event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25)
converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify.
Returns: JSON object containing fields:
- asset: The created asset in asset-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/asset-model.ts).
*/
CREATE OR REPLACE FUNCTION dydx_asset_create_handler(event_data jsonb) RETURNS jsonb AS $$
DECLARE
market_record_id integer;
asset_record assets%ROWTYPE;
BEGIN
asset_record."id" = event_data->>'id';
asset_record."atomicResolution" = (event_data->'atomicResolution')::integer;
asset_record."symbol" = event_data->>'symbol';

asset_record."hasMarket" = (event_data->'hasMarket')::bool;
if asset_record."hasMarket" THEN
market_record_id = (event_data->'marketId')::integer;
SELECT "id" INTO asset_record."marketId" FROM markets WHERE "id" = market_record_id;

IF NOT FOUND THEN
RAISE EXCEPTION 'Unable to find market with id: %', market_record_id;
END IF;
END IF;

INSERT INTO assets VALUES (asset_record.*);

RETURN jsonb_build_object(
'asset',
dydx_to_jsonb(asset_record)
);
END;
$$ LANGUAGE plpgsql;