Skip to content

Commit

Permalink
[IND-469] Update ender market price update handler to execute updat…
Browse files Browse the repository at this point in the history
…es via a SQL function. (#746)
  • Loading branch information
lcwik authored Nov 3, 2023
1 parent 755b0b9 commit 5e0c4e0
Show file tree
Hide file tree
Showing 10 changed files with 305 additions and 108 deletions.
2 changes: 2 additions & 0 deletions indexer/packages/postgres/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import config from './config';
import AssetPositionModel from './models/asset-position-model';
import FillModel from './models/fill-model';
import MarketModel from './models/market-model';
import OraclePriceModel from './models/oracle-price-model';
import OrderModel from './models/order-model';
import PerpetualMarketModel from './models/perpetual-market-model';
import PerpetualPositionModel from './models/perpetual-position-model';
Expand Down Expand Up @@ -85,6 +86,7 @@ export const SQL_TO_JSON_DEFINED_MODELS = [
AssetPositionModel,
FillModel,
MarketModel,
OraclePriceModel,
OrderModel,
PerpetualMarketModel,
PerpetualPositionModel,
Expand Down
1 change: 1 addition & 0 deletions indexer/packages/postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export { postgresConfigSchema } from './config';
export { default as AssetPositionModel } from './models/asset-position-model';
export { default as FillModel } from './models/fill-model';
export { default as MarketModel } from './models/market-model';
export { default as OraclePriceModel } from './models/oracle-price-model';
export { default as OrderModel } from './models/order-model';
export { default as PerpetualMarketModel } from './models/perpetual-market-model';
export { default as PerpetualPositionModel } from './models/perpetual-position-model';
Expand Down
16 changes: 16 additions & 0 deletions indexer/packages/postgres/src/models/oracle-price-model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,22 @@ export default class OraclePriceModel extends Model {
};
}

/**
* A mapping from column name to JSON conversion expected.
* See getSqlConversionForDydxModelTypes for valid conversions.
*
* TODO(IND-239): Ensure that jsonSchema() / sqlToJsonConversions() / model fields match.
*/
static get sqlToJsonConversions() {
return {
id: 'string',
marketId: 'integer',
price: 'string',
effectiveAt: 'date-time',
effectiveAtHeight: 'string',
};
}

id!: string;

marketId!: number;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { MarketPriceUpdateHandler } from '../../../src/handlers/markets/market-p
import Long from 'long';
import { getPrice } from '../../../src/caches/price-cache';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('marketPriceUpdateHandler', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -95,125 +96,170 @@ describe('marketPriceUpdateHandler', () => {
});
});

it('fails when no market exists', async () => {
const transactionIndex: number = 0;
const marketPriceUpdate: MarketEventV1 = {
marketId: 5,
priceUpdate: {
priceWithExponent: Long.fromValue(50000000, true),
},
};
const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({
marketEvents: [marketPriceUpdate],
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});

await expect(onMessage(kafkaMessage)).rejects.toThrowError(
new ParseMessageError('MarketPriceUpdateEvent contains a non-existent market id'),
);

expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({
at: 'MarketPriceUpdateHandler#logAndThrowParseMessageError',
message: 'MarketPriceUpdateEvent contains a non-existent market id',
}));
expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({
at: 'onMessage#onMessage',
message: 'Error: Unable to parse message, this must be due to a bug in V4 node',
}));
expect(producerSendMock.mock.calls.length).toEqual(0);
});
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'fails when no market exists (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_MARKET_PRICE_UPDATE_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;
const marketPriceUpdate: MarketEventV1 = {
marketId: 5,
priceUpdate: {
priceWithExponent: Long.fromValue(50000000, true),
},
};
const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({
marketEvents: [marketPriceUpdate],
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});

it('successfully inserts new oracle price for existing market', async () => {
const transactionIndex: number = 0;
await expect(onMessage(kafkaMessage)).rejects.toThrowError(
new ParseMessageError('MarketPriceUpdateEvent contains a non-existent market id'),
);

const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({
marketEvents: [defaultMarketPriceUpdate],
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({
at: 'MarketPriceUpdateHandler#logAndThrowParseMessageError',
message: 'MarketPriceUpdateEvent contains a non-existent market id',
}));
expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({
at: 'onMessage#onMessage',
message: 'Error: Unable to parse message, this must be due to a bug in V4 node',
}));
expect(producerSendMock.mock.calls.length).toEqual(0);
});

await onMessage(kafkaMessage);
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'successfully inserts new oracle price for existing market (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_MARKET_PRICE_UPDATE_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;

const { market, oraclePrice } = await getDbState(defaultMarketPriceUpdate);
const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({
marketEvents: [defaultMarketPriceUpdate],
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});

expectOraclePriceMatchesEvent(
defaultMarketPriceUpdate as MarketPriceUpdateEventMessage,
oraclePrice,
market,
defaultHeight,
);
await onMessage(kafkaMessage);

expect(getPrice(oraclePrice.marketId)).toEqual(oraclePrice.price);
const { market, oraclePrice } = await getDbState(defaultMarketPriceUpdate);

