Skip to content

Commit

Permalink
[CT-839] Add blockHeight to subaccount websocket message (#1585)
Browse files Browse the repository at this point in the history
  • Loading branch information
dydxwill authored May 28, 2024
1 parent 6f6ec38 commit 50919ec
Show file tree
Hide file tree
Showing 28 changed files with 234 additions and 17 deletions.
4 changes: 4 additions & 0 deletions indexer/packages/kafka/src/websocket-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export function generateSubaccountMessageContents(
order: OrderFromDatabase | undefined,
perpetualMarket: PerpetualMarketFromDatabase,
placementStatus: OrderPlaceV1_OrderPlacementStatus,
blockHeight: string | undefined,
): SubaccountMessageContents {
const orderTIF: TimeInForce = protocolTranslations.protocolOrderTIFToTIF(
redisOrder.order!.timeInForce,
Expand Down Expand Up @@ -89,6 +90,7 @@ export function generateSubaccountMessageContents(
triggerPrice: getTriggerPrice(redisOrder.order!, perpetualMarket),
},
],
...(blockHeight && { blockHeight }),
};
return contents;
}
Expand All @@ -98,12 +100,14 @@ export function createSubaccountWebsocketMessage(
order: OrderFromDatabase | undefined,
perpetualMarket: PerpetualMarketFromDatabase,
placementStatus: OrderPlaceV1_OrderPlacementStatus,
blockHeight: string | undefined,
): Buffer {
const contents: SubaccountMessageContents = generateSubaccountMessageContents(
redisOrder,
order,
perpetualMarket,
placementStatus,
blockHeight,
);

const subaccountMessage: SubaccountMessage = SubaccountMessage.fromPartial({
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { clearData, migrate, teardown } from '../../src/helpers/db-helpers';
import { clear, getLatestBlockHeight, updateBlockHeight } from '../../src/loops/block-height-refresher';
import { defaultBlock2 } from '../helpers/constants';
import { seedData } from '../helpers/mock-generators';
import config from '../../src/config';

describe('blockHeightRefresher', () => {
beforeAll(async () => {
await migrate();
await seedData();
await updateBlockHeight();
});

afterAll(async () => {
await clearData();
await teardown();
});

describe('getLatestBlockHeight', () => {
it('successfully gets the latest block height after update', async () => {
await updateBlockHeight();
expect(getLatestBlockHeight()).toBe(defaultBlock2.blockHeight);
});
});

describe('clear', () => {
it('throws an error if block height does not exist', () => {
clear();
expect(() => getLatestBlockHeight()).toThrowError('Unable to find latest block height');
});

it('throws an error when clear is called in non-test environment', () => {
const originalEnv = config.NODE_ENV;
config.NODE_ENV = 'production';
expect(() => clear()).toThrowError('clear cannot be used in non-test env');
config.NODE_ENV = originalEnv;
});
});
});
1 change: 1 addition & 0 deletions indexer/packages/postgres/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export const postgresConfigSchema = {
ASSET_REFRESHER_INTERVAL_MS: parseInteger({ default: 30_000 }), // 30 seconds
MARKET_REFRESHER_INTERVAL_MS: parseInteger({ default: 30_000 }), // 30 seconds
LIQUIDITY_TIER_REFRESHER_INTERVAL_MS: parseInteger({ default: 30_000 }), // 30 seconds
BLOCK_HEIGHT_REFRESHER_INTERVAL_MS: parseInteger({ default: 1_000 }), // 1 second
USE_READ_REPLICA: parseBoolean({ default: false }),

// Optional environment variables.
Expand Down
1 change: 1 addition & 0 deletions indexer/packages/postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export * as TradingRewardAggregationTable from './stores/trading-reward-aggregat

export * as perpetualMarketRefresher from './loops/perpetual-market-refresher';
export * as assetRefresher from './loops/asset-refresher';
export * as blockHeightRefresher from './loops/block-height-refresher';
export * as liquidityTierRefresher from './loops/liquidity-tier-refresher';

export * as uuid from './helpers/uuid';
Expand Down
61 changes: 61 additions & 0 deletions indexer/packages/postgres/src/loops/block-height-refresher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import {
stats,
logger,
NodeEnv,
} from '@dydxprotocol-indexer/base';

import config from '../config';
import * as BlockTable from '../stores/block-table';
import { BlockFromDatabase, Options } from '../types';
import { startUpdateLoop } from './loopHelper';

let latestBlockHeight: string = '';

/**
* Refresh loop to cache the latest block height from the database in-memory.
*/
export async function start(): Promise<void> {
await startUpdateLoop(
updateBlockHeight,
config.BLOCK_HEIGHT_REFRESHER_INTERVAL_MS,
'updateBlockHeight',
);
}

/**
* Updates in-memory latest block height.
*/
export async function updateBlockHeight(options?: Options): Promise<void> {
const startTime: number = Date.now();
try {
const latestBlock: BlockFromDatabase = await BlockTable.getLatest(
options || { readReplica: true },
);
latestBlockHeight = latestBlock.blockHeight;
stats.timing(`${config.SERVICE_NAME}.loops.update_block_height`, Date.now() - startTime);
// eslint-disable-next-line no-empty
} catch (error) { }
}

/**
* Gets the latest block height.
*/
export function getLatestBlockHeight(): string {
if (!latestBlockHeight) {
const message: string = 'Unable to find latest block height';
logger.error({
at: 'block-height-refresher#getLatestBlockHeight',
message,
});
throw new Error(message);
}
return latestBlockHeight;
}

export function clear(): void {
if (config.NODE_ENV !== NodeEnv.TEST) {
throw new Error('clear cannot be used in non-test env');
}

latestBlockHeight = '';
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export interface SubaccountMessageContents {
fills?: FillSubaccountMessageContents[],
transfers?: TransferSubaccountMessageContents,
tradingReward?: TradingRewardSubaccountMessageContents,
blockHeight?: string,
}

export interface PerpetualPositionSubaccountMessageContents {
Expand Down
7 changes: 5 additions & 2 deletions indexer/services/comlink/public/websocket-documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ This channel provides realtime information about orders, fills, transfers, perpe

### Initial Response

Returns everything from the `/v4/addresses/:address/subaccountNumber/:subaccountNumber`, and `/v4/orders?addresses=${address}&subaccountNumber=${subaccountNumber}&status=OPEN`.
Returns everything from the `/v4/addresses/:address/subaccountNumber/:subaccountNumber`, and `/v4/orders?addresses=${address}&subaccountNumber=${subaccountNumber}&status=OPEN` and the latest block height.

### Example
```tsx
Expand All @@ -84,7 +84,8 @@ Returns everything from the `/v4/addresses/:address/subaccountNumber/:subaccount
},
"marginEnabled": true
},
"orders": []
"orders": [],
"blockHeight": "5"
}
}
```
Expand Down Expand Up @@ -117,6 +118,8 @@ export interface SubaccountMessageContents {
fills?: FillSubaccountMessageContents[],
// Transfers that occur on the subaccount
transfers?: TransferSubaccountMessageContents,
// Latest block processed by Indexer
blockHeight?: string,
}

export interface PerpetualPositionSubaccountMessageContents {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ async function expectUpdatedPositionsSubaccountKafkaMessage(
_.keyBy(perpMarkets, PerpetualMarketColumns.id),
assetPositions,
_.keyBy(assets, AssetColumns.id),
blockHeight,
);

expectSubaccountKafkaMessage({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ function expectTransfersSubaccountKafkaMessage(
event.sender!.subaccountId!,
event.sender!.subaccountId!,
event.recipient!.subaccountId,
blockHeight,
);
}

Expand All @@ -661,6 +662,7 @@ function expectTransfersSubaccountKafkaMessage(
event.recipient!.subaccountId!,
event.sender!.subaccountId,
event.recipient!.subaccountId!,
blockHeight,
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,7 @@ export async function expectFillSubaccountKafkaMessageFromLiquidationEvent(
[convertPerpetualPosition(position!)],
perpetualMarketRefresher.getPerpetualMarketsMap(),
),
blockHeight,
};

expectSubaccountKafkaMessage({
Expand Down Expand Up @@ -739,6 +740,7 @@ export function expectOrderSubaccountKafkaMessage(
orders: [
orderObject,
],
blockHeight,
};

expectSubaccountKafkaMessage({
Expand Down Expand Up @@ -798,6 +800,7 @@ export async function expectOrderFillAndPositionSubaccountKafkaMessageFromIds(
fills: [
generateFillSubaccountMessage(fill!, perpetualMarket!.ticker),
],
blockHeight,
};

if (position !== undefined) {
Expand Down
14 changes: 14 additions & 0 deletions indexer/services/ender/__tests__/helpers/kafka-helper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import { updateBlockCache } from '../../src/caches/block-cache';
import { defaultPreviousHeight, defaultWalletAddress } from './constants';

describe('kafka-helper', () => {
const blockHeight: string = '5';

describe('addPositionsToContents', () => {
const defaultPerpetualPosition: PerpetualPositionFromDatabase = {
id: '',
Expand Down Expand Up @@ -81,10 +83,12 @@ describe('kafka-helper', () => {
{},
[],
{},
blockHeight,
);

expect(contents.perpetualPositions).toEqual(undefined);
expect(contents.assetPositions).toEqual(undefined);
expect(contents.blockHeight).toEqual(blockHeight);
});

it('successfully adds one asset position and one perp position', () => {
Expand All @@ -100,6 +104,7 @@ describe('kafka-helper', () => {
{ [defaultPerpetualMarket.id]: defaultPerpetualMarket },
[defaultAssetPosition],
{ [defaultAsset.id]: defaultAsset },
blockHeight,
);

expect(contents.perpetualPositions!.length).toEqual(1);
Expand Down Expand Up @@ -129,6 +134,7 @@ describe('kafka-helper', () => {
side: 'LONG',
size: defaultAssetPosition.size,
});
expect(contents.blockHeight).toEqual(blockHeight);
});

it('successfully adds one asset position', () => {
Expand All @@ -144,6 +150,7 @@ describe('kafka-helper', () => {
{},
[defaultAssetPosition],
{ [defaultAsset.id]: defaultAsset },
blockHeight,
);

expect(contents.perpetualPositions).toBeUndefined();
Expand All @@ -158,6 +165,7 @@ describe('kafka-helper', () => {
side: 'LONG',
size: defaultAssetPosition.size,
});
expect(contents.blockHeight).toEqual(blockHeight);
});

it('successfully adds one perp position', () => {
Expand All @@ -173,6 +181,7 @@ describe('kafka-helper', () => {
{ [defaultPerpetualMarket.id]: defaultPerpetualMarket },
[],
{},
blockHeight,
);

expect(contents.perpetualPositions!.length).toEqual(1);
Expand All @@ -193,6 +202,7 @@ describe('kafka-helper', () => {
});

expect(contents.assetPositions).toBeUndefined();
expect(contents.blockHeight).toEqual(blockHeight);
});

it('successfully adds multiple positions', () => {
Expand Down Expand Up @@ -222,6 +232,7 @@ describe('kafka-helper', () => {
},
],
{ [defaultAsset.id]: defaultAsset },
blockHeight,
);

// check perpetual positions
Expand Down Expand Up @@ -277,6 +288,7 @@ describe('kafka-helper', () => {
side: 'LONG',
size: assetSize,
});
expect(contents.blockHeight).toEqual(blockHeight);
});
});

Expand Down Expand Up @@ -343,6 +355,7 @@ describe('kafka-helper', () => {
senderSubaccountId,
senderSubaccountId,
recipientSubaccountId,
transfer.createdAtHeight,
);

expect(contents.transfers).toEqual({
Expand All @@ -361,6 +374,7 @@ describe('kafka-helper', () => {
createdAtHeight: transfer.createdAtHeight,
transactionHash: transfer.transactionHash,
});
expect(contents.blockHeight).toEqual(transfer.createdAtHeight);
});

it('successfully adds a transfer_in', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export abstract class AbstractOrderFillHandler<T> extends Handler<T> {
[position],
perpetualMarketRefresher.getPerpetualMarketsMap(),
),
blockHeight: this.block.height.toString(),
};
if (order !== undefined) {
message.orders = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export class ConditionalOrderPlacementHandler extends
orders: [
generateOrderSubaccountMessage(conditionalOrder, perpetualMarket.ticker),
],
blockHeight: this.block.height.toString(),
};

return [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { generateSubaccountMessageContents } from '@dydxprotocol-indexer/kafka';
import {
OrderFromDatabase, OrderModel,
OrderFromDatabase,
OrderModel,
OrderTable,
PerpetualMarketFromDatabase,
perpetualMarketRefresher,
Expand Down Expand Up @@ -98,6 +99,7 @@ export class StatefulOrderPlacementHandler
dbOrder,
perpetualMarket,
OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_OPENED,
this.block.height.toString(),
);

const subaccountIdProto: SubaccountId = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ export class SubaccountUpdateHandler extends Handler<SubaccountUpdate> {
perpetualMarketsMapping,
updatedAssetPositions,
assetsMap,
this.block.height.toString(),
);

return this.generateConsolidatedSubaccountKafkaEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export class TradingRewardsHandler extends Handler<TradingRewardsEventV1> {

const subaccountMessageContents: SubaccountMessageContents = {
tradingReward: tradingRewardSubaccountMessageContents,
blockHeight: this.block.height.toString(),
};

kafkaEvents.push(
Expand Down
2 changes: 2 additions & 0 deletions indexer/services/ender/src/handlers/transfer-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class TransferHandler extends Handler<TransferEventV1> {
this.event.sender!.subaccountId!,
this.event.sender!.subaccountId,
this.event.recipient!.subaccountId,
this.block.height.toString(),
);

kafkaEvents.push(
Expand All @@ -74,6 +75,7 @@ export class TransferHandler extends Handler<TransferEventV1> {
this.event.recipient!.subaccountId!,
this.event.sender!.subaccountId,
this.event.recipient!.subaccountId,
this.block.height.toString(),
);

kafkaEvents.push(
Expand Down
Loading

0 comments on commit 50919ec

Please sign in to comment.