Skip to content

Commit

Permalink
[CT-919] Add pnl stats task (backport #1675) (#1801)
Browse files Browse the repository at this point in the history
Co-authored-by: dydxwill <119354122+dydxwill@users.noreply.github.com>
  • Loading branch information
mergify[bot] and dydxwill authored Jun 27, 2024
1 parent 6029fa3 commit 141e4cc
Show file tree
Hide file tree
Showing 8 changed files with 425 additions and 0 deletions.
83 changes: 83 additions & 0 deletions indexer/packages/postgres/__tests__/stores/pnl-ticks-table.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,4 +270,87 @@ describe('PnlTicks store', () => {
expect(mostRecent[defaultSubaccountId].equity).toEqual('1014');
expect(mostRecent[defaultSubaccountId2].equity).toEqual('200');
});

it('createMany PnlTicks, find most recent pnl tick times for each account', async () => {
const now = DateTime.utc();
const nowMinusOneHour = now.minus({ hours: 1 });
const nowMinusThreeHours = now.minus({ hours: 3 });
const nowMinusNineHours = now.minus({ hours: 9 });
const nowMinusElevenHours = now.minus({ hours: 11 });

await Promise.all([
BlockTable.create({
blockHeight: '3',
time: defaultBlock.time,
}),
BlockTable.create({
blockHeight: '5',
time: defaultBlock.time,
}),
]);

await PnlTicksTable.createMany([
{
subaccountId: defaultSubaccountId,
equity: '1092',
createdAt: nowMinusOneHour.toISO(),
totalPnl: '1000',
netTransfers: '50',
blockHeight: defaultBlock.blockHeight,
blockTime: defaultBlock.time,
},
{
subaccountId: defaultSubaccountId,
equity: '1097',
createdAt: nowMinusThreeHours.toISO(),
totalPnl: '1000',
netTransfers: '50',
blockHeight: '3',
blockTime: defaultBlock.time,
},
{
subaccountId: defaultSubaccountId,
equity: '1011',
createdAt: nowMinusElevenHours.toISO(),
totalPnl: '1000',
netTransfers: '50',
blockHeight: '5',
blockTime: defaultBlock.time,
},
{
subaccountId: defaultSubaccountId,
equity: '1014',
createdAt: nowMinusNineHours.toISO(),
totalPnl: '1000',
netTransfers: '50',
blockHeight: '5',
blockTime: defaultBlock.time,
},
{
subaccountId: defaultSubaccountId2,
equity: '100',
createdAt: now.toISO(),
totalPnl: '1000',
netTransfers: '50',
blockHeight: '2',
blockTime: defaultBlock2.time,
},
{
subaccountId: defaultSubaccountId2,
equity: '200',
createdAt: nowMinusNineHours.toISO(),
totalPnl: '1000',
netTransfers: '50',
blockHeight: '5',
blockTime: defaultBlock.time,
},
]);

const mostRecentTimes: {
[accountId: string]: string
} = await PnlTicksTable.findMostRecentPnlTickTimeForEachAccount('3');

expect(mostRecentTimes[defaultSubaccountId]).toEqual(nowMinusThreeHours.toISO());
expect(mostRecentTimes[defaultSubaccountId2]).toEqual(nowMinusNineHours.toISO());
});
});
42 changes: 42 additions & 0 deletions indexer/packages/postgres/__tests__/stores/transfer-table.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
defaultSubaccountId3,
defaultTendermintEventId,
defaultTendermintEventId2,
defaultTendermintEventId3,
defaultTransfer,
defaultTransfer2,
defaultTransfer3,
Expand All @@ -31,6 +32,7 @@ import {
} from '../helpers/constants';
import Big from 'big.js';
import { CheckViolationError } from 'objection';
import { DateTime } from 'luxon';

