Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: save bundle included events using a separate class #99

Merged
merged 4 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,6 @@ NODE_RETRY_DELAY=1000

ENABLE_HUBPOOL_INDEXER=true
ENABLE_BUNDLE_EVENTS_PROCESSOR=true
ENABLE_BUNDLE_INCLUDED_EVENTS_SERVICE=true
ENABLE_BUNDLE_BUILDER=true
```
28 changes: 23 additions & 5 deletions packages/indexer/src/database/BundleRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,11 @@ export class BundleRepository extends utils.BaseRepository {
);
const chunkedDeposits = across.utils.chunk(deposits, this.chunkSize);
await Promise.all(
chunkedDeposits.map((eventsChunk) => eventsRepo.insert(eventsChunk)),
chunkedDeposits.map((eventsChunk) =>
eventsRepo.upsert(eventsChunk, {
conflictPaths: { relayHash: true, type: true },
}),
),
);

// Store bundle refunded deposits
Expand All @@ -512,7 +516,11 @@ export class BundleRepository extends utils.BaseRepository {
);
const chunkedRefunds = across.utils.chunk(expiredDeposits, this.chunkSize);
await Promise.all(
chunkedRefunds.map((eventsChunk) => eventsRepo.insert(eventsChunk)),
chunkedRefunds.map((eventsChunk) =>
eventsRepo.upsert(eventsChunk, {
conflictPaths: { relayHash: true, type: true },
}),
),
);

// Store bundle slow fills
Expand All @@ -523,7 +531,11 @@ export class BundleRepository extends utils.BaseRepository {
);
const chunkedSlowFills = across.utils.chunk(slowFills, this.chunkSize);
await Promise.all(
chunkedSlowFills.map((eventsChunk) => eventsRepo.insert(eventsChunk)),
chunkedSlowFills.map((eventsChunk) =>
eventsRepo.upsert(eventsChunk, {
conflictPaths: { relayHash: true, type: true },
}),
),
);

// Store bundle unexecutable slow fills
Expand All @@ -538,7 +550,9 @@ export class BundleRepository extends utils.BaseRepository {
);
await Promise.all(
chunkedUnexecutableSlowFills.map((eventsChunk) =>
eventsRepo.insert(eventsChunk),
eventsRepo.upsert(eventsChunk, {
conflictPaths: { relayHash: true, type: true },
}),
),
);

Expand All @@ -550,7 +564,11 @@ export class BundleRepository extends utils.BaseRepository {
);
const chunkedFills = across.utils.chunk(fills, this.chunkSize);
await Promise.all(
chunkedFills.map((eventsChunk) => eventsRepo.insert(eventsChunk)),
chunkedFills.map((eventsChunk) =>
eventsRepo.upsert(eventsChunk, {
conflictPaths: { relayHash: true, type: true },
}),
),
);

return {
Expand Down
6 changes: 6 additions & 0 deletions packages/indexer/src/parseEnv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export type Config = {
spokePoolChainsEnabled: number[];
enableHubPoolIndexer: boolean;
enableBundleEventsProcessor: boolean;
enableBundleIncludedEventsService: boolean;
enableBundleBuilder: boolean;
};
export type RedisConfig = {
Expand Down Expand Up @@ -165,6 +166,10 @@ export function envToConfig(env: Env): Config {
const enableBundleEventsProcessor = env.ENABLE_BUNDLE_EVENTS_PROCESSOR
? env.ENABLE_BUNDLE_EVENTS_PROCESSOR === "true"
: true;
const enableBundleIncludedEventsService =
env.ENABLE_BUNDLE_INCLUDED_EVENTS_SERVICE
? env.ENABLE_BUNDLE_INCLUDED_EVENTS_SERVICE === "true"
: true;
const enableBundleBuilder = env.ENABLE_BUNDLE_BUILDER
? env.ENABLE_BUNDLE_BUILDER === "true"
: true;
Expand All @@ -184,6 +189,7 @@ export function envToConfig(env: Env): Config {
spokePoolChainsEnabled,
enableHubPoolIndexer,
enableBundleEventsProcessor,
enableBundleIncludedEventsService,
enableBundleBuilder,
};
}
212 changes: 212 additions & 0 deletions packages/indexer/src/services/BundleIncludedEventsService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import { CHAIN_IDs } from "@across-protocol/constants";
import * as across from "@across-protocol/sdk";
import Redis from "ioredis";
import winston from "winston";
import { DataSource, entities } from "@repo/indexer-database";
import { BaseIndexer } from "../generics";
import {
BlockRangeInsertType,
BundleRepository,
} from "../database/BundleRepository";
import * as utils from "../utils";
import { getBlockTime } from "../web3/constants";
import {
buildPoolRebalanceRoot,
getBlockRangeBetweenBundles,
getBundleBlockRanges,
} from "../utils/bundleBuilderUtils";

export type BundleConfig = {
hubChainId: number;
logger: winston.Logger;
redis: Redis;
postgres: DataSource;
hubPoolClientFactory: utils.HubPoolClientFactory;
spokePoolClientFactory: utils.SpokePoolClientFactory;
bundleRepository: BundleRepository;
};

export class BundleIncludedEventsService extends BaseIndexer {
private hubPoolClient: across.clients.HubPoolClient;
private configStoreClient: across.clients.AcrossConfigStoreClient;

constructor(private readonly config: BundleConfig) {
super(config.logger, "BundleIncludedEventsService");
}

protected async indexerLogic(): Promise<void> {
try {
this.config.logger.info({
at: "BundleIncludedEventsService#indexerLogic",
message: "Starting BundleIncludedEventsService",
});
await this.assignSpokePoolEventsToExecutedBundles();

this.config.logger.info({
at: "BundleIncludedEventsService#indexerLogic",
message: "Finished BundleIncludedEventsService",
});
} catch (error) {
this.logger.error({
at: "BundleIncludedEventsService#indexerLogic",
message: "Error in BundleIncludedEventsService",
error,
});
}
}

protected async initialize(): Promise<void> {
const { hubPoolClientFactory } = this.config;
this.hubPoolClient = hubPoolClientFactory.get(this.config.hubChainId);
this.configStoreClient = this.hubPoolClient.configStoreClient;
}

private async assignSpokePoolEventsToExecutedBundles(): Promise<void> {
const { logger, bundleRepository } = this.config;
const executedBundles =
await bundleRepository.getExecutedBundlesWithoutEventsAssociated({
fromBlock: utils.ACROSS_V3_MAINNET_DEPLOYMENT_BLOCK,
});
logger.info({
at: "ExecutedBundleEventsService#assignSpokePoolEventsToExecutedBundles",
message: `Found ${executedBundles.length} executed bundles without events associated`,
});
if (executedBundles.length === 0) {
return;
}

logger.debug({
at: "BundleIncludedEventsService#assignSpokePoolEventsToExecutedBundles",
message: "Updating HubPool and ConfigStore clients",
});
const startTime = Date.now();
await this.configStoreClient.update();
await this.hubPoolClient.update();
const endTime = Date.now();
const duration = endTime - startTime;
logger.debug({
at: "BundleIncludedEventsService#assignSpokePoolEventsToExecutedBundles",
message: `Updated HubPool and ConfigStore clients in ${duration / 1000} seconds`,
});

for (const bundle of executedBundles) {
await this.getEventsIncludedInBundle(bundle);
}
}

private async getEventsIncludedInBundle(
bundle: entities.Bundle,
): Promise<void> {
const { logger, bundleRepository, spokePoolClientFactory } = this.config;
const historicalBundle = await bundleRepository.retrieveMostRecentBundle(
entities.BundleStatus.Executed,
bundle.proposal.blockNumber,
8,
);
// Skip the bundle if we don't have enough historical data
if (!historicalBundle) {
logger.warn({
at: "BundleIncludedEventsService#getEventsIncludedInBundle",
message: `No historical bundle found. Skipping bundle reconstruction of bundle ${bundle.id}`,
});
return;
}
const lookbackRange = getBlockRangeBetweenBundles(
historicalBundle.proposal,
bundle.proposal,
);
const spokeClients = this.getSpokeClientsForLookbackBlockRange(
lookbackRange,
spokePoolClientFactory,
);
logger.debug({
at: "BundleIncludedEventsService#getEventsIncludedInBundle",
message: `Updating spoke clients for lookback range for bundle ${bundle.id}`,
lookbackRange,
});
const startTime = Date.now();
await Promise.all(
Object.values(spokeClients).map((client) => client.update()),
);
const endTime = Date.now();
const duration = endTime - startTime;
logger.debug({
at: "BundleIncludedEventsService#getEventsIncludedInBundle",
message: `Updated spoke clients in ${duration / 1000} seconds for bundle ${bundle.id}`,
});
const clients = {
hubPoolClient: this.hubPoolClient,
configStoreClient: this.configStoreClient,
arweaveClient: null as unknown as across.caching.ArweaveClient, // FIXME: This is a hack to avoid instantiating the Arweave client
};
// Instantiate bundle data client and reconstruct bundle
const bundleDataClient =
new across.clients.BundleDataClient.BundleDataClient(
logger,
clients,
spokeClients,
bundle.proposal.chainIds,
);
// Get bundle ranges as an array of [startBlock, endBlock] for each chain
const bundleBlockRanges = getBundleBlockRanges(bundle);
const bundleData = await bundleDataClient.loadData(
bundleBlockRanges,
spokeClients,
);

// Build pool rebalance root and check it matches with the root of the stored bundle
const poolRebalanceRoot = buildPoolRebalanceRoot(
bundleBlockRanges,
bundleData,
this.hubPoolClient,
this.configStoreClient,
);
if (bundle.poolRebalanceRoot !== poolRebalanceRoot.tree.getHexRoot()) {
logger.warn({
at: "BundleIncludedEventsService#getEventsIncludedInBundle",
message: `Mismatching roots. Skipping bundle ${bundle.id}.`,
});
return;
} else {
const storedEvents = await bundleRepository.storeBundleEvents(
bundleData,
bundle.id,
);
await bundleRepository.updateBundleEventsAssociatedFlag(bundle.id);
logger.info({
at: "BundleIncludedEventsService#getEventsIncludedInBundle",
message: `Stored bundle events for bundle ${bundle.id}`,
storedEvents,
});
}
}

private getSpokeClientsForLookbackBlockRange(
lookbackRange: utils.ProposalRangeResult[],
spokePoolClientFactory: utils.SpokePoolClientFactory,
) {
return lookbackRange.reduce(
(acc, { chainId, startBlock, endBlock }) => {
// We need to instantiate spoke clients using a higher end block than
// the bundle range as deposits which fills are included in this bundle could
// have occured outside the bundle range of the origin chain
// NOTE: A buffer time of 15 minutes has been proven to work for older bundles
const blockTime = getBlockTime(chainId);
const endBlockTimeBuffer = 60 * 15;
const blockBuffer = Math.round(endBlockTimeBuffer / blockTime);
return {
...acc,
[chainId]: spokePoolClientFactory.get(
chainId,
startBlock,
endBlock + blockBuffer,
{
hubPoolClient: this.hubPoolClient,
},
),
};
},
{} as Record<number, across.clients.SpokePoolClient>,
);
}
}
21 changes: 21 additions & 0 deletions packages/indexer/src/services/BundleServicesManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import {
} from "../utils";
import { RetryProvidersFactory } from "../web3/RetryProvidersFactory";
import { BundleRepository } from "../database/BundleRepository";
import { BundleIncludedEventsService } from "./BundleIncludedEventsService";

export class BundleServicesManager {
private bundleEventsProcessor?: BundleEventsProcessor;
private bundleBuilderService?: BundleBuilderService;
private bundleIncludedEventsService?: BundleIncludedEventsService;

public constructor(
private config: Config,
Expand All @@ -31,12 +33,31 @@ export class BundleServicesManager {
return Promise.all([
this.startBundleEventsProcessor(),
this.startBundleBuilderService(),
this.startBundleIncludedEventsService(),
]);
}

public stop() {
this.bundleEventsProcessor?.stop();
this.bundleBuilderService?.stop();
this.bundleIncludedEventsService?.stop();
}

private startBundleIncludedEventsService() {
if (!this.config.enableBundleIncludedEventsService) {
this.logger.warn("Bundle included events service is disabled");
return;
}
this.bundleIncludedEventsService = new BundleIncludedEventsService({
hubChainId: this.config.hubChainId,
logger: this.logger,
redis: this.redis,
postgres: this.postgres,
hubPoolClientFactory: this.hubPoolClientFactory,
spokePoolClientFactory: this.spokePoolClientFactory,
bundleRepository: this.bundleRepository,
});
return this.bundleIncludedEventsService.start(10);
}

private startBundleEventsProcessor() {
Expand Down