Commit

fix: save bundle included events using a separate class (#99)
Co-authored-by: Alexandru Matei <alexandrumatei3693@gmail.com>
amateima and alexandrumatei36 authored Nov 12, 2024
1 parent 28d0779 commit 63371b8
Showing 5 changed files with 263 additions and 5 deletions.
1 change: 1 addition & 0 deletions packages/indexer/README.md
@@ -49,5 +49,6 @@ NODE_RETRY_DELAY=1000
ENABLE_HUBPOOL_INDEXER=true
ENABLE_BUNDLE_EVENTS_PROCESSOR=true
ENABLE_BUNDLE_INCLUDED_EVENTS_SERVICE=true
ENABLE_BUNDLE_BUILDER=true
```
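As the parseEnv.ts change further down shows, the new flag defaults to enabled and is only turned off when set to a value other than the literal string "true". An illustrative .env fragment that disables just this service:

```
# Disable only the bundle included events service; the other flags keep their defaults
ENABLE_BUNDLE_INCLUDED_EVENTS_SERVICE=false
```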
28 changes: 23 additions & 5 deletions packages/indexer/src/database/BundleRepository.ts
@@ -501,7 +501,11 @@ export class BundleRepository extends utils.BaseRepository {
);
const chunkedDeposits = across.utils.chunk(deposits, this.chunkSize);
await Promise.all(
chunkedDeposits.map((eventsChunk) => eventsRepo.insert(eventsChunk)),
chunkedDeposits.map((eventsChunk) =>
eventsRepo.upsert(eventsChunk, {
conflictPaths: { relayHash: true, type: true },
}),
),
);

// Store bundle refunded deposits
@@ -512,7 +516,11 @@
);
const chunkedRefunds = across.utils.chunk(expiredDeposits, this.chunkSize);
await Promise.all(
chunkedRefunds.map((eventsChunk) => eventsRepo.insert(eventsChunk)),
chunkedRefunds.map((eventsChunk) =>
eventsRepo.upsert(eventsChunk, {
conflictPaths: { relayHash: true, type: true },
}),
),
);

// Store bundle slow fills
@@ -523,7 +531,11 @@
);
const chunkedSlowFills = across.utils.chunk(slowFills, this.chunkSize);
await Promise.all(
chunkedSlowFills.map((eventsChunk) => eventsRepo.insert(eventsChunk)),
chunkedSlowFills.map((eventsChunk) =>
eventsRepo.upsert(eventsChunk, {
conflictPaths: { relayHash: true, type: true },
}),
),
);

// Store bundle unexecutable slow fills
@@ -538,7 +550,9 @@
);
await Promise.all(
chunkedUnexecutableSlowFills.map((eventsChunk) =>
eventsRepo.insert(eventsChunk),
eventsRepo.upsert(eventsChunk, {
conflictPaths: { relayHash: true, type: true },
}),
),
);

@@ -550,7 +564,11 @@
);
const chunkedFills = across.utils.chunk(fills, this.chunkSize);
await Promise.all(
chunkedFills.map((eventsChunk) => eventsRepo.insert(eventsChunk)),
chunkedFills.map((eventsChunk) =>
eventsRepo.upsert(eventsChunk, {
conflictPaths: { relayHash: true, type: true },
}),
),
);

return {
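The substantive change in BundleRepository.ts swaps insert() for upsert() on every chunk, so re-running the service over a bundle whose events were already (partially) written no longer fails on duplicate rows. A minimal, self-contained sketch of the semantics, using a hypothetical BundleEventSketch entity — the real entity lives in @repo/indexer-database and its columns may differ:

```
import "reflect-metadata";
import {
  Column,
  DataSource,
  Entity,
  PrimaryGeneratedColumn,
  Unique,
} from "typeorm";

// Hypothetical entity for illustration only; the real bundle-event entity
// lives in @repo/indexer-database and its columns may differ.
@Entity()
@Unique(["relayHash", "type"])
class BundleEventSketch {
  @PrimaryGeneratedColumn()
  id!: number;

  @Column()
  relayHash!: string;

  @Column()
  type!: string;

  @Column()
  bundleId!: number;
}

async function saveEvents(
  db: DataSource,
  events: Partial<BundleEventSketch>[],
): Promise<void> {
  // Unlike insert(), upsert() does not throw on duplicate keys: on Postgres it
  // roughly compiles to INSERT ... ON CONFLICT ("relayHash", "type") DO UPDATE,
  // so rows that already exist for the same (relayHash, type) pair are updated
  // in place and re-processing a bundle stays idempotent.
  await db.getRepository(BundleEventSketch).upsert(events, {
    conflictPaths: { relayHash: true, type: true },
  });
}
```

The (relayHash, type) conflict target in the sketch mirrors the conflictPaths option passed in the diff above.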
6 changes: 6 additions & 0 deletions packages/indexer/src/parseEnv.ts
@@ -10,6 +10,7 @@ export type Config = {
spokePoolChainsEnabled: number[];
enableHubPoolIndexer: boolean;
enableBundleEventsProcessor: boolean;
enableBundleIncludedEventsService: boolean;
enableBundleBuilder: boolean;
};
export type RedisConfig = {
@@ -165,6 +166,10 @@ export function envToConfig(env: Env): Config {
const enableBundleEventsProcessor = env.ENABLE_BUNDLE_EVENTS_PROCESSOR
? env.ENABLE_BUNDLE_EVENTS_PROCESSOR === "true"
: true;
const enableBundleIncludedEventsService =
env.ENABLE_BUNDLE_INCLUDED_EVENTS_SERVICE
? env.ENABLE_BUNDLE_INCLUDED_EVENTS_SERVICE === "true"
: true;
const enableBundleBuilder = env.ENABLE_BUNDLE_BUILDER
? env.ENABLE_BUNDLE_BUILDER === "true"
: true;
@@ -184,6 +189,7 @@
spokePoolChainsEnabled,
enableHubPoolIndexer,
enableBundleEventsProcessor,
enableBundleIncludedEventsService,
enableBundleBuilder,
};
}
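Each feature flag in envToConfig follows the same default-on pattern: an unset or empty variable means enabled, and otherwise the service is enabled only when the value is exactly "true". A hypothetical helper — not part of the repo — that captures what the inline ternaries implement:

```
// Hypothetical helper (not in the repo), equivalent to the inline ternaries in
// envToConfig: an unset or empty variable falls back to the default, otherwise
// the flag is on only when the value is exactly the string "true".
function parseFeatureFlag(value: string | undefined, defaultValue = true): boolean {
  return value ? value === "true" : defaultValue;
}

// parseFeatureFlag(undefined) -> true   (flag not set, default applies)
// parseFeatureFlag("true")    -> true
// parseFeatureFlag("false")   -> false
// parseFeatureFlag("1")       -> false  (only the literal "true" enables)
```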
212 changes: 212 additions & 0 deletions packages/indexer/src/services/BundleIncludedEventsService.ts
@@ -0,0 +1,212 @@
import { CHAIN_IDs } from "@across-protocol/constants";
import * as across from "@across-protocol/sdk";
import Redis from "ioredis";
import winston from "winston";
import { DataSource, entities } from "@repo/indexer-database";
import { BaseIndexer } from "../generics";
import {
BlockRangeInsertType,
BundleRepository,
} from "../database/BundleRepository";
import * as utils from "../utils";
import { getBlockTime } from "../web3/constants";
import {
buildPoolRebalanceRoot,
getBlockRangeBetweenBundles,
getBundleBlockRanges,
} from "../utils/bundleBuilderUtils";

export type BundleConfig = {
hubChainId: number;
logger: winston.Logger;
redis: Redis;
postgres: DataSource;
hubPoolClientFactory: utils.HubPoolClientFactory;
spokePoolClientFactory: utils.SpokePoolClientFactory;
bundleRepository: BundleRepository;
};

export class BundleIncludedEventsService extends BaseIndexer {
private hubPoolClient: across.clients.HubPoolClient;
private configStoreClient: across.clients.AcrossConfigStoreClient;

constructor(private readonly config: BundleConfig) {
super(config.logger, "BundleIncludedEventsService");
}

protected async indexerLogic(): Promise<void> {
try {
this.config.logger.info({
at: "BundleIncludedEventsService#indexerLogic",
message: "Starting BundleIncludedEventsService",
});
await this.assignSpokePoolEventsToExecutedBundles();

this.config.logger.info({
at: "BundleIncludedEventsService#indexerLogic",
message: "Finished BundleIncludedEventsService",
});
} catch (error) {
this.logger.error({
at: "BundleIncludedEventsService#indexerLogic",
message: "Error in BundleIncludedEventsService",
error,
});
}
}

protected async initialize(): Promise<void> {
const { hubPoolClientFactory } = this.config;
this.hubPoolClient = hubPoolClientFactory.get(this.config.hubChainId);
this.configStoreClient = this.hubPoolClient.configStoreClient;
}

private async assignSpokePoolEventsToExecutedBundles(): Promise<void> {
const { logger, bundleRepository } = this.config;
const executedBundles =
await bundleRepository.getExecutedBundlesWithoutEventsAssociated({
fromBlock: utils.ACROSS_V3_MAINNET_DEPLOYMENT_BLOCK,
});
logger.info({
at: "ExecutedBundleEventsService#assignSpokePoolEventsToExecutedBundles",
message: `Found ${executedBundles.length} executed bundles without events associated`,
});
if (executedBundles.length === 0) {
return;
}

logger.debug({
at: "BundleIncludedEventsService#assignSpokePoolEventsToExecutedBundles",
message: "Updating HubPool and ConfigStore clients",
});
const startTime = Date.now();
await this.configStoreClient.update();
await this.hubPoolClient.update();
const endTime = Date.now();
const duration = endTime - startTime;
logger.debug({
at: "BundleIncludedEventsService#assignSpokePoolEventsToExecutedBundles",
message: `Updated HubPool and ConfigStore clients in ${duration / 1000} seconds`,
});

for (const bundle of executedBundles) {
await this.getEventsIncludedInBundle(bundle);
}
}

private async getEventsIncludedInBundle(
bundle: entities.Bundle,
): Promise<void> {
const { logger, bundleRepository, spokePoolClientFactory } = this.config;
const historicalBundle = await bundleRepository.retrieveMostRecentBundle(
entities.BundleStatus.Executed,
bundle.proposal.blockNumber,
8,
);
// Skip the bundle if we don't have enough historical data
if (!historicalBundle) {
logger.warn({
at: "BundleIncludedEventsService#getEventsIncludedInBundle",
message: `No historical bundle found. Skipping bundle reconstruction of bundle ${bundle.id}`,
});
return;
}
const lookbackRange = getBlockRangeBetweenBundles(
historicalBundle.proposal,
bundle.proposal,
);
const spokeClients = this.getSpokeClientsForLookbackBlockRange(
lookbackRange,
spokePoolClientFactory,
);
logger.debug({
at: "BundleIncludedEventsService#getEventsIncludedInBundle",
message: `Updating spoke clients for lookback range for bundle ${bundle.id}`,
lookbackRange,
});
const startTime = Date.now();
await Promise.all(
Object.values(spokeClients).map((client) => client.update()),
);
const endTime = Date.now();
const duration = endTime - startTime;
logger.debug({
at: "BundleIncludedEventsService#getEventsIncludedInBundle",
message: `Updated spoke clients in ${duration / 1000} seconds for bundle ${bundle.id}`,
});
const clients = {
hubPoolClient: this.hubPoolClient,
configStoreClient: this.configStoreClient,
arweaveClient: null as unknown as across.caching.ArweaveClient, // FIXME: This is a hack to avoid instantiating the Arweave client
};
// Instantiate bundle data client and reconstruct bundle
const bundleDataClient =
new across.clients.BundleDataClient.BundleDataClient(
logger,
clients,
spokeClients,
bundle.proposal.chainIds,
);
// Get bundle ranges as an array of [startBlock, endBlock] for each chain
const bundleBlockRanges = getBundleBlockRanges(bundle);
const bundleData = await bundleDataClient.loadData(
bundleBlockRanges,
spokeClients,
);

// Build pool rebalance root and check that it matches the root of the stored bundle
const poolRebalanceRoot = buildPoolRebalanceRoot(
bundleBlockRanges,
bundleData,
this.hubPoolClient,
this.configStoreClient,
);
if (bundle.poolRebalanceRoot !== poolRebalanceRoot.tree.getHexRoot()) {
logger.warn({
at: "BundleIncludedEventsService#getEventsIncludedInBundle",
message: `Mismatching roots. Skipping bundle ${bundle.id}.`,
});
return;
} else {
const storedEvents = await bundleRepository.storeBundleEvents(
bundleData,
bundle.id,
);
await bundleRepository.updateBundleEventsAssociatedFlag(bundle.id);
logger.info({
at: "BundleIncludedEventsService#getEventsIncludedInBundle",
message: `Stored bundle events for bundle ${bundle.id}`,
storedEvents,
});
}
}

private getSpokeClientsForLookbackBlockRange(
lookbackRange: utils.ProposalRangeResult[],
spokePoolClientFactory: utils.SpokePoolClientFactory,
) {
return lookbackRange.reduce(
(acc, { chainId, startBlock, endBlock }) => {
// We need to instantiate spoke clients using a higher end block than
// the bundle range, as deposits whose fills are included in this bundle could
// have occurred outside the bundle range of the origin chain.
// NOTE: A buffer time of 15 minutes has been proven to work for older bundles
const blockTime = getBlockTime(chainId);
const endBlockTimeBuffer = 60 * 15;
const blockBuffer = Math.round(endBlockTimeBuffer / blockTime);
return {
...acc,
[chainId]: spokePoolClientFactory.get(
chainId,
startBlock,
endBlock + blockBuffer,
{
hubPoolClient: this.hubPoolClient,
},
),
};
},
{} as Record<number, across.clients.SpokePoolClient>,
);
}
}
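For a sense of scale, the 15-minute end-block buffer in getSpokeClientsForLookbackBlockRange translates into a chain-specific number of extra blocks. A small sketch of the arithmetic, with assumed block times of roughly 12 seconds for Ethereum mainnet and 2 seconds for Polygon (illustrative values; the service reads the real ones via getBlockTime):

```
// Sketch of the end-block buffer arithmetic used when sizing each spoke
// client's block range. Block times below are assumptions for illustration;
// the service reads the real values via getBlockTime(chainId).
const endBlockTimeBuffer = 60 * 15; // 15 minutes, in seconds

const assumedBlockTimes: Record<number, number> = {
  1: 12, // ~12s blocks assumed for Ethereum mainnet
  137: 2, // ~2s blocks assumed for Polygon
};

for (const [chainId, blockTime] of Object.entries(assumedBlockTimes)) {
  const blockBuffer = Math.round(endBlockTimeBuffer / blockTime);
  console.log(`chain ${chainId}: endBlock extended by ${blockBuffer} blocks`);
  // chain 1: endBlock extended by 75 blocks
  // chain 137: endBlock extended by 450 blocks
}
```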
21 changes: 21 additions & 0 deletions packages/indexer/src/services/BundleServicesManager.ts
@@ -11,10 +11,12 @@ import {
} from "../utils";
import { RetryProvidersFactory } from "../web3/RetryProvidersFactory";
import { BundleRepository } from "../database/BundleRepository";
import { BundleIncludedEventsService } from "./BundleIncludedEventsService";

export class BundleServicesManager {
private bundleEventsProcessor?: BundleEventsProcessor;
private bundleBuilderService?: BundleBuilderService;
private bundleIncludedEventsService?: BundleIncludedEventsService;

public constructor(
private config: Config,
@@ -31,12 +33,31 @@
return Promise.all([
this.startBundleEventsProcessor(),
this.startBundleBuilderService(),
this.startBundleIncludedEventsService(),
]);
}

public stop() {
this.bundleEventsProcessor?.stop();
this.bundleBuilderService?.stop();
this.bundleIncludedEventsService?.stop();
}

private startBundleIncludedEventsService() {
if (!this.config.enableBundleIncludedEventsService) {
this.logger.warn("Bundle included events service is disabled");
return;
}
this.bundleIncludedEventsService = new BundleIncludedEventsService({
hubChainId: this.config.hubChainId,
logger: this.logger,
redis: this.redis,
postgres: this.postgres,
hubPoolClientFactory: this.hubPoolClientFactory,
spokePoolClientFactory: this.spokePoolClientFactory,
bundleRepository: this.bundleRepository,
});
return this.bundleIncludedEventsService.start(10);
}

private startBundleEventsProcessor() {