describe('Transfer store', () => {
beforeEach(async () => {
Expand Down Expand Up @@ -566,4 +568,44 @@ describe('Transfer store', () => {
[defaultAsset2.id]: Big('-5.3'),
});
});

it('Successfully gets the latest createdAt for subaccounts', async () => {
const now = DateTime.utc();

const transfer2 = {
senderSubaccountId: defaultSubaccountId2,
recipientSubaccountId: defaultSubaccountId,
assetId: defaultAsset2.id,
size: '5',
eventId: defaultTendermintEventId3,
transactionHash: '', // TODO: Add a real transaction Hash
createdAt: now.minus({ hours: 2 }).toISO(),
createdAtHeight: createdHeight,
};

const transfer3 = {
senderSubaccountId: defaultSubaccountId2,
recipientSubaccountId: defaultSubaccountId,
assetId: defaultAsset2.id,
size: '5',
eventId: defaultTendermintEventId2,
transactionHash: '', // TODO: Add a real transaction Hash
createdAt: now.minus({ hours: 1 }).toISO(),
createdAtHeight: createdHeight,
};

await Promise.all([
TransferTable.create(defaultTransfer),
TransferTable.create(transfer2),
TransferTable.create(transfer3),
]);

const transferTimes: { [subaccountId: string]: string } = await
TransferTable.getLastTransferTimeForSubaccounts(
[defaultSubaccountId, defaultSubaccountId2],
);

expect(transferTimes[defaultSubaccountId]).toEqual(defaultTransfer.createdAt);
expect(transferTimes[defaultSubaccountId2]).toEqual(defaultTransfer.createdAt);
});
});
24 changes: 24 additions & 0 deletions indexer/packages/postgres/src/stores/pnl-ticks-table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,27 @@ export async function findMostRecentPnlTickForEachAccount(
'subaccountId',
);
}

export async function findMostRecentPnlTickTimeForEachAccount(
createdOnOrAfterHeight: string,
): Promise<{
[subaccountId: string]: string
}> {
verifyAllInjectableVariables([createdOnOrAfterHeight]);

const result: {
rows: { subaccountId: string, createdAt: string }[]
} = await knexReadReplica.getConnection().raw(
`
SELECT DISTINCT ON ("subaccountId") "subaccountId", "createdAt"
FROM "pnl_ticks"
WHERE "blockHeight" >= '${createdOnOrAfterHeight}'
ORDER BY "subaccountId" ASC, "createdAt" DESC;
`,
) as unknown as { rows: { subaccountId: string, createdAt: string }[] };

return result.rows.reduce((acc, row) => {
acc[row.subaccountId] = row.createdAt;
return acc;
}, {} as { [subaccountId: string]: string });
}
50 changes: 50 additions & 0 deletions indexer/packages/postgres/src/stores/transfer-table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,3 +462,53 @@ export async function findById(
.findById(id)
.returning('*');
}

