Skip to content

Commit

Permalink
[IND-481] Use a single SQL function to process a block. (#827)
Browse files Browse the repository at this point in the history
* [IND-481] Use a single SQL function to process a block.
  • Loading branch information
lcwik authored Dec 1, 2023
1 parent 5b49c90 commit 5901af4
Show file tree
Hide file tree
Showing 89 changed files with 657 additions and 993 deletions.
6 changes: 4 additions & 2 deletions indexer/packages/kafka/src/batch-kafka-producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { logger } from '@dydxprotocol-indexer/base';
import { Producer, RecordMetadata } from 'kafkajs';
import _ from 'lodash';

import config from './config';
import { KafkaTopics } from './types';

/**
Expand All @@ -28,7 +27,10 @@ export class BatchKafkaProducer {
constructor(
topic: KafkaTopics,
producer: Producer,
maxBatchSizeBytes: number = config.KAFKA_MAX_BATCH_WEBSOCKET_MESSAGE_SIZE_BYTES,
// Note that default parameters are bound during module load time making it difficult
// to modify the parameter during a test so we explicitly require callers to pass in
// config.KAFKA_MAX_BATCH_WEBSOCKET_MESSAGE_SIZE_BYTES.
maxBatchSizeBytes: number,
) {
this.maxBatchSizeBytes = maxBatchSizeBytes;
this.producer = producer;
Expand Down
26 changes: 10 additions & 16 deletions indexer/packages/postgres/src/helpers/stores-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,25 +64,19 @@ export async function rawQuery(
options: Options,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
): Promise<Knex.Raw<any>> {
if (options.readReplica) {
if (options.txId) {
return knexReadReplica.getConnection().raw(queryString).transacting(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
<Knex.Transaction<any, any>>Transaction.get(options.txId),
);
} else {
return knexReadReplica.getConnection().raw(queryString);
}
} else {
if (options.txId) {
return knexPrimary.raw(queryString).transacting(
const connection = options.readReplica ? knexReadReplica.getConnection() : knexPrimary;
let queryBuilder = options.bindings === undefined
? connection.raw(queryString) : connection.raw(queryString, options.bindings);
if (options.txId) {
queryBuilder = queryBuilder.transacting(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
<Knex.Transaction<any, any>>Transaction.get(options.txId),
);
} else {
return knexPrimary.raw(queryString);
}
);
}
if (options.sqlOptions) {
queryBuilder = queryBuilder.options(options.sqlOptions);
}
return queryBuilder;
}

/* ------- Bulk Helpers ------- */
Expand Down
5 changes: 5 additions & 0 deletions indexer/packages/postgres/src/types/utility-types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/* ------- UTILITY TYPES ------- */
import { RawBinding } from 'knex';

export type IsoString = string;

export type RegexPattern = string;
Expand All @@ -17,6 +19,9 @@ export interface Options {
orderBy?: [string, Ordering][];
readReplica?: boolean,
random?: boolean;
bindings?: readonly RawBinding[];
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sqlOptions?: Readonly<{ [key: string]: any }>;
}

export enum Ordering {
Expand Down
2 changes: 2 additions & 0 deletions indexer/services/bazooka/src/vulcan-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { Long } from '@dydxprotocol-indexer/v4-protos/build/codegen/helpers';
import Big from 'big.js';
import _ from 'lodash';

import config from './config';
import { ZERO } from './constants';

interface VulcanMessage {
Expand Down Expand Up @@ -134,6 +135,7 @@ export async function sendStatefulOrderMessages() {
const batchProducer: BatchKafkaProducer = new BatchKafkaProducer(
KafkaTopics.TO_VULCAN,
producer,
config.KAFKA_MAX_BATCH_WEBSOCKET_MESSAGE_SIZE_BYTES,
);
for (const message of messages) {
batchProducer.addMessageAndMaybeFlush(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ describe('assetHandler', () => {

const handler: AssetCreationHandler = new AssetCreationHandler(
block,
0,
indexerTendermintEvent,
0,
defaultAssetCreateEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ describe('fundingHandler', () => {

const handler: FundingHandler = new FundingHandler(
block,
0,
indexerTendermintEvent,
0,
defaultFundingUpdateSampleEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ describe('liquidityTierHandler', () => {

const handler: LiquidityTierHandler = new LiquidityTierHandler(
block,
0,
indexerTendermintEvent,
0,
defaultLiquidityTierUpsertEvent,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logger, ParseMessageError } from '@dydxprotocol-indexer/base';
import { logger } from '@dydxprotocol-indexer/base';
import {
dbHelpers, MarketFromDatabase, MarketTable, testMocks,
} from '@dydxprotocol-indexer/postgres';
Expand Down Expand Up @@ -46,7 +46,6 @@ describe('marketCreateHandler', () => {
});

const loggerCrit = jest.spyOn(logger, 'crit');
const loggerError = jest.spyOn(logger, 'error');
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');

describe('getParallelizationIds', () => {
Expand Down Expand Up @@ -75,6 +74,7 @@ describe('marketCreateHandler', () => {

const handler: MarketCreateHandler = new MarketCreateHandler(
block,
0,
indexerTendermintEvent,
0,
marketEvent,
Expand Down Expand Up @@ -128,7 +128,7 @@ describe('marketCreateHandler', () => {
txHash: defaultTxHash,
});
await expect(onMessage(kafkaMessage)).rejects.toThrowError(
new ParseMessageError('Market in MarketCreate already exists'),
'Market in MarketCreate already exists',
);

// Check that market in database is the old market.
Expand All @@ -137,13 +137,9 @@ describe('marketCreateHandler', () => {
) as MarketFromDatabase;
expect(market.minPriceChangePpm).toEqual(50);

expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({
at: 'MarketCreateHandler#logAndThrowParseMessageError',
message: 'Market in MarketCreate already exists',
}));
expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({
at: 'onMessage#onMessage',
message: 'Error: Unable to parse message, this must be due to a bug in V4 node',
at: expect.stringContaining('PL/pgSQL function dydx_market_create_handler('),
message: expect.stringContaining('Market in MarketCreate already exists'),
}));
expect(producerSendMock.mock.calls.length).toEqual(0);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logger, ParseMessageError } from '@dydxprotocol-indexer/base';
import { logger } from '@dydxprotocol-indexer/base';
import {
dbHelpers, MarketFromDatabase, MarketTable, testMocks,
} from '@dydxprotocol-indexer/postgres';
Expand Down Expand Up @@ -40,7 +40,6 @@ describe('marketModifyHandler', () => {
});

const loggerCrit = jest.spyOn(logger, 'crit');
const loggerError = jest.spyOn(logger, 'error');
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');

describe('getParallelizationIds', () => {
Expand Down Expand Up @@ -69,6 +68,7 @@ describe('marketModifyHandler', () => {

const handler: MarketModifyHandler = new MarketModifyHandler(
block,
0,
indexerTendermintEvent,
0,
marketEvent,
Expand Down Expand Up @@ -115,16 +115,11 @@ describe('marketModifyHandler', () => {
});

await expect(onMessage(kafkaMessage)).rejects.toThrowError(
new ParseMessageError('Market in MarketModify doesn\'t exist'),
'Market in MarketModify doesn\'t exist',
);

expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({
at: 'MarketModifyHandler#logAndThrowParseMessageError',
message: 'Market in MarketModify doesn\'t exist',
}));
expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({
at: 'onMessage#onMessage',
message: 'Error: Unable to parse message, this must be due to a bug in V4 node',
at: expect.stringContaining('PL/pgSQL function dydx_market_modify_handler('),
message: expect.stringContaining('Market in MarketModify doesn\'t exist'),
}));
expect(producerSendMock.mock.calls.length).toEqual(0);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logger, ParseMessageError } from '@dydxprotocol-indexer/base';
import { logger } from '@dydxprotocol-indexer/base';
import {
dbHelpers,
MarketFromDatabase,
Expand Down Expand Up @@ -54,7 +54,6 @@ describe('marketPriceUpdateHandler', () => {
jest.resetAllMocks();
});
const loggerCrit = jest.spyOn(logger, 'crit');
const loggerError = jest.spyOn(logger, 'error');
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');

describe('getParallelizationIds', () => {
Expand Down Expand Up @@ -83,6 +82,7 @@ describe('marketPriceUpdateHandler', () => {

const handler: MarketPriceUpdateHandler = new MarketPriceUpdateHandler(
block,
0,
indexerTendermintEvent,
0,
marketEvent,
Expand Down Expand Up @@ -111,16 +111,12 @@ describe('marketPriceUpdateHandler', () => {
});

await expect(onMessage(kafkaMessage)).rejects.toThrowError(
new ParseMessageError('MarketPriceUpdateEvent contains a non-existent market id'),
'MarketPriceUpdateEvent contains a non-existent market id',
);

expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({
at: 'MarketPriceUpdateHandler#logAndThrowParseMessageError',
message: 'MarketPriceUpdateEvent contains a non-existent market id',
}));
expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({
at: 'onMessage#onMessage',
message: 'Error: Unable to parse message, this must be due to a bug in V4 node',
at: expect.stringContaining('PL/pgSQL function dydx_market_price_update_handler('),
message: expect.stringContaining('MarketPriceUpdateEvent contains a non-existent market id'),
}));
expect(producerSendMock.mock.calls.length).toEqual(0);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ describe('DeleveragingHandler', () => {

const handler: DeleveragingHandler = new DeleveragingHandler(
block,
0,
indexerTendermintEvent,
0,
defaultDeleveragingEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ describe('LiquidationHandler', () => {

const handler: LiquidationHandler = new LiquidationHandler(
block,
0,
indexerTendermintEvent,
0,
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ describe('OrderHandler', () => {

const handler: OrderHandler = new OrderHandler(
block,
0,
indexerTendermintEvent,
0,
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ describe('perpetualMarketHandler', () => {

const handler: PerpetualMarketCreationHandler = new PerpetualMarketCreationHandler(
block,
0,
indexerTendermintEvent,
0,
defaultPerpetualMarketCreateEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ describe('conditionalOrderPlacementHandler', () => {

const handler: ConditionalOrderPlacementHandler = new ConditionalOrderPlacementHandler(
block,
0,
indexerTendermintEvent,
0,
defaultStatefulOrderEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ describe('conditionalOrderTriggeredHandler', () => {

const handler: ConditionalOrderTriggeredHandler = new ConditionalOrderTriggeredHandler(
block,
0,
indexerTendermintEvent,
0,
defaultStatefulOrderEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ describe('statefulOrderPlacementHandler', () => {

const handler: StatefulOrderPlacementHandler = new StatefulOrderPlacementHandler(
block,
0,
indexerTendermintEvent,
0,
statefulOrderEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ describe('statefulOrderRemovalHandler', () => {

const handler: StatefulOrderRemovalHandler = new StatefulOrderRemovalHandler(
block,
0,
indexerTendermintEvent,
0,
defaultStatefulOrderEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ describe('subaccountUpdateHandler', () => {

const handler: SubaccountUpdateHandler = new SubaccountUpdateHandler(
block,
0,
indexerTendermintEvent,
0,
defaultEmptySubaccountUpdate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ describe('transferHandler', () => {

const handler: TransferHandler = new TransferHandler(
block,
0,
indexerTendermintEvent,
0,
defaultTransferEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ describe('update-clob-pair-handler', () => {

const handler: UpdateClobPairHandler = new UpdateClobPairHandler(
block,
0,
indexerTendermintEvent,
0,
defaultUpdateClobPairEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ describe('update-perpetual-handler', () => {

const handler: UpdatePerpetualHandler = new UpdatePerpetualHandler(
block,
0,
indexerTendermintEvent,
0,
defaultUpdatePerpetualEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ function generateFakeHandler(parallelizationIds: string[]): FakeHandler {

const handler: FakeHandler = new FakeHandler(
block,
0,
defaultEvent,
fakeTxId,
{},
Expand Down
14 changes: 8 additions & 6 deletions indexer/services/ender/__tests__/lib/block-processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { BlockProcessor } from '../../src/lib/block-processor';
import { BatchedHandlers } from '../../src/lib/batched-handlers';
import { SyncHandlers } from '../../src/lib/sync-handlers';
import { mock, MockProxy } from 'jest-mock-extended';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';

describe('block-processor', () => {
let batchedHandlers: MockProxy<BatchedHandlers>;
Expand All @@ -36,6 +37,7 @@ describe('block-processor', () => {

beforeAll(async () => {
await dbHelpers.migrate();
await createPostgresFunctions();
});

afterEach(async () => {
Expand Down Expand Up @@ -90,18 +92,18 @@ describe('block-processor', () => {
transactionIndex0,
eventIndex0,
),
createIndexerTendermintEvent(
DydxIndexerSubtypes.ASSET,
defaultAssetEventBinary,
transactionIndex0,
eventIndex1,
),
createIndexerTendermintEvent(
DydxIndexerSubtypes.MARKET,
defaultMarketEventBinary,
transactionIndex1,
eventIndex0,
),
createIndexerTendermintEvent(
DydxIndexerSubtypes.ASSET,
defaultAssetEventBinary,
transactionIndex0,
eventIndex1,
),
];

it('batched handlers called before sync handlers for normal blocks', async () => {
Expand Down
Loading

0 comments on commit 5901af4

Please sign in to comment.