Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IND-467] Push ender market create logic to use a single sql function #737

Merged
merged 3 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions indexer/packages/postgres/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { CandleMessage_Resolution, ClobPairStatus } from '@dydxprotocol-indexer/
import config from './config';
import AssetPositionModel from './models/asset-position-model';
import FillModel from './models/fill-model';
import MarketModel from './models/market-model';
import OrderModel from './models/order-model';
import PerpetualMarketModel from './models/perpetual-market-model';
import PerpetualPositionModel from './models/perpetual-position-model';
Expand Down Expand Up @@ -83,6 +84,7 @@ export const TIME_IN_FORCE_TO_API_TIME_IN_FORCE: Record<TimeInForce, APITimeInFo
export const SQL_TO_JSON_DEFINED_MODELS = [
AssetPositionModel,
FillModel,
MarketModel,
OrderModel,
PerpetualMarketModel,
PerpetualPositionModel,
Expand Down
1 change: 1 addition & 0 deletions indexer/packages/postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export { default as Transaction } from './helpers/transaction';
export { postgresConfigSchema } from './config';
export { default as AssetPositionModel } from './models/asset-position-model';
export { default as FillModel } from './models/fill-model';
export { default as MarketModel } from './models/market-model';
export { default as OrderModel } from './models/order-model';
export { default as PerpetualMarketModel } from './models/perpetual-market-model';
export { default as PerpetualPositionModel } from './models/perpetual-position-model';
Expand Down
7 changes: 7 additions & 0 deletions indexer/packages/postgres/src/loops/market-refresher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ export async function updateMarkets(options?: Options): Promise<void> {
stats.timing(`${config.SERVICE_NAME}.loops.update_markets`, Date.now() - startTime);
}

/**
* Updates the markets map with the specified market.
*/
export function updateMarket(market: MarketFromDatabase): void {
idToMarket[market.id] = market;
}
Comment on lines 40 to +48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The updateMarket function directly modifies the idToMarket map. This could potentially lead to data races if multiple threads are trying to update the map at the same time. Consider using a lock or other synchronization mechanism to ensure thread safety.

+ let updateLock = false;

export function updateMarket(market: MarketFromDatabase): void {
+  while(updateLock) {} // wait for lock to be released
+  updateLock = true; // acquire lock
  idToMarket[market.id] = market;
+  updateLock = false; // release lock
}

Commitable suggestion (Beta)
Suggested change
stats.timing(`${config.SERVICE_NAME}.loops.update_markets`, Date.now() - startTime);
}
/**
* Updates the markets map with the specified market.
*/
export function updateMarket(market: MarketFromDatabase): void {
idToMarket[market.id] = market;
}
let updateLock = false;
export function updateMarket(market: MarketFromDatabase): void {
while(updateLock) {} // wait for lock to be released
updateLock = true; // acquire lock
idToMarket[market.id] = market;
updateLock = false; // release lock
}


/**
* Gets the market for a given id.
*/
Expand Down
16 changes: 16 additions & 0 deletions indexer/packages/postgres/src/models/market-model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ export default class MarketModel extends Model {
};
}

/**
* A mapping from column name to JSON conversion expected.
* See getSqlConversionForDydxModelTypes for valid conversions.
*
* TODO(IND-239): Ensure that jsonSchema() / sqlToJsonConversions() / model fields match.
*/
static get sqlToJsonConversions() {
return {
id: 'integer',
pair: 'string',
exponent: 'integer',
minPriceChangePpm: 'integer',
oraclePrice: 'string',
};
}

id!: number;

pair!: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
} from '../../helpers/indexer-proto-helpers';
import Long from 'long';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('marketCreateHandler', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -86,67 +87,97 @@ describe('marketCreateHandler', () => {
});
});

