diff --git a/packages/indexer/README.md b/packages/indexer/README.md index 5e4c5f9..1e4de20 100644 --- a/packages/indexer/README.md +++ b/packages/indexer/README.md @@ -49,5 +49,6 @@ NODE_RETRY_DELAY=1000 ENABLE_HUBPOOL_INDEXER=true ENABLE_BUNDLE_EVENTS_PROCESSOR=true +ENABLE_BUNDLE_INCLUDED_EVENTS_SERVICE=true ENABLE_BUNDLE_BUILDER=true ``` diff --git a/packages/indexer/src/database/BundleRepository.ts b/packages/indexer/src/database/BundleRepository.ts index 417e34a..c020b42 100644 --- a/packages/indexer/src/database/BundleRepository.ts +++ b/packages/indexer/src/database/BundleRepository.ts @@ -501,7 +501,11 @@ export class BundleRepository extends utils.BaseRepository { ); const chunkedDeposits = across.utils.chunk(deposits, this.chunkSize); await Promise.all( - chunkedDeposits.map((eventsChunk) => eventsRepo.insert(eventsChunk)), + chunkedDeposits.map((eventsChunk) => + eventsRepo.upsert(eventsChunk, { + conflictPaths: { relayHash: true, type: true }, + }), + ), ); // Store bundle refunded deposits @@ -512,7 +516,11 @@ export class BundleRepository extends utils.BaseRepository { ); const chunkedRefunds = across.utils.chunk(expiredDeposits, this.chunkSize); await Promise.all( - chunkedRefunds.map((eventsChunk) => eventsRepo.insert(eventsChunk)), + chunkedRefunds.map((eventsChunk) => + eventsRepo.upsert(eventsChunk, { + conflictPaths: { relayHash: true, type: true }, + }), + ), ); // Store bundle slow fills @@ -523,7 +531,11 @@ export class BundleRepository extends utils.BaseRepository { ); const chunkedSlowFills = across.utils.chunk(slowFills, this.chunkSize); await Promise.all( - chunkedSlowFills.map((eventsChunk) => eventsRepo.insert(eventsChunk)), + chunkedSlowFills.map((eventsChunk) => + eventsRepo.upsert(eventsChunk, { + conflictPaths: { relayHash: true, type: true }, + }), + ), ); // Store bundle unexecutable slow fills @@ -538,7 +550,9 @@ export class BundleRepository extends utils.BaseRepository { ); await Promise.all( chunkedUnexecutableSlowFills.map((eventsChunk) => - eventsRepo.insert(eventsChunk), + eventsRepo.upsert(eventsChunk, { + conflictPaths: { relayHash: true, type: true }, + }), ), ); @@ -550,7 +564,11 @@ export class BundleRepository extends utils.BaseRepository { ); const chunkedFills = across.utils.chunk(fills, this.chunkSize); await Promise.all( - chunkedFills.map((eventsChunk) => eventsRepo.insert(eventsChunk)), + chunkedFills.map((eventsChunk) => + eventsRepo.upsert(eventsChunk, { + conflictPaths: { relayHash: true, type: true }, + }), + ), ); return { diff --git a/packages/indexer/src/parseEnv.ts b/packages/indexer/src/parseEnv.ts index 0ba792b..e30a2d5 100644 --- a/packages/indexer/src/parseEnv.ts +++ b/packages/indexer/src/parseEnv.ts @@ -10,6 +10,7 @@ export type Config = { spokePoolChainsEnabled: number[]; enableHubPoolIndexer: boolean; enableBundleEventsProcessor: boolean; + enableBundleIncludedEventsService: boolean; enableBundleBuilder: boolean; }; export type RedisConfig = { @@ -165,6 +166,10 @@ export function envToConfig(env: Env): Config { const enableBundleEventsProcessor = env.ENABLE_BUNDLE_EVENTS_PROCESSOR ? env.ENABLE_BUNDLE_EVENTS_PROCESSOR === "true" : true; + const enableBundleIncludedEventsService = + env.ENABLE_BUNDLE_INCLUDED_EVENTS_SERVICE + ? env.ENABLE_BUNDLE_INCLUDED_EVENTS_SERVICE === "true" + : true; const enableBundleBuilder = env.ENABLE_BUNDLE_BUILDER ? env.ENABLE_BUNDLE_BUILDER === "true" : true; @@ -184,6 +189,7 @@ export function envToConfig(env: Env): Config { spokePoolChainsEnabled, enableHubPoolIndexer, enableBundleEventsProcessor, + enableBundleIncludedEventsService, enableBundleBuilder, }; } diff --git a/packages/indexer/src/services/BundleIncludedEventsService.ts b/packages/indexer/src/services/BundleIncludedEventsService.ts new file mode 100644 index 0000000..e1d6ef0 --- /dev/null +++ b/packages/indexer/src/services/BundleIncludedEventsService.ts @@ -0,0 +1,212 @@ +import { CHAIN_IDs } from "@across-protocol/constants"; +import * as across from "@across-protocol/sdk"; +import Redis from "ioredis"; +import winston from "winston"; +import { DataSource, entities } from "@repo/indexer-database"; +import { BaseIndexer } from "../generics"; +import { + BlockRangeInsertType, + BundleRepository, +} from "../database/BundleRepository"; +import * as utils from "../utils"; +import { getBlockTime } from "../web3/constants"; +import { + buildPoolRebalanceRoot, + getBlockRangeBetweenBundles, + getBundleBlockRanges, +} from "../utils/bundleBuilderUtils"; + +export type BundleConfig = { + hubChainId: number; + logger: winston.Logger; + redis: Redis; + postgres: DataSource; + hubPoolClientFactory: utils.HubPoolClientFactory; + spokePoolClientFactory: utils.SpokePoolClientFactory; + bundleRepository: BundleRepository; +}; + +export class BundleIncludedEventsService extends BaseIndexer { + private hubPoolClient: across.clients.HubPoolClient; + private configStoreClient: across.clients.AcrossConfigStoreClient; + + constructor(private readonly config: BundleConfig) { + super(config.logger, "BundleIncludedEventsService"); + } + + protected async indexerLogic(): Promise { + try { + this.config.logger.info({ + at: "BundleIncludedEventsService#indexerLogic", + message: "Starting BundleIncludedEventsService", + }); + await this.assignSpokePoolEventsToExecutedBundles(); + + this.config.logger.info({ + at: "BundleIncludedEventsService#indexerLogic", + message: "Finished BundleIncludedEventsService", + }); + } catch (error) { + this.logger.error({ + at: "BundleIncludedEventsService#indexerLogic", + message: "Error in BundleIncludedEventsService", + error, + }); + } + } + + protected async initialize(): Promise { + const { hubPoolClientFactory } = this.config; + this.hubPoolClient = hubPoolClientFactory.get(this.config.hubChainId); + this.configStoreClient = this.hubPoolClient.configStoreClient; + } + + private async assignSpokePoolEventsToExecutedBundles(): Promise { + const { logger, bundleRepository } = this.config; + const executedBundles = + await bundleRepository.getExecutedBundlesWithoutEventsAssociated({ + fromBlock: utils.ACROSS_V3_MAINNET_DEPLOYMENT_BLOCK, + }); + logger.info({ + at: "ExecutedBundleEventsService#assignSpokePoolEventsToExecutedBundles", + message: `Found ${executedBundles.length} executed bundles without events associated`, + }); + if (executedBundles.length === 0) { + return; + } + + logger.debug({ + at: "BundleIncludedEventsService#assignSpokePoolEventsToExecutedBundles", + message: "Updating HubPool and ConfigStore clients", + }); + const startTime = Date.now(); + await this.configStoreClient.update(); + await this.hubPoolClient.update(); + const endTime = Date.now(); + const duration = endTime - startTime; + logger.debug({ + at: "BundleIncludedEventsService#assignSpokePoolEventsToExecutedBundles", + message: `Updated HubPool and ConfigStore clients in ${duration / 1000} seconds`, + }); + + for (const bundle of executedBundles) { + await this.getEventsIncludedInBundle(bundle); + } + } + + private async getEventsIncludedInBundle( + bundle: entities.Bundle, + ): Promise { + const { logger, bundleRepository, spokePoolClientFactory } = this.config; + const historicalBundle = await bundleRepository.retrieveMostRecentBundle( + entities.BundleStatus.Executed, + bundle.proposal.blockNumber, + 8, + ); + // Skip the bundle if we don't have enough historical data + if (!historicalBundle) { + logger.warn({ + at: "BundleIncludedEventsService#getEventsIncludedInBundle", + message: `No historical bundle found. Skipping bundle reconstruction of bundle ${bundle.id}`, + }); + return; + } + const lookbackRange = getBlockRangeBetweenBundles( + historicalBundle.proposal, + bundle.proposal, + ); + const spokeClients = this.getSpokeClientsForLookbackBlockRange( + lookbackRange, + spokePoolClientFactory, + ); + logger.debug({ + at: "BundleIncludedEventsService#getEventsIncludedInBundle", + message: `Updating spoke clients for lookback range for bundle ${bundle.id}`, + lookbackRange, + }); + const startTime = Date.now(); + await Promise.all( + Object.values(spokeClients).map((client) => client.update()), + ); + const endTime = Date.now(); + const duration = endTime - startTime; + logger.debug({ + at: "BundleIncludedEventsService#getEventsIncludedInBundle", + message: `Updated spoke clients in ${duration / 1000} seconds for bundle ${bundle.id}`, + }); + const clients = { + hubPoolClient: this.hubPoolClient, + configStoreClient: this.configStoreClient, + arweaveClient: null as unknown as across.caching.ArweaveClient, // FIXME: This is a hack to avoid instantiating the Arweave client + }; + // Instantiate bundle data client and reconstruct bundle + const bundleDataClient = + new across.clients.BundleDataClient.BundleDataClient( + logger, + clients, + spokeClients, + bundle.proposal.chainIds, + ); + // Get bundle ranges as an array of [startBlock, endBlock] for each chain + const bundleBlockRanges = getBundleBlockRanges(bundle); + const bundleData = await bundleDataClient.loadData( + bundleBlockRanges, + spokeClients, + ); + + // Build pool rebalance root and check it matches with the root of the stored bundle + const poolRebalanceRoot = buildPoolRebalanceRoot( + bundleBlockRanges, + bundleData, + this.hubPoolClient, + this.configStoreClient, + ); + if (bundle.poolRebalanceRoot !== poolRebalanceRoot.tree.getHexRoot()) { + logger.warn({ + at: "BundleIncludedEventsService#getEventsIncludedInBundle", + message: `Mismatching roots. Skipping bundle ${bundle.id}.`, + }); + return; + } else { + const storedEvents = await bundleRepository.storeBundleEvents( + bundleData, + bundle.id, + ); + await bundleRepository.updateBundleEventsAssociatedFlag(bundle.id); + logger.info({ + at: "BundleIncludedEventsService#getEventsIncludedInBundle", + message: `Stored bundle events for bundle ${bundle.id}`, + storedEvents, + }); + } + } + + private getSpokeClientsForLookbackBlockRange( + lookbackRange: utils.ProposalRangeResult[], + spokePoolClientFactory: utils.SpokePoolClientFactory, + ) { + return lookbackRange.reduce( + (acc, { chainId, startBlock, endBlock }) => { + // We need to instantiate spoke clients using a higher end block than + // the bundle range as deposits which fills are included in this bundle could + // have occured outside the bundle range of the origin chain + // NOTE: A buffer time of 15 minutes has been proven to work for older bundles + const blockTime = getBlockTime(chainId); + const endBlockTimeBuffer = 60 * 15; + const blockBuffer = Math.round(endBlockTimeBuffer / blockTime); + return { + ...acc, + [chainId]: spokePoolClientFactory.get( + chainId, + startBlock, + endBlock + blockBuffer, + { + hubPoolClient: this.hubPoolClient, + }, + ), + }; + }, + {} as Record, + ); + } +} diff --git a/packages/indexer/src/services/BundleServicesManager.ts b/packages/indexer/src/services/BundleServicesManager.ts index 2d21c22..17dd2ef 100644 --- a/packages/indexer/src/services/BundleServicesManager.ts +++ b/packages/indexer/src/services/BundleServicesManager.ts @@ -11,10 +11,12 @@ import { } from "../utils"; import { RetryProvidersFactory } from "../web3/RetryProvidersFactory"; import { BundleRepository } from "../database/BundleRepository"; +import { BundleIncludedEventsService } from "./BundleIncludedEventsService"; export class BundleServicesManager { private bundleEventsProcessor?: BundleEventsProcessor; private bundleBuilderService?: BundleBuilderService; + private bundleIncludedEventsService?: BundleIncludedEventsService; public constructor( private config: Config, @@ -31,12 +33,31 @@ export class BundleServicesManager { return Promise.all([ this.startBundleEventsProcessor(), this.startBundleBuilderService(), + this.startBundleIncludedEventsService(), ]); } public stop() { this.bundleEventsProcessor?.stop(); this.bundleBuilderService?.stop(); + this.bundleIncludedEventsService?.stop(); + } + + private startBundleIncludedEventsService() { + if (!this.config.enableBundleIncludedEventsService) { + this.logger.warn("Bundle included events service is disabled"); + return; + } + this.bundleIncludedEventsService = new BundleIncludedEventsService({ + hubChainId: this.config.hubChainId, + logger: this.logger, + redis: this.redis, + postgres: this.postgres, + hubPoolClientFactory: this.hubPoolClientFactory, + spokePoolClientFactory: this.spokePoolClientFactory, + bundleRepository: this.bundleRepository, + }); + return this.bundleIncludedEventsService.start(10); } private startBundleEventsProcessor() {