Skip to content

Commit

Permalink
[IND-477, IND-478, IND-479, IND-480] Update all stateful order handle…
Browse files Browse the repository at this point in the history
…rs to use a SQL function to perform reads & updates. (#774)
  • Loading branch information
lcwik authored and vincentwschau committed Nov 10, 2023
1 parent efe758f commit ded020c
Show file tree
Hide file tree
Showing 16 changed files with 469 additions and 33 deletions.
34 changes: 29 additions & 5 deletions indexer/packages/postgres/src/lib/order-translations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ import {
*
* @param order
*/
export async function convertToIndexerOrder(
export function convertToIndexerOrderWithSubaccount(
order: OrderFromDatabase,
perpetualMarket: PerpetualMarketFromDatabase,
): Promise<IndexerOrder> {
const subaccount: SubaccountFromDatabase | undefined = await SubaccountTable.findById(
order.subaccountId,
);
subaccount: SubaccountFromDatabase,
): IndexerOrder {
if (!OrderTable.isLongTermOrConditionalOrder(order.orderFlags)) {
logger.error({
at: 'protocol-translations#convertToIndexerOrder',
Expand Down Expand Up @@ -77,3 +75,29 @@ export async function convertToIndexerOrder(

return indexerOrder;
}

/**
* Converts an order from the database to an IndexerOrder proto.
* This is used to resend open stateful orders to Vulcan during Indexer fast sync
* to uncross the orderbook.
*
* @param order
*/
export async function convertToIndexerOrder(
order: OrderFromDatabase,
perpetualMarket: PerpetualMarketFromDatabase,
): Promise<IndexerOrder> {
const subaccount: SubaccountFromDatabase | undefined = await SubaccountTable.findById(
order.subaccountId,
);

if (!subaccount === undefined) {
logger.error({
at: 'protocol-translations#convertToIndexerOrder',
message: 'Subaccount for order not found',
order,
});
throw new Error(`Subaccount for order not found: ${order.subaccountId}`);
}
return convertToIndexerOrderWithSubaccount(order, perpetualMarket, subaccount!);
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import Long from 'long';
import { producer } from '@dydxprotocol-indexer/kafka';
import { ConditionalOrderPlacementHandler } from '../../../src/handlers/stateful-order/conditional-order-placement-handler';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('conditionalOrderPlacementHandler', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -125,7 +126,14 @@ describe('conditionalOrderPlacementHandler', () => {
});
});

it('successfully places order', async () => {
it.each([
['via knex', false],
['via SQL function', true],
])('successfully places order (%s)', async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
defaultStatefulOrderEvent,
);
Expand Down Expand Up @@ -154,15 +162,24 @@ describe('conditionalOrderPlacementHandler', () => {
updatedAt: defaultDateTime.toISO(),
updatedAtHeight: defaultHeight.toString(),
});
expectTimingStats();
if (!useSqlFunction) {
expectTimingStats();
}
expectOrderSubaccountKafkaMessage(
producerSendMock,
defaultOrder.orderId!.subaccountId!,
order!,
);
});

it('successfully upserts order', async () => {
it.each([
['via knex', false],
['via SQL function', true],
])('successfully upserts order (%s)', async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
const subaccountId: string = SubaccountTable.subaccountIdToUuid(
defaultOrder.orderId!.subaccountId!,
);
Expand Down Expand Up @@ -215,7 +232,9 @@ describe('conditionalOrderPlacementHandler', () => {
updatedAt: defaultDateTime.toISO(),
updatedAtHeight: defaultHeight.toString(),
});
expectTimingStats();
if (!useSqlFunction) {
expectTimingStats();
}
expectOrderSubaccountKafkaMessage(
producerSendMock,
defaultOrder.orderId!.subaccountId!,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ import { ORDER_FLAG_CONDITIONAL } from '@dydxprotocol-indexer/v4-proto-parser';
import { ConditionalOrderTriggeredHandler } from '../../../src/handlers/stateful-order/conditional-order-triggered-handler';
import { defaultPerpetualMarket } from '@dydxprotocol-indexer/postgres/build/__tests__/helpers/constants';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('statefulOrderRemovalHandler', () => {
describe('conditionalOrderTriggeredHandler', () => {
beforeAll(async () => {
await dbHelpers.migrate();
await createPostgresFunctions();
Expand Down Expand Up @@ -110,7 +111,14 @@ describe('statefulOrderRemovalHandler', () => {
});
});

it('successfully triggers order and sends to vulcan', async () => {
it.each([
['via knex', false],
['via SQL function', true],
])('successfully triggers order and sends to vulcan (%s)', async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
await OrderTable.create({
...testConstants.defaultOrderGoodTilBlockTime,
orderFlags: conditionalOrderId.orderFlags.toString(),
Expand Down Expand Up @@ -147,16 +155,25 @@ describe('statefulOrderRemovalHandler', () => {
orderId: conditionalOrderId,
offchainUpdate: expectedOffchainUpdate,
});
expectTimingStats();
if (!useSqlFunction) {
expectTimingStats();
}
});

it('throws error when attempting to trigger an order that does not exist', async () => {
it.each([
['via knex', false],
['via SQL function', true],
])('throws error when attempting to trigger an order that does not exist (%s)', async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
defaultStatefulOrderEvent,
);

await expect(onMessage(kafkaMessage)).rejects.toThrowError(
new Error(`Unable to update order status with orderId: ${orderId}`),
`Unable to update order status with orderId: ${orderId}`,
);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants';
import { producer } from '@dydxprotocol-indexer/kafka';
import { ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('statefulOrderPlacementHandler', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -138,12 +139,16 @@ describe('statefulOrderPlacementHandler', () => {

it.each([
// TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent
['stateful order placement', defaultStatefulOrderEvent],
['stateful long term order placement', defaultStatefulOrderLongTermEvent],
['stateful order placement (via knex)', defaultStatefulOrderEvent, false],
['stateful order placement (via SQL function)', defaultStatefulOrderEvent, true],
['stateful long term order placement (via knex)', defaultStatefulOrderLongTermEvent, false],
['stateful long term order placement (via SQL function)', defaultStatefulOrderLongTermEvent, true],
])('successfully places order with %s', async (
_name: string,
statefulOrderEvent: StatefulOrderEventV1,
useSqlFunction: boolean,
) => {
config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
statefulOrderEvent,
);
Expand Down Expand Up @@ -172,7 +177,9 @@ describe('statefulOrderPlacementHandler', () => {
updatedAt: defaultDateTime.toISO(),
updatedAtHeight: defaultHeight.toString(),
});
expectTimingStats();
if (!useSqlFunction) {
expectTimingStats();
}

const expectedOffchainUpdate: OffChainUpdateV1 = {
orderPlace: {
Expand All @@ -189,12 +196,16 @@ describe('statefulOrderPlacementHandler', () => {

it.each([
// TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent
['stateful order placement', defaultStatefulOrderEvent],
['stateful long term order placement', defaultStatefulOrderLongTermEvent],
['stateful order placement (via knex)', defaultStatefulOrderEvent, false],
['stateful order placement (via SQL function)', defaultStatefulOrderEvent, true],
['stateful long term order placement (via knex)', defaultStatefulOrderLongTermEvent, false],
['stateful long term order placement (via SQL function)', defaultStatefulOrderLongTermEvent, true],
])('successfully upserts order with %s', async (
_name: string,
statefulOrderEvent: StatefulOrderEventV1,
useSqlFunction: boolean,
) => {
config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
const subaccountId: string = SubaccountTable.subaccountIdToUuid(
defaultOrder.orderId!.subaccountId!,
);
Expand Down Expand Up @@ -247,7 +258,9 @@ describe('statefulOrderPlacementHandler', () => {
updatedAt: defaultDateTime.toISO(),
updatedAtHeight: defaultHeight.toString(),
});
expectTimingStats();
if (!useSqlFunction) {
expectTimingStats();
}
// TODO[IND-20]: Add tests for vulcan messages
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { stats, STATS_FUNCTION_NAME } from '@dydxprotocol-indexer/base';
import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants';
import { producer } from '@dydxprotocol-indexer/kafka';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('statefulOrderRemovalHandler', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -104,7 +105,14 @@ describe('statefulOrderRemovalHandler', () => {
});
});

it('successfully cancels and removes order', async () => {
it.each([
['via knex', false],
['via SQL function', true],
])('successfully cancels and removes order (%s)', async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
await OrderTable.create({
...testConstants.defaultOrder,
clientId: '0',
Expand All @@ -121,7 +129,9 @@ describe('statefulOrderRemovalHandler', () => {
updatedAt: defaultDateTime.toISO(),
updatedAtHeight: defaultHeight.toString(),
}));
expectTimingStats();
if (!useSqlFunction) {
expectTimingStats();
}

const expectedOffchainUpdate: OffChainUpdateV1 = {
orderRemove: {
Expand All @@ -137,13 +147,20 @@ describe('statefulOrderRemovalHandler', () => {
});
});

it('throws error when attempting to cancel an order that does not exist', async () => {
it.each([
['via knex', false],
['via SQL function', true],
])('throws error when attempting to cancel an order that does not exist (%s)', async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
defaultStatefulOrderEvent,
);

await expect(onMessage(kafkaMessage)).rejects.toThrowError(
new Error(`Unable to update order status with orderId: ${orderId}`),
`Unable to update order status with orderId: ${orderId}`,
);
});
});
Expand Down
12 changes: 11 additions & 1 deletion indexer/services/ender/__tests__/scripts/scripts.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
IndexerTendermintEvent_BlockEvent,
AssetCreateEventV1,
SubaccountUpdateEventV1,
MarketEventV1,
MarketEventV1, IndexerOrder_ConditionType,
} from '@dydxprotocol-indexer/v4-protos';
import {
BUFFER_ENCODING_UTF_8,
Expand Down Expand Up @@ -175,6 +175,16 @@ describe('SQL Function Tests', () => {
expect(result).toEqual(protocolTranslations.protocolOrderTIFToTIF(value));
});

it.each([
['LIMIT', IndexerOrder_ConditionType.UNRECOGNIZED],
['LIMIT', IndexerOrder_ConditionType.CONDITION_TYPE_UNSPECIFIED],
['TAKE_PROFIT', IndexerOrder_ConditionType.CONDITION_TYPE_TAKE_PROFIT],
['STOP_LIMIT', IndexerOrder_ConditionType.CONDITION_TYPE_STOP_LOSS],
])('dydx_protocol_condition_type_to_order_type (%s)', async (_name: string, value: IndexerOrder_ConditionType) => {
const result = await getSingleRawQueryResultRow(`SELECT dydx_protocol_condition_type_to_order_type('${value}') AS result`);
expect(result).toEqual(protocolTranslations.protocolConditionTypeToOrderType(value));
});

it.each([
'0', '1', '-1', '10000000000000000000000000000', '-20000000000000000000000000000',
])('dydx_from_serializable_int (%s)', async (value: string) => {
Expand Down
3 changes: 3 additions & 0 deletions indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ export const configSchema = {
USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({
default: true,
}),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,29 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
OrderFromDatabase, OrderStatus, OrderTable, OrderUpdateObject, OrderCreateObject, SubaccountTable,
OrderSide, OrderType, protocolTranslations,
OrderFromDatabase,
OrderStatus,
OrderTable,
OrderUpdateObject,
OrderCreateObject,
SubaccountTable,
OrderSide,
OrderType,
protocolTranslations,
PerpetualMarketFromDatabase,
storeHelpers,
OrderModel,
PerpetualMarketModel,
SubaccountFromDatabase,
} from '@dydxprotocol-indexer/postgres';
import { IndexerOrderId, IndexerOrder, IndexerOrder_Side } from '@dydxprotocol-indexer/v4-protos';
import SubaccountModel from '@dydxprotocol-indexer/postgres/build/src/models/subaccount-model';
import {
IndexerOrderId,
IndexerOrder,
IndexerOrder_Side,
StatefulOrderEventV1,
} from '@dydxprotocol-indexer/v4-protos';
import { DateTime } from 'luxon';
import * as pg from 'pg';

import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../constants';
import { getPrice, getSize } from '../lib/helper';
Expand All @@ -21,6 +39,37 @@ export abstract class AbstractStatefulOrderHandler<T> extends Handler<T> {
];
}

protected async handleEventViaSqlFunction():
Promise<[OrderFromDatabase,
PerpetualMarketFromDatabase,
SubaccountFromDatabase | undefined]> {
const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes;
const result: pg.QueryResult = await storeHelpers.rawQuery(
`SELECT dydx_stateful_order_handler(
${this.block.height},
'${this.block.time?.toISOString()}',
'${JSON.stringify(StatefulOrderEventV1.decode(eventDataBinary))}'
) AS result;`,
{ txId: this.txId },
).catch((error: Error) => {
logger.error({
at: 'AbstractStatefulOrderHandler#handleEventViaSqlFunction',
message: 'Failed to handle StatefulOrderEventV1',
error,
});
throw error;
});

return [
OrderModel.fromJson(result.rows[0].result.order) as OrderFromDatabase,
PerpetualMarketModel.fromJson(
result.rows[0].result.perpetual_market) as PerpetualMarketFromDatabase,
result.rows[0].result.subaccount
? SubaccountModel.fromJson(result.rows[0].result.subaccount) as SubaccountFromDatabase
: undefined,
];
}

protected async updateOrderStatus(
orderIdProto: IndexerOrderId,
status: OrderStatus,
Expand Down
Loading

0 comments on commit ded020c

Please sign in to comment.