export async function getLastTransferTimeForSubaccounts(
subaccountIds: string[],
options: Options = DEFAULT_POSTGRES_OPTIONS,
): Promise<{ [subaccountId: string]: string }> {
if (!subaccountIds.length) {
return {};
}

let baseQuery: QueryBuilder<TransferModel> = setupBaseQuery<TransferModel>(
TransferModel,
options,
);

baseQuery = baseQuery
.select('senderSubaccountId', 'recipientSubaccountId', 'createdAt')
.where((queryBuilder) => {
// eslint-disable-next-line no-void
void queryBuilder.whereIn('senderSubaccountId', subaccountIds)
.orWhereIn('recipientSubaccountId', subaccountIds);
})
.orderBy('createdAt', 'desc');

const result: TransferFromDatabase[] = await baseQuery;

const mapping: { [subaccountId: string]: string } = {};

result.forEach((row) => {
if (
row.senderSubaccountId !== undefined &&
subaccountIds.includes(row.senderSubaccountId)
) {
if (!mapping[row.senderSubaccountId] || row.createdAt > mapping[row.senderSubaccountId]) {
mapping[row.senderSubaccountId] = row.createdAt;
}
}
if (
row.recipientSubaccountId !== undefined &&
subaccountIds.includes(row.recipientSubaccountId)
) {
if (
!mapping[row.recipientSubaccountId] ||
row.createdAt > mapping[row.recipientSubaccountId]) {
mapping[row.recipientSubaccountId] = row.createdAt;
}
}
});

return mapping;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import { logger, stats } from '@dydxprotocol-indexer/base';
import {
BlockTable,
PnlTicksTable,
SubaccountTable,
TransferTable,
testMocks,
dbHelpers,
} from '@dydxprotocol-indexer/postgres';
import runTask from '../../src/tasks/pnl-instrumentation';
import { DateTime } from 'luxon';
import config from '../../src/config';

describe('pnl-instrumentation', () => {
beforeAll(async () => {
await dbHelpers.migrate();
await dbHelpers.clearData();
jest.spyOn(stats, 'gauge');
jest.spyOn(logger, 'error');
});

beforeEach(async () => {
await testMocks.seedData();
});

afterAll(async () => {
await dbHelpers.teardown();
jest.resetAllMocks();
});

afterEach(async () => {
await dbHelpers.clearData();
jest.clearAllMocks();
});

it('succeeds with no stale PNL subaccounts', async () => {
jest.spyOn(BlockTable, 'getLatest').mockResolvedValue({
blockHeight: '12345',
} as any);

jest.spyOn(SubaccountTable, 'getSubaccountsWithTransfers').mockResolvedValue([
{ id: 'subaccount1' },
{ id: 'subaccount2' },
] as any);

jest.spyOn(PnlTicksTable, 'findMostRecentPnlTickTimeForEachAccount').mockResolvedValue({
subaccount1: DateTime.utc().minus({ hours: 1 }).toISO(),
subaccount2: DateTime.utc().minus({ hours: 1 }).toISO(),
});

await runTask();

expect(stats.gauge).toHaveBeenCalledWith(`${config.SERVICE_NAME}.pnl_stale_subaccounts`, 0);
expect(stats.gauge).toHaveBeenCalledWith(`${config.SERVICE_NAME}.pnl_stale_subaccounts_with_prior_pnl`, 0);
expect(stats.gauge).toHaveBeenCalledWith(`${config.SERVICE_NAME}.pnl_stale_subaccounts_without_prior_pnl`, 0);
expect(logger.error).toHaveBeenCalledTimes(0);
});

it('succeeds with stale PNL subaccounts', async () => {
jest.spyOn(BlockTable, 'getLatest').mockResolvedValue({
blockHeight: '12345',
} as any);

jest.spyOn(SubaccountTable, 'getSubaccountsWithTransfers').mockResolvedValue([
{ id: 'subaccount1' },
{ id: 'subaccount2' },
] as any);

jest.spyOn(PnlTicksTable, 'findMostRecentPnlTickTimeForEachAccount').mockResolvedValue({
subaccount1: DateTime.utc().minus({ hours: 3 }).toISO(),
subaccount2: DateTime.utc().minus({ hours: 3 }).toISO(),
});

await runTask();

expect(stats.gauge).toHaveBeenCalledWith(`${config.SERVICE_NAME}.pnl_stale_subaccounts`, 2);
expect(stats.gauge).toHaveBeenCalledWith(`${config.SERVICE_NAME}.pnl_stale_subaccounts_with_prior_pnl`, 2);
expect(stats.gauge).toHaveBeenCalledWith(`${config.SERVICE_NAME}.pnl_stale_subaccounts_without_prior_pnl`, 0);
expect(logger.error).toHaveBeenCalledWith({
at: 'pnl-instrumentation#statPnl',
message: 'Subaccount ids with stale PNL data',
stalePnlSubaccounts: ['subaccount1', 'subaccount2'],
staleTransferSubaccounts: [],
});
});

it('succeeds with stale transfer subaccounts', async () => {
jest.spyOn(BlockTable, 'getLatest').mockResolvedValue({
blockHeight: '12345',
} as any);

jest.spyOn(SubaccountTable, 'getSubaccountsWithTransfers').mockResolvedValue([
{ id: 'subaccount1' },
{ id: 'subaccount2' },
] as any);

jest.spyOn(PnlTicksTable, 'findMostRecentPnlTickTimeForEachAccount').mockResolvedValue({
subaccount1: DateTime.utc().minus({ hours: 1 }).toISO(),
});

jest.spyOn(TransferTable, 'getLastTransferTimeForSubaccounts').mockResolvedValue({
subaccount2: DateTime.utc().minus({ hours: 3 }).toISO(),
});

await runTask();

expect(stats.gauge).toHaveBeenCalledWith(`${config.SERVICE_NAME}.pnl_stale_subaccounts`, 1);
expect(stats.gauge).toHaveBeenCalledWith(`${config.SERVICE_NAME}.pnl_stale_subaccounts_with_prior_pnl`, 0);
expect(stats.gauge).toHaveBeenCalledWith(`${config.SERVICE_NAME}.pnl_stale_subaccounts_without_prior_pnl`, 1);
expect(logger.error).toHaveBeenCalledWith({
at: 'pnl-instrumentation#statPnl',
message: 'Subaccount ids with stale PNL data',
stalePnlSubaccounts: [],
staleTransferSubaccounts: ['subaccount2'],
});
});
});
4 changes: 4 additions & 0 deletions indexer/services/roundtable/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export const configSchema = {
LOOPS_ENABLED_PNL_TICKS: parseBoolean({ default: true }),
LOOPS_ENABLED_REMOVE_EXPIRED_ORDERS: parseBoolean({ default: true }),
LOOPS_ORDERBOOK_INSTRUMENTATION: parseBoolean({ default: true }),
LOOPS_PNL_INSTRUMENTATION: parseBoolean({ default: true }),
LOOPS_CANCEL_STALE_ORDERS: parseBoolean({ default: true }),
LOOPS_ENABLED_UPDATE_RESEARCH_ENVIRONMENT: parseBoolean({ default: false }),
LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS: parseBoolean({ default: true }),
Expand Down Expand Up @@ -70,6 +71,9 @@ export const configSchema = {
LOOPS_INTERVAL_MS_ORDERBOOK_INSTRUMENTATION: parseInteger({
default: 5 * ONE_SECOND_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_PNL_INSTRUMENTATION: parseInteger({
default: ONE_HOUR_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_CANCEL_STALE_ORDERS: parseInteger({
default: THIRTY_SECONDS_IN_MILLISECONDS,
}),
Expand Down
9 changes: 9 additions & 0 deletions indexer/services/roundtable/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import deleteZeroPriceLevelsTask from './tasks/delete-zero-price-levels';
import marketUpdaterTask from './tasks/market-updater';
import orderbookInstrumentationTask from './tasks/orderbook-instrumentation';
import performComplianceStatusTransitionsTask from './tasks/perform-compliance-status-transitions';
import pnlInstrumentationTask from './tasks/pnl-instrumentation';
import removeExpiredOrdersTask from './tasks/remove-expired-orders';
import removeOldOrderUpdatesTask from './tasks/remove-old-order-updates';
import takeFastSyncSnapshotTask from './tasks/take-fast-sync-snapshot';
Expand Down Expand Up @@ -99,6 +100,14 @@ async function start(): Promise<void> {
);
}

if (config.LOOPS_PNL_INSTRUMENTATION) {
startLoop(
pnlInstrumentationTask,
'pnl_instrumentation',
config.LOOPS_INTERVAL_MS_PNL_INSTRUMENTATION,
);
}

if (config.LOOPS_CANCEL_STALE_ORDERS) {
startLoop(
cancelStaleOrdersTask,
Expand Down
Loading

0 comments on commit 141e4cc

Please sign in to comment.