it('creates new market', async () => {
const transactionIndex: number = 0;
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'creates new market (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_MARKET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;

const marketCreate: MarketEventV1 = {
marketId: 3,
marketCreate: {
base: {
pair: 'DYDX-USD',
minPriceChangePpm: 500,
const marketCreate: MarketEventV1 = {
marketId: 3,
marketCreate: {
base: {
pair: 'DYDX-USD',
minPriceChangePpm: 500,
},
exponent: -5,
},
exponent: -5,
},
};

const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({
marketEvents: [marketCreate],
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
};

await onMessage(kafkaMessage);
const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({
marketEvents: [marketCreate],
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});

const market: MarketFromDatabase = await MarketTable.findById(
marketCreate.marketId,
) as MarketFromDatabase;
await onMessage(kafkaMessage);

expectMarketMatchesEvent(marketCreate as MarketCreateEventMessage, market);
});
const market: MarketFromDatabase = await MarketTable.findById(
marketCreate.marketId,
) as MarketFromDatabase;

expectMarketMatchesEvent(marketCreate as MarketCreateEventMessage, market);
});

it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'errors when attempting to create an existing market (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_MARKET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;

it('errors when attempting to create an existing market', async () => {
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({
marketEvents: [defaultMarketCreate],
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
await expect(onMessage(kafkaMessage)).rejects.toThrowError(
new ParseMessageError('Market in MarketCreate already exists'),
);

const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({
marketEvents: [defaultMarketCreate],
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
// Check that market in database is the old market.
const market: MarketFromDatabase = await MarketTable.findById(
defaultMarketCreate.marketId,
) as MarketFromDatabase;
expect(market.minPriceChangePpm).toEqual(50);

expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({
at: 'MarketCreateHandler#logAndThrowParseMessageError',
message: 'Market in MarketCreate already exists',
}));
expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({
at: 'onMessage#onMessage',
message: 'Error: Unable to parse message, this must be due to a bug in V4 node',
}));
expect(producerSendMock.mock.calls.length).toEqual(0);
});
await expect(onMessage(kafkaMessage)).rejects.toThrowError(
new ParseMessageError('Market in MarketCreate already exists'),
);

// Check that market in database is the old market.
const market: MarketFromDatabase = await MarketTable.findById(
defaultMarketCreate.marketId,
) as MarketFromDatabase;
expect(market.minPriceChangePpm).toEqual(50);

expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({
at: 'MarketCreateHandler#logAndThrowParseMessageError',
message: 'Market in MarketCreate already exists',
}));
expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({
at: 'onMessage#onMessage',
message: 'Error: Unable to parse message, this must be due to a bug in V4 node',
}));
expect(producerSendMock.mock.calls.length).toEqual(0);
});
});

