diff --git a/.dockerignore b/.dockerignore
index fb3f980..96e057e 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -1,5 +1,6 @@
 Dockerfile
 docker-compose.yml
+out
 volumes
 **/dist
 **/node_modules
diff --git a/packages/indexer-database/src/entities/evm/ProposedRootBundle.ts b/packages/indexer-database/src/entities/evm/ProposedRootBundle.ts
index 4a7d513..135dbd9 100644
--- a/packages/indexer-database/src/entities/evm/ProposedRootBundle.ts
+++ b/packages/indexer-database/src/entities/evm/ProposedRootBundle.ts
@@ -48,6 +48,9 @@ export class ProposedRootBundle {
   @Column()
   blockNumber: number;
 
+  @Column()
+  finalised: boolean;
+
   @CreateDateColumn()
   createdAt: Date;
 }
diff --git a/packages/indexer-database/src/entities/evm/RootBundleCanceled.ts b/packages/indexer-database/src/entities/evm/RootBundleCanceled.ts
index 7646731..f6ea949 100644
--- a/packages/indexer-database/src/entities/evm/RootBundleCanceled.ts
+++ b/packages/indexer-database/src/entities/evm/RootBundleCanceled.ts
@@ -30,6 +30,9 @@ export class RootBundleCanceled {
   @Column()
   blockNumber: number;
 
+  @Column()
+  finalised: boolean;
+
   @CreateDateColumn()
   createdAt: Date;
 }
diff --git a/packages/indexer-database/src/entities/evm/RootBundleDisputed.ts b/packages/indexer-database/src/entities/evm/RootBundleDisputed.ts
index d928aa8..728bd47 100644
--- a/packages/indexer-database/src/entities/evm/RootBundleDisputed.ts
+++ b/packages/indexer-database/src/entities/evm/RootBundleDisputed.ts
@@ -30,6 +30,9 @@ export class RootBundleDisputed {
   @Column()
   blockNumber: number;
 
+  @Column()
+  finalised: boolean;
+
   @CreateDateColumn()
   createdAt: Date;
 }
diff --git a/packages/indexer-database/src/entities/evm/RootBundleExecuted.ts b/packages/indexer-database/src/entities/evm/RootBundleExecuted.ts
index 13dc9e6..57e172a 100644
--- a/packages/indexer-database/src/entities/evm/RootBundleExecuted.ts
+++ b/packages/indexer-database/src/entities/evm/RootBundleExecuted.ts
@@ -53,6 +53,9 @@ export class RootBundleExecuted {
   @Column()
   blockNumber: number;
 
+  @Column()
+  finalised: boolean;
+
   @CreateDateColumn()
   createdAt: Date;
 }
diff --git a/packages/indexer-database/src/entities/evm/SetPoolRebalanceRoute.ts b/packages/indexer-database/src/entities/evm/SetPoolRebalanceRoute.ts
index 7a2ebde..4fbc491 100644
--- a/packages/indexer-database/src/entities/evm/SetPoolRebalanceRoute.ts
+++ b/packages/indexer-database/src/entities/evm/SetPoolRebalanceRoute.ts
@@ -7,7 +7,7 @@ import {
 } from "typeorm";
 
 @Entity({ schema: "evm" })
-@Unique("UK_setPoolRebalanceRoute_transactionHash_transactionIndex_logIndex", [
+@Unique("UK_spr_transactionHash_transactionIndex_logIndex", [
   "transactionHash",
   "transactionIndex",
   "logIndex",
@@ -37,6 +37,9 @@ export class SetPoolRebalanceRoute {
   @Column({ nullable: false })
   logIndex: number;
 
+  @Column()
+  finalised: boolean;
+
   @CreateDateColumn()
   createdAt: Date;
 }
diff --git a/packages/indexer-database/src/migrations/1727686818331-HubPoolFinalised.ts b/packages/indexer-database/src/migrations/1727686818331-HubPoolFinalised.ts
new file mode 100644
index 0000000..3422c71
--- /dev/null
+++ b/packages/indexer-database/src/migrations/1727686818331-HubPoolFinalised.ts
@@ -0,0 +1,53 @@
+import { MigrationInterface, QueryRunner } from "typeorm";
+
+export class HubPoolFinalised1727686818331 implements MigrationInterface {
+  name = "HubPoolFinalised1727686818331";
+
+  public async up(queryRunner: QueryRunner): Promise<void> {
+    await queryRunner.query(
+      `ALTER TABLE "evm"."set_pool_rebalance_route" DROP CONSTRAINT "UK_setPoolRebalanceRoute_transactionHash_transactionIndex_logIn"`,
+    );
+    await queryRunner.query(
+      `ALTER TABLE "evm"."set_pool_rebalance_route" ADD CONSTRAINT "UK_spr_transactionHash_transactionIndex_logIndex" UNIQUE ("transactionHash", "transactionIndex", "logIndex")`,
+    );
+    await queryRunner.query(
+      `ALTER TABLE "evm"."proposed_root_bundle" ADD "finalised" boolean NOT NULL`,
+    );
+    await queryRunner.query(
+      `ALTER TABLE "evm"."root_bundle_disputed" ADD "finalised" boolean NOT NULL`,
+    );
+    await queryRunner.query(
+      `ALTER TABLE "evm"."root_bundle_executed" ADD "finalised" boolean NOT NULL`,
+    );
+    await queryRunner.query(
+      `ALTER TABLE "evm"."root_bundle_canceled" ADD "finalised" boolean NOT NULL`,
+    );
+    await queryRunner.query(
+      `ALTER TABLE "evm"."set_pool_rebalance_route" ADD "finalised" boolean NOT NULL`,
+    );
+  }
+
+  public async down(queryRunner: QueryRunner): Promise<void> {
+    await queryRunner.query(
+      `ALTER TABLE "evm"."set_pool_rebalance_route" DROP CONSTRAINT "UK_spr_transactionHash_transactionIndex_logIndex"`,
+    );
+    await queryRunner.query(
+      `ALTER TABLE "evm"."set_pool_rebalance_route" ADD CONSTRAINT "UK_setPoolRebalanceRoute_transactionHash_transactionIndex_logIn" UNIQUE ("transactionHash", "transactionIndex", "logIndex")`,
+    );
+    await queryRunner.query(
+      `ALTER TABLE "evm"."set_pool_rebalance_route" DROP COLUMN "finalised"`,
+    );
+    await queryRunner.query(
+      `ALTER TABLE "evm"."root_bundle_executed" DROP COLUMN "finalised"`,
+    );
+    await queryRunner.query(
+      `ALTER TABLE "evm"."root_bundle_disputed" DROP COLUMN "finalised"`,
+    );
+    await queryRunner.query(
+      `ALTER TABLE "evm"."proposed_root_bundle" DROP COLUMN "finalised"`,
+    );
+    await queryRunner.query(
+      `ALTER TABLE "evm"."root_bundle_canceled" DROP COLUMN "finalised"`,
+    );
+  }
+}
diff --git a/packages/indexer/src/database/HubPoolRepository.ts b/packages/indexer/src/database/HubPoolRepository.ts
index 764f7ea..909d397 100644
--- a/packages/indexer/src/database/HubPoolRepository.ts
+++ b/packages/indexer/src/database/HubPoolRepository.ts
@@ -3,17 +3,13 @@ import * as across from "@across-protocol/sdk";
 import { DataSource, entities, utils } from "@repo/indexer-database";
 
 export class HubPoolRepository extends utils.BaseRepository {
-  constructor(
-    postgres: DataSource,
-    logger: winston.Logger,
-    throwError: boolean,
-  ) {
-    super(postgres, logger, throwError);
+  constructor(postgres: DataSource, logger: winston.Logger) {
+    super(postgres, logger, true);
   }
 
   public async formatAndSaveProposedRootBundleEvents(
     proposedRootBundleEvents: across.interfaces.ProposedRootBundle[],
-    throwError?: boolean,
+    lastFinalisedBlock: number,
   ) {
     const formattedEvents = proposedRootBundleEvents.map((event) => {
       return {
@@ -24,41 +20,59 @@ export class HubPoolRepository extends utils.BaseRepository {
         bundleEvaluationBlockNumbers: event.bundleEvaluationBlockNumbers.map(
           (blockNumber) => parseInt(blockNumber.toString()),
         ),
+        finalised: event.blockNumber <= lastFinalisedBlock,
       };
     });
-    await this.insert(entities.ProposedRootBundle, formattedEvents, throwError);
+    await this.postgres
+      .createQueryBuilder(entities.ProposedRootBundle, "b")
+      .insert()
+      .values(formattedEvents)
+      .orUpdate(["finalised"], ["transactionHash"])
+      .execute();
   }
 
   public async formatAndSaveRootBundleDisputedEvents(
     rootBundleDisputedEvents: across.interfaces.DisputedRootBundle[],
-    throwError?: boolean,
+    lastFinalisedBlock: number,
   ) {
     const formattedEvents = rootBundleDisputedEvents.map((event) => {
       return {
         ...event,
         requestTime: new Date(event.requestTime * 1000),
+        finalised: event.blockNumber <= lastFinalisedBlock,
       };
     });
-    await this.insert(entities.RootBundleDisputed, formattedEvents, throwError);
+    await this.postgres
+      .createQueryBuilder(entities.RootBundleDisputed, "b")
+      .insert()
+      .values(formattedEvents)
+      .orUpdate(["finalised"], ["transactionHash"])
+      .execute();
   }
 
   public async formatAndSaveRootBundleCanceledEvents(
     rootBundleCanceledEvents: across.interfaces.CancelledRootBundle[],
-    throwError?: boolean,
+    lastFinalisedBlock: number,
   ) {
     const formattedEvents = rootBundleCanceledEvents.map((event) => {
       return {
         ...event,
         caller: event.disputer,
         requestTime: new Date(event.requestTime * 1000),
+        finalised: event.blockNumber <= lastFinalisedBlock,
       };
     });
-    await this.insert(entities.RootBundleCanceled, formattedEvents, throwError);
+    await this.postgres
+      .createQueryBuilder(entities.RootBundleCanceled, "b")
+      .insert()
+      .values(formattedEvents)
+      .orUpdate(["finalised"], ["transactionHash"])
+      .execute();
   }
 
   public async formatAndSaveRootBundleExecutedEvents(
     rootBundleExecutedEvents: across.interfaces.ExecutedRootBundle[],
-    throwError?: boolean,
+    lastFinalisedBlock: number,
   ) {
     const formattedEvents = rootBundleExecutedEvents.map((event) => {
       return {
@@ -68,16 +82,31 @@ export class HubPoolRepository extends utils.BaseRepository {
         runningBalances: event.runningBalances.map((balance) =>
           balance.toString(),
         ),
+        finalised: event.blockNumber <= lastFinalisedBlock,
       };
     });
-    await this.insert(entities.RootBundleExecuted, formattedEvents, throwError);
+    // Split the events into chunks of 1000 to avoid exceeding the max query length
+    const chunks = across.utils.chunk(formattedEvents, 1000);
+    await Promise.all(
+      chunks.map((chunk) =>
+        this.postgres
+          .createQueryBuilder(entities.RootBundleExecuted, "b")
+          .insert()
+          .values(chunk)
+          .orUpdate(
+            ["finalised"],
+            ["chainId", "leafId", "groupIndex", "transactionHash"],
+          )
+          .execute(),
+      ),
+    );
   }
 
   public async formatAndSaveSetPoolRebalanceRouteEvents(
     setPoolRebalanceRouteEvents: (across.interfaces.DestinationTokenWithBlock & {
       l2ChainId: number;
     })[],
-    throwError?: boolean,
+    lastFinalisedBlock: number,
   ) {
     const formattedEvents = setPoolRebalanceRouteEvents.map((event) => {
       return {
@@ -85,13 +114,18 @@ export class HubPoolRepository extends utils.BaseRepository {
         destinationChainId: event.l2ChainId,
         destinationToken: event.l2Token,
         l1Token: event.l1Token,
+        finalised: event.blockNumber <= lastFinalisedBlock,
       };
     });
-    await this.insert(
-      entities.SetPoolRebalanceRoute,
-      formattedEvents,
-      throwError,
-    );
+    await this.postgres
+      .createQueryBuilder(entities.SetPoolRebalanceRoute, "b")
+      .insert()
+      .values(formattedEvents)
+      .orUpdate(
+        ["finalised"],
+        ["transactionHash", "transactionIndex", "logIndex"],
+      )
+      .execute();
   }
 
   /**
diff --git a/packages/indexer/src/main.ts b/packages/indexer/src/main.ts
index 4ba95e4..2f8e53f 100644
--- a/packages/indexer/src/main.ts
+++ b/packages/indexer/src/main.ts
@@ -2,10 +2,22 @@ import * as services from "./services";
 import winston from "winston";
 import Redis from "ioredis";
 import * as across from "@across-protocol/sdk";
+import * as acrossConstants from "@across-protocol/constants";
+import { providers } from "ethers";
+
 import { connectToDatabase } from "./database/database.provider";
 import * as parseEnv from "./parseEnv";
 import { RetryProvidersFactory } from "./web3/RetryProvidersFactory";
 import { RedisCache } from "./redis/redisCache";
+import { DatabaseConfig } from "@repo/indexer-database";
+import { HubPoolIndexerDataHandler } from "./services/HubPoolIndexerDataHandler";
+import * as utils from "./utils";
+import {
+  getFinalisedBlockBufferDistance,
+  getLoopWaitTimeSeconds,
+  Indexer,
+} from "./data-indexing/service";
+import { HubPoolRepository } from "./database/HubPoolRepository";
 
 async function initializeRedis(
   config: parseEnv.RedisConfig,
@@ -29,22 +41,17 @@
 }
 
 export async function Main(config: parseEnv.Config, logger: winston.Logger) {
-  const { redisConfig, postgresConfig, hubConfig, spokeConfigs } = config;
+  const { redisConfig, postgresConfig, spokeConfigs } = config;
   const redis = await initializeRedis(redisConfig, logger);
   const redisCache = new RedisCache(redis);
   const retryProvidersFactory = new RetryProvidersFactory(redisCache, logger);
+  retryProvidersFactory.initializeProviders();
   const postgres = await connectToDatabase(postgresConfig, logger);
   const bundleProcessor = new services.bundles.Processor({
     logger,
     redis,
     postgres,
   });
-  const hubPoolIndexer = new services.hubPoolIndexer.Indexer({
-    logger,
-    redis,
-    postgres,
-    ...hubConfig,
-  });
   const spokePoolIndexers = spokeConfigs.map((spokeConfig) => {
     return new services.spokePoolIndexer.Indexer({
       logger,
@@ -54,6 +61,32 @@
     });
   });
 
+  const hubPoolIndexerDataHandler = new HubPoolIndexerDataHandler(
+    logger,
+    acrossConstants.CHAIN_IDs.MAINNET,
+    retryProvidersFactory.getProviderForChainId(
+      acrossConstants.CHAIN_IDs.MAINNET,
+    ),
+    new HubPoolRepository(postgres, logger),
+  );
+  const hubPoolIndexer = new Indexer(
+    {
+      loopWaitTimeSeconds: getLoopWaitTimeSeconds(
+        acrossConstants.CHAIN_IDs.MAINNET,
+      ),
+      finalisedBlockBufferDistance: getFinalisedBlockBufferDistance(
+        acrossConstants.CHAIN_IDs.MAINNET,
+      ),
+    },
+    hubPoolIndexerDataHandler,
+    retryProvidersFactory.getProviderForChainId(
+      acrossConstants.CHAIN_IDs.MAINNET,
+    ),
+    new RedisCache(redis),
+    logger,
+  );
+  await hubPoolIndexer.start();
+
   let exitRequested = false;
   process.on("SIGINT", () => {
     if (!exitRequested) {
@@ -61,7 +94,7 @@
         "\nWait for shutdown, or press Ctrl+C again to forcefully exit.",
       );
       spokePoolIndexers.map((s) => s.stop());
-      hubPoolIndexer.stop();
+      hubPoolIndexer.stopGracefully();
     } else {
       logger.info("\nForcing exit...");
       redis?.quit();
@@ -79,7 +112,6 @@
   const [bundleResults, hubPoolResult, ...spokeResults] =
     await Promise.allSettled([
      bundleProcessor.start(10),
-      hubPoolIndexer.start(10),
      ...spokePoolIndexers.map((s) => s.start(10)),
    ]);
 
@@ -91,7 +123,6 @@
         (r) => r.status === "fulfilled",
       ),
       bundleProcessorRunSuccess: bundleResults.status === "fulfilled",
-      hubPoolRunSuccess: hubPoolResult.status === "fulfilled",
     },
   });
 
diff --git a/packages/indexer/src/parseEnv.ts b/packages/indexer/src/parseEnv.ts
index bd91d3e..39a6966 100644
--- a/packages/indexer/src/parseEnv.ts
+++ b/packages/indexer/src/parseEnv.ts
@@ -12,10 +12,6 @@ export type Config = {
     services.spokePoolIndexer.Config,
     "logger" | "redis" | "postgres"
   >[];
-  hubConfig: Omit<
-    services.hubPoolIndexer.Config,
-    "logger" | "redis" | "postgres"
-  >;
 };
 export type RedisConfig = {
   host: string;
@@ -236,7 +232,6 @@ export function envToConfig(env: Env): Config {
   return {
     redisConfig,
     postgresConfig,
-    hubConfig,
     spokeConfigs,
   };
 }
diff --git a/packages/indexer/src/services/HubPoolIndexerDataHandler.ts b/packages/indexer/src/services/HubPoolIndexerDataHandler.ts
new file mode 100644
index 0000000..7df9a94
--- /dev/null
+++ b/packages/indexer/src/services/HubPoolIndexerDataHandler.ts
@@ -0,0 +1,171 @@
+import { Logger } from "winston";
+import * as across from "@across-protocol/sdk";
+
+import * as utils from "../utils";
+import {
+  getDeployedBlockNumber,
+  getDeployedAddress,
+} from "@across-protocol/contracts";
+import { IndexerDataHandler } from "../data-indexing/service/IndexerDataHandler";
+import { BlockRange } from "../data-indexing/model";
+import { HubPoolRepository } from "../database/HubPoolRepository";
+import { getMaxBlockLookBack } from "../web3/constants";
+
+type FetchEventsResult = {
+  proposedRootBundleEvents: (across.interfaces.ProposedRootBundle & {
+    chainIds: number[];
+  })[];
+  rootBundleCanceledEvents: across.interfaces.CancelledRootBundle[];
+  rootBundleDisputedEvents: across.interfaces.DisputedRootBundle[];
+  rootBundleExecutedEvents: across.interfaces.ExecutedRootBundle[];
+  setPoolRebalanceRouteEvents: (across.interfaces.DestinationTokenWithBlock & {
+    l2ChainId: number;
+  })[];
+};
+export class HubPoolIndexerDataHandler implements IndexerDataHandler {
+  private hubPoolClient: across.clients.HubPoolClient;
+  private configStoreClient: across.clients.AcrossConfigStoreClient;
+  private isInitialized: boolean;
+
+  constructor(
+    private logger: Logger,
+    private chainId: number,
+    private provider: across.providers.RetryProvider,
+    private hubPoolRepository: HubPoolRepository,
+  ) {
+    this.isInitialized = false;
+  }
+
+  public getDataIdentifier() {
+    return `${getDeployedAddress("HubPool", this.chainId)}:${this.chainId}`;
+  }
+
+  public getStartIndexingBlockNumber() {
+    const deployedBlockNumber = getDeployedBlockNumber("HubPool", this.chainId);
+    return deployedBlockNumber;
+  }
+
+  public async processBlockRange(
+    blockRange: BlockRange,
+    lastFinalisedBlock: number,
+  ) {
+    this.logger.info({
+      message: "HubPoolIndexerDataHandler::Processing block range",
+      blockRange,
+      lastFinalisedBlock,
+    });
+    if (!this.isInitialized) {
+      await this.initialize();
+      this.isInitialized = true;
+    }
+    const events = await this.fetchEventsByRange(blockRange);
+    this.logger.info({
+      message: "HubPoolIndexerDataHandler::Found events",
+      events: {
+        proposedRootBundleEvents: events.proposedRootBundleEvents.length,
+        rootBundleExecutedEvents: events.rootBundleExecutedEvents.length,
+        rootBundleCanceledEvents: events.rootBundleCanceledEvents.length,
+        rootBundleDisputedEvents: events.rootBundleDisputedEvents.length,
+        setPoolRebalanceRouteEvents: events.setPoolRebalanceRouteEvents.length,
+      },
+    });
+    await this.storeEvents(events, lastFinalisedBlock);
+  }
+
+  private async initialize() {
+    this.configStoreClient = await utils.getConfigStoreClient({
+      logger: this.logger,
+      provider: this.provider,
+      maxBlockLookBack: getMaxBlockLookBack(this.chainId),
+      chainId: this.chainId,
+    });
+    this.hubPoolClient = await utils.getHubPoolClient({
+      configStoreClient: this.configStoreClient,
+      provider: this.provider,
+      logger: this.logger,
+      maxBlockLookBack: getMaxBlockLookBack(this.chainId),
+      chainId: this.chainId,
+    });
+  }
+
+  private async fetchEventsByRange(
+    blockRange: BlockRange,
+  ): Promise<FetchEventsResult> {
+    const { hubPoolClient, configStoreClient } = this;
+
+    configStoreClient.eventSearchConfig.toBlock = blockRange.to;
+    hubPoolClient.eventSearchConfig.fromBlock = blockRange.from;
+    hubPoolClient.eventSearchConfig.toBlock = blockRange.to;
+    await configStoreClient.update();
+    await hubPoolClient.update();
+    const proposedRootBundleEvents =
+      hubPoolClient.getProposedRootBundlesInBlockRange(
+        blockRange.from,
+        blockRange.to,
+      );
+    const rootBundleCanceledEvents =
+      hubPoolClient.getCancelledRootBundlesInBlockRange(
+        blockRange.from,
+        blockRange.to,
+      );
+    const rootBundleDisputedEvents =
+      hubPoolClient.getDisputedRootBundlesInBlockRange(
+        blockRange.from,
+        blockRange.to,
+      );
+    const setPoolRebalanceRouteEvents =
+      hubPoolClient.getTokenMappingsModifiedInBlockRange(
+        blockRange.from,
+        blockRange.to,
+      );
+    // we do not have a block range query for executed root bundles
+    const rootBundleExecutedEvents = hubPoolClient.getExecutedRootBundles();
+
+    return {
+      // we need to make sure we filter out all unecessary events for the block range requested
+      proposedRootBundleEvents: proposedRootBundleEvents.map((p) => ({
+        ...p,
+        chainIds: configStoreClient.getChainIdIndicesForBlock(p.blockNumber),
+      })),
+      rootBundleCanceledEvents,
+      rootBundleDisputedEvents,
+      rootBundleExecutedEvents: rootBundleExecutedEvents.filter(
+        (event) =>
+          event.blockNumber >= blockRange.from &&
+          event.blockNumber <= blockRange.to,
+      ),
+      setPoolRebalanceRouteEvents,
+    };
+  }
+
+  async storeEvents(events: FetchEventsResult, lastFinalisedBlock: number) {
+    const { hubPoolRepository } = this;
+    const {
+      proposedRootBundleEvents,
+      rootBundleCanceledEvents,
+      rootBundleDisputedEvents,
+      rootBundleExecutedEvents,
+      setPoolRebalanceRouteEvents,
+    } = events;
+    await hubPoolRepository.formatAndSaveProposedRootBundleEvents(
+      proposedRootBundleEvents,
+      lastFinalisedBlock,
+    );
+    await hubPoolRepository.formatAndSaveRootBundleCanceledEvents(
+      rootBundleCanceledEvents,
+      lastFinalisedBlock,
+    );
+    await hubPoolRepository.formatAndSaveRootBundleDisputedEvents(
+      rootBundleDisputedEvents,
+      lastFinalisedBlock,
+    );
+    await hubPoolRepository.formatAndSaveRootBundleExecutedEvents(
+      rootBundleExecutedEvents,
+      lastFinalisedBlock,
+    );
+    await hubPoolRepository.formatAndSaveSetPoolRebalanceRouteEvents(
+      setPoolRebalanceRouteEvents,
+      lastFinalisedBlock,
+    );
+  }
+}
diff --git a/packages/indexer/src/services/hubPoolIndexer.ts b/packages/indexer/src/services/hubPoolIndexer.ts
deleted file mode 100644
index 2a2f27c..0000000
--- a/packages/indexer/src/services/hubPoolIndexer.ts
+++ /dev/null
@@ -1,222 +0,0 @@
-import { DataSource } from "@repo/indexer-database";
-import Redis from "ioredis";
-import winston from "winston";
-import { BaseIndexer } from "../generics";
-import * as utils from "../utils";
-import { RangeQueryStore, Ranges } from "../redis/rangeQueryStore";
-import { RedisCache } from "../redis/redisCache";
-import * as across from "@across-protocol/sdk";
-import { HubPoolRepository } from "../database/HubPoolRepository";
-import { getDeployedBlockNumber } from "@across-protocol/contracts";
-import { differenceWith, isEqual } from "lodash";
-
-export type Config = {
-  logger: winston.Logger;
-  redis: Redis;
-  postgres: DataSource;
-  retryProviderConfig: utils.RetryProviderConfig;
-  hubConfig: {
-    chainId: number;
-    maxBlockLookBack: number;
-  };
-  redisKeyPrefix: string;
-};
-
-/**
- * Indexer for the hubpool contract and its component events
- */
-export class Indexer extends BaseIndexer {
-  private resolvedRangeStore: RangeQueryStore;
-  private hubPoolRepository: HubPoolRepository;
-  private hubPoolClient: across.clients.HubPoolClient;
-  private configStoreClient: across.clients.AcrossConfigStoreClient;
-  constructor(private readonly config: Config) {
-    super(config.logger, "hubPool");
-  }
-  protected async initialize(): Promise<void> {
-    const {
-      logger,
-      redis,
-      retryProviderConfig,
-      hubConfig,
-      redisKeyPrefix,
-      postgres,
-    } = this.config;
-    this.resolvedRangeStore = new RangeQueryStore({
-      redis,
-      prefix: `${redisKeyPrefix}:rangeQuery:resolved`,
-    });
-    const redisCache = new RedisCache(redis);
-    const hubPoolProvider = utils.getRetryProvider({
-      ...retryProviderConfig,
-      cache: redisCache,
-      logger,
-      ...hubConfig,
-    });
-    const configStoreProvider = utils.getRetryProvider({
-      ...retryProviderConfig,
-      cache: redisCache,
-      logger,
-      ...hubConfig,
-    });
-    this.configStoreClient = utils.getConfigStoreClient({
-      logger,
-      provider: configStoreProvider,
-      maxBlockLookBack: hubConfig.maxBlockLookBack,
-      chainId: hubConfig.chainId,
-    });
-    this.hubPoolClient = utils.getHubPoolClient({
-      configStoreClient: this.configStoreClient,
-      provider: hubPoolProvider,
-      logger,
-      maxBlockLookBack: hubConfig.maxBlockLookBack,
-      chainId: hubConfig.chainId,
-    });
-    this.hubPoolRepository = new HubPoolRepository(postgres, logger, true);
-  }
-
-  protected async indexerLogic(): Promise<void> {
-    const allPendingQueries = await this.getUnprocessedRanges();
-    this.logger.info({
-      message: `Running hubpool indexer on ${allPendingQueries.length} block range requests`,
-      at: "HubpoolIndexer",
-      config: this.config.hubConfig,
-    });
-    for (const query of allPendingQueries) {
-      if (this.stopRequested) break;
-      const [fromBlock, toBlock] = query;
-      try {
-        this.logger.info({
-          message: `Starting hubpool update for block range ${fromBlock} to ${toBlock}`,
-          at: "HubpoolIndexer",
-          config: this.config.hubConfig,
-          query,
-        });
-        const events = await this.fetchEventsByRange(fromBlock, toBlock);
-        // TODO: may need to catch error to see if there is some data that exists in db already or change storage to overwrite any existing values
-        await this.storeEvents(events);
-
-        await this.resolvedRangeStore.setByRange(fromBlock, toBlock);
-        this.logger.info({
-          message: `Completed hubpool update for block range ${fromBlock} to ${toBlock}`,
-          at: "HubpoolIndexer",
-          config: this.config.hubConfig,
-          query,
-        });
-      } catch (error) {
-        if (error instanceof Error) {
-          this.logger.error({
-            message: `Error hubpool updating for block range ${fromBlock} to ${toBlock}`,
-            at: "HubpoolIndexer",
-            config: this.config.hubConfig,
-            query,
-            errorMessage: error.message,
-          });
-        } else {
-          // not an error type, throw it and crash app likely
-          throw error;
-        }
-      }
-    }
-  }
-  async getUnprocessedRanges(toBlock?: number): Promise<Ranges> {
-    const deployedBlockNumber = getDeployedBlockNumber(
-      "HubPool",
-      this.config.hubConfig.chainId,
-    );
-    const latestBlockNumber =
-      toBlock ?? (await this.hubPoolClient.hubPool.provider.getBlockNumber());
-
-    const allPaginatedBlockRanges = across.utils.getPaginatedBlockRanges({
-      fromBlock: deployedBlockNumber,
-      toBlock: latestBlockNumber,
-      maxBlockLookBack: this.config.hubConfig.maxBlockLookBack,
-    });
-
-    const allQueries = await this.resolvedRangeStore.entries();
-    const resolvedRanges = allQueries.map(([, x]) => [x.fromBlock, x.toBlock]);
-    const needsProcessing = differenceWith(
-      allPaginatedBlockRanges,
-      resolvedRanges,
-      isEqual,
-    );
-
-    this.logger.info({
-      message: `${needsProcessing.length} block ranges need processing`,
-      deployedBlockNumber,
-      latestBlockNumber,
-      at: "HubpoolIndexer",
-      config: this.config.hubConfig,
-    });
-
-    return needsProcessing;
-  }
-
-  async fetchEventsByRange(fromBlock: number, toBlock: number) {
-    const { hubPoolClient, configStoreClient } = this;
-
-    await configStoreClient.update();
-    await hubPoolClient.update();
-    const proposedRootBundleEvents =
-      hubPoolClient.getProposedRootBundlesInBlockRange(fromBlock, toBlock);
-    const rootBundleCanceledEvents =
-      hubPoolClient.getCancelledRootBundlesInBlockRange(fromBlock, toBlock);
-    const rootBundleDisputedEvents =
-      hubPoolClient.getDisputedRootBundlesInBlockRange(fromBlock, toBlock);
-    const setPoolRebalanceRouteEvents =
-      hubPoolClient.getTokenMappingsModifiedInBlockRange(fromBlock, toBlock);
-    // we do not have a block range query for executed root bundles
-    const rootBundleExecutedEvents = hubPoolClient.getExecutedRootBundles();
-
-    return {
-      // we need to make sure we filter out all unecessary events for the block range requested
-      proposedRootBundleEvents: proposedRootBundleEvents.map((p) => ({
-        ...p,
-        chainIds: configStoreClient.getChainIdIndicesForBlock(p.blockNumber),
-      })),
-      rootBundleCanceledEvents,
-      rootBundleDisputedEvents,
-      rootBundleExecutedEvents: rootBundleExecutedEvents.filter(
-        (event) =>
-          event.blockNumber >= fromBlock && event.blockNumber <= toBlock,
-      ),
-      setPoolRebalanceRouteEvents,
-    };
-  }
-
-  async storeEvents(params: {
-    proposedRootBundleEvents: (across.interfaces.ProposedRootBundle & {
-      chainIds: number[];
-    })[];
-    rootBundleCanceledEvents: across.interfaces.CancelledRootBundle[];
-    rootBundleDisputedEvents: across.interfaces.DisputedRootBundle[];
-    rootBundleExecutedEvents: across.interfaces.ExecutedRootBundle[];
-    setPoolRebalanceRouteEvents: (across.interfaces.DestinationTokenWithBlock & {
-      l2ChainId: number;
-    })[];
-  }) {
-    const { hubPoolRepository } = this;
-    const {
-      proposedRootBundleEvents,
-      rootBundleCanceledEvents,
-      rootBundleDisputedEvents,
-      rootBundleExecutedEvents,
-      setPoolRebalanceRouteEvents,
-    } = params;
-    await hubPoolRepository.formatAndSaveProposedRootBundleEvents(
-      proposedRootBundleEvents,
-    );
-    await hubPoolRepository.formatAndSaveRootBundleCanceledEvents(
-      rootBundleCanceledEvents,
-    );
-    await hubPoolRepository.formatAndSaveRootBundleDisputedEvents(
-      rootBundleDisputedEvents,
-    );
-    await hubPoolRepository.formatAndSaveRootBundleExecutedEvents(
-      rootBundleExecutedEvents,
-    );
-    await hubPoolRepository.formatAndSaveSetPoolRebalanceRouteEvents(
-      setPoolRebalanceRouteEvents,
-    );
-  }
-}
diff --git a/packages/indexer/src/services/index.ts b/packages/indexer/src/services/index.ts
index 036884d..1963143 100644
--- a/packages/indexer/src/services/index.ts
+++ b/packages/indexer/src/services/index.ts
@@ -1,4 +1,3 @@
 export * as bundles from "./bundles";
 export * as spokePoolIndexer from "./spokePoolIndexer";
-export * as hubPoolIndexer from "./hubPoolIndexer";
 export * as spokeProcessor from "./spokePoolProcessor";
diff --git a/packages/indexer/src/web3/RetryProvidersFactory.ts b/packages/indexer/src/web3/RetryProvidersFactory.ts
index f890fa2..07962bd 100644
--- a/packages/indexer/src/web3/RetryProvidersFactory.ts
+++ b/packages/indexer/src/web3/RetryProvidersFactory.ts
@@ -19,7 +19,7 @@ export class RetryProvidersFactory {
     for (const [chainId, providerUrls] of providersUrls.entries()) {
       const retryProviderEnvs = parseRetryProviderEnvs(chainId);
       if (!providerUrls || providerUrls.length === 0) {
-        throw new Error(`No provider urls found for chainId: ${chainId}`);
+        throw new Error(`Invalid provider urls found for chainId: ${chainId}`);
       }
       const standardTtlBlockDistance = getChainCacheFollowDistance(chainId);
       const provider = new providers.RetryProvider(
diff --git a/packages/indexer/src/web3/constants.ts b/packages/indexer/src/web3/constants.ts
index 42c2d19..2091560 100644
--- a/packages/indexer/src/web3/constants.ts
+++ b/packages/indexer/src/web3/constants.ts
@@ -59,3 +59,30 @@ export const getChainCacheFollowDistance = (chainId: number) => {
 
   return chainCacheFollowDistance;
 };
+
+const MAX_BLOCK_LOOK_BACK = {
+  [CHAIN_IDs.MAINNET]: 10000,
+  [CHAIN_IDs.OPTIMISM]: 10000,
+  [CHAIN_IDs.POLYGON]: 10000,
+  [CHAIN_IDs.BOBA]: 4990,
+  [CHAIN_IDs.ZK_SYNC]: 10000,
+  [CHAIN_IDs.REDSTONE]: 10000,
+  [CHAIN_IDs.LISK]: 10000,
+  [CHAIN_IDs.BASE]: 10000,
+  [CHAIN_IDs.MODE]: 10000,
+  [CHAIN_IDs.ARBITRUM]: 10000,
+  [CHAIN_IDs.LINEA]: 5000,
+  [CHAIN_IDs.BLAST]: 10000,
+  [CHAIN_IDs.SCROLL]: 3000,
+  [CHAIN_IDs.ZORA]: 10000,
+};
+
+export const getMaxBlockLookBack = (chainId: number) => {
+  const maxBlockLookBack = MAX_BLOCK_LOOK_BACK[chainId];
+
+  if (!maxBlockLookBack) {
+    return 10_000;
+  }
+
+  return maxBlockLookBack;
+};
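
Note on the storage change in HubPoolRepository above: the patch swaps plain inserts for TypeORM's query-builder insert combined with orUpdate(), so a row first written while unfinalised is overwritten with finalised = true once the same event is re-indexed past the finality buffer. A minimal standalone sketch of that upsert pattern (the ExampleEvent entity, its columns, and the saveEvents helper are illustrative assumptions, not code from this repository):

  import { Column, DataSource, Entity, PrimaryGeneratedColumn } from "typeorm";

  @Entity()
  class ExampleEvent {
    @PrimaryGeneratedColumn()
    id: number;

    // natural key used as the conflict target; must be unique for ON CONFLICT to apply
    @Column({ unique: true })
    transactionHash: string;

    @Column()
    finalised: boolean;
  }

  // Assumes `dataSource` is an already-initialized TypeORM DataSource.
  async function saveEvents(
    dataSource: DataSource,
    rows: { transactionHash: string; finalised: boolean }[],
  ) {
    await dataSource
      .createQueryBuilder()
      .insert()
      .into(ExampleEvent)
      .values(rows)
      // on conflict with the unique transactionHash, refresh only the `finalised` column
      .orUpdate(["finalised"], ["transactionHash"])
      .execute();
  }

The repository methods in the diff follow the same shape, with the conflict target matching each entity's unique constraint (e.g. transactionHash/transactionIndex/logIndex for SetPoolRebalanceRoute).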