Skip to content

Commit

Permalink
[IND-552] add roundtable task to take fast sync Postgres snapshots ev…
Browse files Browse the repository at this point in the history
…ery 4 hours (#912)
  • Loading branch information
dydxwill authored Jan 5, 2024
1 parent 35abe97 commit b93fc56
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 11 deletions.
2 changes: 2 additions & 0 deletions indexer/packages/base/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ export const ONE_MINUTE_IN_MILLISECONDS: number = 60 * ONE_SECOND_IN_MILLISECOND
export const FIVE_MINUTES_IN_MILLISECONDS: number = 5 * ONE_MINUTE_IN_MILLISECONDS;
export const TEN_MINUTES_IN_MILLISECONDS: number = 10 * ONE_MINUTE_IN_MILLISECONDS;
export const ONE_HOUR_IN_MILLISECONDS: number = 60 * ONE_MINUTE_IN_MILLISECONDS;
export const FOUR_HOURS_IN_MILLISECONDS: number = 4 * ONE_HOUR_IN_MILLISECONDS;
export const ONE_DAY_IN_MILLISECONDS: number = 24 * ONE_HOUR_IN_MILLISECONDS;
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import config from '../../src/config';
import { asMock } from '@dydxprotocol-indexer/dev';
import {
createDBSnapshot,
getMostRecentDBSnapshotIdentifier,
} from '../../src/helpers/aws';
import takeFastSyncSnapshotTask from '../../src/tasks/take-fast-sync-snapshot';
import { DateTime } from 'luxon';

jest.mock('../../src/helpers/aws');

describe('fast-sync-export-db-snapshot', () => {
const snapshotIdentifier: string = `${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-postgres-main-staging-2022-05-03-04-16`;
beforeAll(() => {
config.RDS_INSTANCE_NAME = 'postgres-main-staging';
});

beforeEach(() => {
jest.resetAllMocks();
asMock(getMostRecentDBSnapshotIdentifier).mockImplementation(
async () => Promise.resolve(snapshotIdentifier),
);
});

afterAll(jest.resetAllMocks);

it('Last snapshot was taken more than interval ago', async () => {
await takeFastSyncSnapshotTask();

expect(createDBSnapshot).toHaveBeenCalled();
});

it('Last snapshot was taken less than interval ago', async () => {
const timestamp: string = DateTime.utc().minus({ minutes: 1 }).toFormat('yyyy-MM-dd-HH-mm');
asMock(getMostRecentDBSnapshotIdentifier).mockImplementation(
async () => Promise.resolve(`${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-postgres-main-staging-${timestamp}`),
);

await takeFastSyncSnapshotTask();

expect(createDBSnapshot).not.toHaveBeenCalled();
});

it('No existing snapshot', async () => {
asMock(getMostRecentDBSnapshotIdentifier).mockImplementation(
async () => Promise.resolve(undefined),
);

await takeFastSyncSnapshotTask();

expect(createDBSnapshot).toHaveBeenCalled();
});
});
11 changes: 11 additions & 0 deletions indexer/services/roundtable/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import {
ONE_HOUR_IN_MILLISECONDS,
ONE_SECOND_IN_MILLISECONDS,
TEN_SECONDS_IN_MILLISECONDS,
FOUR_HOURS_IN_MILLISECONDS,
ONE_DAY_IN_MILLISECONDS,
} from '@dydxprotocol-indexer/base';
import {
kafkaConfigSchema,
Expand All @@ -40,6 +42,8 @@ export const configSchema = {
LOOPS_ORDERBOOK_INSTRUMENTATION: parseBoolean({ default: true }),
LOOPS_CANCEL_STALE_ORDERS: parseBoolean({ default: true }),
LOOPS_ENABLED_UPDATE_RESEARCH_ENVIRONMENT: parseBoolean({ default: false }),
LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS: parseBoolean({ default: true }),
LOOPS_ENABLED_DELETE_OLD_FAST_SYNC_SNAPSHOTS: parseBoolean({ default: true }),
LOOPS_ENABLED_TRACK_LAG: parseBoolean({ default: false }),
LOOPS_ENABLED_REMOVE_OLD_ORDER_UPDATES: parseBoolean({ default: true }),
LOOPS_ENABLED_AGGREGATE_TRADING_REWARDS: parseBoolean({ default: true }),
Expand All @@ -66,6 +70,12 @@ export const configSchema = {
LOOPS_INTERVAL_MS_UPDATE_RESEARCH_ENVIRONMENT: parseInteger({
default: ONE_HOUR_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS: parseInteger({
default: FOUR_HOURS_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_DELETE_OLD_FAST_SYNC_SNAPSHOTS: parseInteger({
default: ONE_DAY_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_UPDATE_COMPLIANCE_DATA: parseInteger({
default: FIVE_MINUTES_IN_MILLISECONDS,
}),
Expand Down Expand Up @@ -112,6 +122,7 @@ export const configSchema = {
AWS_ACCOUNT_ID: parseString(),
AWS_REGION: parseString(),
S3_BUCKET_ARN: parseString(),
FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX: parseString({ default: 'fast-sync' }),
ECS_TASK_ROLE_ARN: parseString(),
KMS_KEY_ARN: parseString(),
RDS_INSTANCE_NAME: parseString(),
Expand Down
167 changes: 157 additions & 10 deletions indexer/services/roundtable/src/helpers/aws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,177 @@ enum ExportTaskStatus {
COMPLETE = 'complete',
}

const S3_BUCKET_NAME = config.S3_BUCKET_ARN.split(':::')[1];
export const S3_BUCKET_NAME = config.S3_BUCKET_ARN.split(':::')[1];
export const S3_LOCATION_PREFIX = `s3://${S3_BUCKET_NAME}`;

/**
* Delete snapshots for the RDS instance older than the specified number of days.
* Defaults to 7 days.
* @param rds
* @param daysOld
*/
export async function deleteOldFastSyncSnapshots(rds: RDS, daysOld: number = 7): Promise<void> {
try {
const cutoffTime: number = new Date().getTime() - daysOld * 24 * 60 * 60 * 1000;
let marker;
do {
const response: RDS.DBSnapshotMessage = await rds.describeDBSnapshots({
DBInstanceIdentifier: config.RDS_INSTANCE_NAME,
MaxRecords: 20, // Maximum number of records per page
Marker: marker, // Marker for pagination
}).promise();

if (response.DBSnapshots === undefined) {
logger.error({
at: `${atStart}deleteOldSnapshots`,
message: `No DB snapshots found with identifier: ${config.RDS_INSTANCE_NAME}`,
});
return;
}

// Filter for fast sync snapshots older than cutoffTime
const oldFastSyncSnapshots = response.DBSnapshots.filter((snapshot) => {
if (!snapshot.DBSnapshotIdentifier!.startsWith(
config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX,
)) {
return false;
}
const snapshotDate = snapshot.SnapshotCreateTime!.getTime();
return snapshotDate < cutoffTime;
});

// Delete each old snapshot
for (const snapshot of oldFastSyncSnapshots) {
logger.info({
at: `${atStart}deleteOldSnapshots`,
message: 'Deleting snapshot',
snapshotIdentifier: snapshot.DBSnapshotIdentifier,
});
const snapshotResult: RDS.Types.DeleteDBSnapshotResult = await rds.deleteDBSnapshot(
{ DBSnapshotIdentifier: snapshot.DBSnapshotIdentifier! },
).promise();
logger.info({
at: `${atStart}deleteOldSnapshots`,
message: 'Snapshot deleted',
snapshotIdentifier: snapshotResult.DBSnapshot!.DBSnapshotIdentifier!,
});
}

marker = response.Marker;
} while (marker);
} catch (error) {
logger.error({
at: `${atStart}deleteOldSnapshots`,
message: 'Error deleting old snapshots',
error,
});
throw error;
}
}

/**
* @description Get most recent snapshot identifier for an RDS database.
* @param rds - RDS client
* @param snapshotIdentifierPrefixInclude - Only include snapshots with snapshot identifier
* that starts with snapshotIdentifierPrefixInclude
* @param snapshotIdentifierPrefixExclude - Exclude snapshots with snapshot identifier
* that starts with snapshotIdentifierPrefixExclude
*/
// TODO(CLOB-672): Verify this function returns the most recent DB snapshot.
export async function getMostRecentDBSnapshotIdentifier(rds: RDS): Promise<string> {
const awsResponse: RDS.DBSnapshotMessage = await rds.describeDBSnapshots({
DBInstanceIdentifier: config.RDS_INSTANCE_NAME,
MaxRecords: 20, // this is the minimum
}).promise();
export async function getMostRecentDBSnapshotIdentifier(
rds: RDS,
snapshotIdentifierPrefixInclude?: string,
snapshotIdentifierPrefixExclude?: string,
): Promise<string | undefined> {
let snapshots: RDS.DBSnapshotList = [];
let marker: string | undefined;

do {
const awsResponse: RDS.DBSnapshotMessage = await rds.describeDBSnapshots({
DBInstanceIdentifier: config.RDS_INSTANCE_NAME,
MaxRecords: 20, // Maximum number of records per page
Marker: marker, // Marker for pagination
}).promise();

if (awsResponse.DBSnapshots === undefined) {
throw Error(`No DB snapshots found with identifier: ${config.RDS_INSTANCE_NAME}`);
}

if (awsResponse.DBSnapshots === undefined) {
throw Error(`No DB snapshots found with identifier: ${config.RDS_INSTANCE_NAME}`);
snapshots = snapshots.concat(awsResponse.DBSnapshots);
marker = awsResponse.Marker;
} while (marker);

// Filter snapshots based on include/exclude prefixes
if (snapshotIdentifierPrefixInclude !== undefined) {
snapshots = snapshots
.filter((snapshot) => snapshot.DBSnapshotIdentifier &&
snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixInclude));
}
if (snapshotIdentifierPrefixExclude !== undefined) {
snapshots = snapshots
.filter((snapshot) => snapshot.DBSnapshotIdentifier &&
!snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixExclude));
}

// Sort snapshots by creation time in descending order
snapshots.sort((a, b) => b.SnapshotCreateTime!.getTime() - a.SnapshotCreateTime!.getTime());

logger.info({
at: `${atStart}getMostRecentDBSnapshotIdentifier`,
message: 'Described snapshots for database',
mostRecentSnapshot: awsResponse.DBSnapshots[awsResponse.DBSnapshots.length - 1],
mostRecentSnapshot: snapshots[0],
});

return awsResponse.DBSnapshots[awsResponse.DBSnapshots.length - 1].DBSnapshotIdentifier!;
// Return the latest snapshot identifier
return snapshots[0]?.DBSnapshotIdentifier;
}

/**
* @description Create DB snapshot for an RDS database. Only returns when the
* snapshot is available.
*/
export async function createDBSnapshot(
rds: RDS,
snapshotIdentifier: string,
dbInstanceIdentifier: string,
): Promise<string> {
const params = {
DBInstanceIdentifier: dbInstanceIdentifier,
DBSnapshotIdentifier: snapshotIdentifier,
};

try {
await rds.createDBSnapshot(params).promise();

// Wait for the DB snapshot to become available with the specified waiter configuration
await rds.waitFor('dBSnapshotAvailable', {
DBSnapshotIdentifier: snapshotIdentifier,
$waiter: {
delay: 60, // 60 seconds delay between each request
maxAttempts: 10, // Maximum of 10 attempts
},
}).promise();

// Once it's available, retrieve its details
const statusResponse = await rds.describeDBSnapshots(
{ DBSnapshotIdentifier: snapshotIdentifier },
).promise();

const snapshot = statusResponse.DBSnapshots![0];
if (snapshot.Status === 'available') {
return snapshot.DBSnapshotIdentifier!;
} else {
throw Error(`Snapshot is not in the available state: Status is ${snapshot.Status}`);
}
} catch (error) {
logger.error({
at: `${atStart}createDBSnapshot`,
message: 'Failed to create DB snapshot',
error,
snapshotIdentifier,
});
throw error;
}
}

/**
Expand Down
18 changes: 18 additions & 0 deletions indexer/services/roundtable/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import {
import aggregateTradingRewardsTasks from './tasks/aggregate-trading-rewards';
import cancelStaleOrdersTask from './tasks/cancel-stale-orders';
import createPnlTicksTask from './tasks/create-pnl-ticks';
import deleteOldFastSyncSnapshots from './tasks/delete-old-fast-sync-snapshots';
import deleteZeroPriceLevelsTask from './tasks/delete-zero-price-levels';
import marketUpdaterTask from './tasks/market-updater';
import orderbookInstrumentationTask from './tasks/orderbook-instrumentation';
import removeExpiredOrdersTask from './tasks/remove-expired-orders';
import removeOldOrderUpdatesTask from './tasks/remove-old-order-updates';
import takeFastSyncSnapshotTask from './tasks/take-fast-sync-snapshot';
import trackLag from './tasks/track-lag';
import updateComplianceDataTask from './tasks/update-compliance-data';
import updateResearchEnvironmentTask from './tasks/update-research-environment';
Expand Down Expand Up @@ -100,6 +102,22 @@ async function start(): Promise<void> {
);
}

if (config.LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS) {
startLoop(
takeFastSyncSnapshotTask,
'take_fast_sync_snapshot',
config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS,
);
}

if (config.LOOPS_ENABLED_DELETE_OLD_FAST_SYNC_SNAPSHOTS) {
startLoop(
deleteOldFastSyncSnapshots,
'delete_old_fast_sync_snapshots',
config.LOOPS_INTERVAL_MS_DELETE_OLD_FAST_SYNC_SNAPSHOTS,
);
}

startLoop(
() => updateComplianceDataTask(complianceProvider),
'update_compliance_data',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { logger, stats } from '@dydxprotocol-indexer/base';
import RDS from 'aws-sdk/clients/rds';

import config from '../config';
import { deleteOldFastSyncSnapshots } from '../helpers/aws';

const statStart: string = `${config.SERVICE_NAME}.delete_old_fast_sync_snapshots`;

export default async function runTask(): Promise<void> {
const at: string = 'delete-old-fast-sync-snapshots#runTask';
logger.info({ at, message: 'Starting task.' });

const rds: RDS = new RDS();

const startDeleteOldSnapshot: number = Date.now();
// Delete old snapshots.
await deleteOldFastSyncSnapshots(rds);
stats.timing(`${statStart}.deleteOldSnapshots`, Date.now() - startDeleteOldSnapshot);
}
Loading

0 comments on commit b93fc56

Please sign in to comment.