function expectMarketMatchesEvent(
Expand Down
2 changes: 1 addition & 1 deletion indexer/services/ender/__tests__/scripts/scripts.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ describe('SQL Function Tests', () => {
});

async function getSingleRawQueryResultRow(query: string): Promise<object> {
const queryResult = await storeHelpers.rawQuery(query, {}).catch((error) => {
const queryResult = await storeHelpers.rawQuery(query, {}).catch((error: Error) => {
throw error;
});
return queryResult.rows[0].result;
Expand Down
3 changes: 3 additions & 0 deletions indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ export const configSchema = {
USE_LIQUIDATION_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_MARKET_CREATE_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({
default: true,
}),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import { logger } from '@dydxprotocol-indexer/base';
import { MarketFromDatabase, MarketTable, marketRefresher } from '@dydxprotocol-indexer/postgres';
import {
MarketFromDatabase,
MarketModel,
MarketTable,
marketRefresher,
storeHelpers,
} from '@dydxprotocol-indexer/postgres';
import { MarketEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../../config';
import { ConsolidatedKafkaEvent, MarketCreateEventMessage } from '../../lib/types';
import { Handler } from '../handler';

Expand All @@ -13,13 +21,20 @@ export class MarketCreateHandler extends Handler<MarketEventV1> {
return [`${this.eventType}_${this.event.marketId}`];
}

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
logger.info({
at: 'MarketCreateHandler#handle',
message: 'Received MarketEvent with MarketCreate.',
event: this.event,
});
if (config.USE_MARKET_CREATE_HANDLER_SQL_FUNCTION) {
return this.handleViaSqlFunction();
}
return this.handleViaKnexQueries();
}

// eslint-disable-next-line @typescript-eslint/require-await
public async handleViaKnexQueries(): Promise<ConsolidatedKafkaEvent[]> {
// MarketHandler already makes sure the event has 'marketCreate' as the oneofKind.
const marketCreate: MarketCreateEventMessage = this.event as MarketCreateEventMessage;

Expand All @@ -39,6 +54,37 @@ export class MarketCreateHandler extends Handler<MarketEventV1> {
return [];
}

private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> {
const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes;
const result: pg.QueryResult = await storeHelpers.rawQuery(
`SELECT dydx_market_create_handler(
'${JSON.stringify(MarketEventV1.decode(eventDataBinary))}'
) AS result;`,
{ txId: this.txId },
).catch((error: Error) => {
logger.error({
at: 'MarketCreateHandler#handleViaSqlFunction',
message: 'Failed to handle MarketEventV1',
error,
});

if (error.message.includes('Market in MarketCreate already exists')) {
const marketCreate: MarketCreateEventMessage = this.event as MarketCreateEventMessage;
this.logAndThrowParseMessageError(
'Market in MarketCreate already exists',
{ marketCreate },
);
}

throw error;
});

const market: MarketFromDatabase = MarketModel.fromJson(
result.rows[0].result.market) as MarketFromDatabase;
marketRefresher.updateMarket(market);
return [];
}

private async createMarket(marketCreate: MarketCreateEventMessage): Promise<void> {
await MarketTable.create({
id: marketCreate.marketId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ export class LiquidationHandler extends AbstractOrderFillHandler<OrderFillWithLi
'${USDC_ASSET_ID}'
) AS result;`,
{ txId: this.txId },
).catch((error) => {
).catch((error: Error) => {
logger.error({
at: 'orderHandler#handleViaSqlFunction',
at: 'liquidationHandler#handleViaSqlFunction',
message: 'Failed to handle OrderFillEventV1',
error,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class OrderHandler extends AbstractOrderFillHandler<OrderFillWithLiquidit
'${isOrderCanceled}'
) AS result;`,
{ txId: this.txId },
).catch((error) => {
).catch((error: Error) => {
logger.error({
at: 'orderHandler#handleViaSqlFunction',
message: 'Failed to handle OrderFillEventV1',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class SubaccountUpdateHandler extends Handler<SubaccountUpdate> {
${this.indexerTendermintEvent.eventIndex},
${transactionIndex}) AS result;`,
{ txId: this.txId },
).catch((error) => {
).catch((error: Error) => {
logger.error({
at: 'subaccountUpdateHandler#handleViaSqlFunction',
message: 'Failed to handle SubaccountUpdateEventV1',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function newScript(name: string, scriptPath: string): PostgresFunction {
const scripts: string[] = [
'create_extension_pg_stat_statements.sql',
'create_extension_uuid_ossp.sql',
'dydx_market_create_handler.sql',
'dydx_event_id_from_parts.sql',
'dydx_event_to_transaction_index.sql',
'dydx_from_jsonlib_long.sql',
Expand Down Expand Up @@ -63,7 +64,7 @@ export async function createPostgresFunctions(): Promise<void> {
await Promise.all([
dbHelpers.createModelToJsonFunctions(),
...scripts.map((script: string) => storeHelpers.rawQuery(newScript(script, `../../scripts/${script}`).script, {})
.catch((error) => {
.catch((error: Error) => {
logger.error({
at: 'dbHelpers#createModelToJsonFunctions',
message: `Failed to create or replace function contained in ${script}`,
Expand Down
2 changes: 1 addition & 1 deletion indexer/services/ender/src/lib/on-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ async function createInitialRowsViaSqlFunction(
await storeHelpers.rawQuery(
queryString,
{ txId },
).catch((error) => {
).catch((error: Error) => {
logger.error({
at: 'on-message#createInitialRowsViaSqlFunction',
message: 'Failed to create initial rows',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
CREATE OR REPLACE FUNCTION dydx_market_create_handler(event_data jsonb) RETURNS jsonb AS $$
DECLARE
market_record_id integer;
market_record markets%ROWTYPE;
BEGIN
market_record_id = (event_data->'marketId')::integer;
SELECT * INTO market_record FROM markets WHERE "id" = market_record_id;

IF FOUND THEN
RAISE EXCEPTION 'Market in MarketCreate already exists. Record: %', market_record;
END IF;
Comment on lines +16 to +18
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message in the exception is clear and provides useful information for debugging. However, consider adding more context to the error message, such as the event data that caused the exception. This can help with debugging if the error occurs.

- RAISE EXCEPTION 'Market in MarketCreate already exists. Record: %', market_record;
+ RAISE EXCEPTION 'Market in MarketCreate already exists. Event data: %, Record: %', event_data, market_record;

Commitable suggestion (Beta)
Suggested change
IF FOUND THEN
RAISE EXCEPTION 'Market in MarketCreate already exists. Record: %', market_record;
END IF;
IF FOUND THEN
RAISE EXCEPTION 'Market in MarketCreate already exists. Event data: %, Record: %', event_data, market_record;
END IF;


market_record."id" = market_record_id;
market_record."pair" = event_data->'marketCreate'->'base'->>'pair';
market_record."exponent" = (event_data->'marketCreate'->'exponent')::integer;
market_record."minPriceChangePpm" = (event_data->'marketCreate'->'base'->'minPriceChangePpm')::integer;

INSERT INTO markets VALUES (market_record.*);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The INSERT INTO statement doesn't specify the columns for which the values are being inserted. This can lead to errors if the structure of the markets table changes in the future. It's a good practice to always specify the column names in an INSERT INTO statement.

- INSERT INTO markets VALUES (market_record.*);
+ INSERT INTO markets (id, pair, exponent, minPriceChangePpm) VALUES (market_record.id, market_record.pair, market_record.exponent, market_record.minPriceChangePpm);

Commitable suggestion (Beta)
Suggested change
INSERT INTO markets VALUES (market_record.*);
INSERT INTO markets (id, pair, exponent, minPriceChangePpm) VALUES (market_record.id, market_record.pair, market_record.exponent, market_record.minPriceChangePpm);


RETURN jsonb_build_object(
'market',
dydx_to_jsonb(market_record)
);
END;
$$ LANGUAGE plpgsql;
Loading