const contents: MarketMessageContents = generateOraclePriceContents(
oraclePrice,
market.pair,
);
expectOraclePriceMatchesEvent(
defaultMarketPriceUpdate as MarketPriceUpdateEventMessage,
oraclePrice,
market,
defaultHeight,
);

expect(getPrice(oraclePrice.marketId)).toEqual(oraclePrice.price);

expectMarketKafkaMessage({
producerSendMock,
contents: JSON.stringify(contents),
const contents: MarketMessageContents = generateOraclePriceContents(
oraclePrice,
market.pair,
);

expectMarketKafkaMessage({
producerSendMock,
contents: JSON.stringify(contents),
});
});
});

it('successfully inserts new oracle price for market created in same block', async () => {
const transactionIndex: number = 0;
const newMarketId: number = 3000;

// Include an event to create the market
const marketCreate: MarketEventV1 = {
marketId: newMarketId,
marketCreate: {
base: {
pair: 'NEWTOKEN-USD',
minPriceChangePpm: 500,
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'successfully inserts new oracle price for market created in same block (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_MARKET_PRICE_UPDATE_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;
const newMarketId: number = 3000;

// Include an event to create the market
const marketCreate: MarketEventV1 = {
marketId: newMarketId,
marketCreate: {
base: {
pair: 'NEWTOKEN-USD',
minPriceChangePpm: 500,
},
exponent: -5,
},
exponent: -5,
},
};
const marketPriceUpdate: MarketEventV1 = {
marketId: newMarketId,
priceUpdate: {
priceWithExponent: Long.fromValue(50000000),
},
};

const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({
marketEvents: [marketCreate, marketPriceUpdate],
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
};
const marketPriceUpdate: MarketEventV1 = {
marketId: newMarketId,
priceUpdate: {
priceWithExponent: Long.fromValue(50000000),
},
};

const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({
marketEvents: [marketCreate, marketPriceUpdate],
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});

await onMessage(kafkaMessage);
await onMessage(kafkaMessage);

const { market, oraclePrice } = await getDbState(marketPriceUpdate);
const { market, oraclePrice } = await getDbState(marketPriceUpdate);

expectOraclePriceMatchesEvent(
marketPriceUpdate as MarketPriceUpdateEventMessage,
oraclePrice,
market,
defaultHeight,
);
expectOraclePriceMatchesEvent(
marketPriceUpdate as MarketPriceUpdateEventMessage,
oraclePrice,
market,
defaultHeight,
);

expect(getPrice(oraclePrice.marketId)).toEqual(oraclePrice.price);
expect(getPrice(oraclePrice.marketId)).toEqual(oraclePrice.price);

const contents: MarketMessageContents = generateOraclePriceContents(
oraclePrice,
market.pair,
);
const contents: MarketMessageContents = generateOraclePriceContents(
oraclePrice,
market.pair,
);

expectMarketKafkaMessage({
producerSendMock,
contents: JSON.stringify(contents),
expectMarketKafkaMessage({
producerSendMock,
contents: JSON.stringify(contents),
});
});
});
});

async function getDbState(marketPriceUpdate: MarketEventV1): Promise<any> {
Expand Down
29 changes: 22 additions & 7 deletions indexer/services/ender/__tests__/scripts/scripts.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
PositionSide,
TendermintEventTable,
FillTable,
OraclePriceTable,
OrderTable,
protocolTranslations,
SubaccountTable,
Expand Down Expand Up @@ -350,15 +351,29 @@ describe('SQL Function Tests', () => {
}
});

it('dydx_uuid_from_transaction_parts (%s)', async () => {
const transactionParts = {
blockHeight: '123456',
transactionIndex: 123,
};
it.each([
[
'123456',
123,
],
])('dydx_uuid_from_transaction_parts (%s, %s)', async (blockHeight: string, transactionIndex: number) => {
const result = await getSingleRawQueryResultRow(
`SELECT dydx_uuid_from_transaction_parts('${blockHeight}', '${transactionIndex}') AS result`);
expect(result).toEqual(
TransactionTable.uuid(blockHeight, transactionIndex),
);
});

it.each([
[
123,
'123456',
],
])('dydx_uuid_from_oracle_price_parts (%s, %s)', async (marketId: number, blockHeight: string) => {
const result = await getSingleRawQueryResultRow(
`SELECT dydx_uuid_from_transaction_parts('${transactionParts.blockHeight}', '${transactionParts.transactionIndex}') AS result`);
`SELECT dydx_uuid_from_oracle_price_parts('${marketId}', '${blockHeight}') AS result`);
expect(result).toEqual(
TransactionTable.uuid(transactionParts.blockHeight, transactionParts.transactionIndex),
OraclePriceTable.uuid(marketId, blockHeight),
);
});

Expand Down
3 changes: 3 additions & 0 deletions indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ export const configSchema = {
USE_MARKET_MODIFY_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_MARKET_PRICE_UPDATE_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({
default: true,
}),
Expand Down
Loading

0 comments on commit 5e0c4e0

Please sign in to comment.