diff --git a/packages/indexer/src/database/SpokePoolRepository.ts b/packages/indexer/src/database/SpokePoolRepository.ts index c0f285fa..6ae5e84f 100644 --- a/packages/indexer/src/database/SpokePoolRepository.ts +++ b/packages/indexer/src/database/SpokePoolRepository.ts @@ -1,9 +1,9 @@ import winston from "winston"; import * as across from "@across-protocol/sdk"; -import { getRelayHashFromEvent } from "@across-protocol/sdk/dist/cjs/utils/SpokeUtils"; -import { DataSource, entities, utils } from "@repo/indexer-database"; +import { DataSource, entities, utils as dbUtils } from "@repo/indexer-database"; +import * as utils from "../utils"; -export class SpokePoolRepository extends utils.BaseRepository { +export class SpokePoolRepository extends dbUtils.BaseRepository { constructor( postgres: DataSource, logger: winston.Logger, @@ -30,15 +30,13 @@ export class SpokePoolRepository extends utils.BaseRepository { } public async formatAndSaveV3FundsDepositedEvents( - v3FundsDepositedEvents: (across.interfaces.DepositWithBlock & { - integratorId: string | undefined; - })[], + v3FundsDepositedEvents: utils.V3FundsDepositedWithIntegradorId[], lastFinalisedBlock: number, ) { const formattedEvents = v3FundsDepositedEvents.map((event) => { return { ...event, - relayHash: getRelayHashFromEvent(event), + relayHash: across.utils.getRelayHashFromEvent(event), ...this.formatRelayData(event), quoteTimestamp: new Date(event.quoteTimestamp * 1000), finalised: event.blockNumber <= lastFinalisedBlock, @@ -73,7 +71,7 @@ export class SpokePoolRepository extends utils.BaseRepository { }, {} as { [key: string]: any }, ), - relayHash: getRelayHashFromEvent(event), + relayHash: across.utils.getRelayHashFromEvent(event), ...this.formatRelayData(event), updatedRecipient: event.relayExecutionInfo.updatedRecipient, updatedOutputAmount: @@ -104,7 +102,7 @@ export class SpokePoolRepository extends utils.BaseRepository { const formattedEvents = requestedV3SlowFillEvents.map((event) => { return { ...event, - relayHash: getRelayHashFromEvent(event), + relayHash: across.utils.getRelayHashFromEvent(event), ...this.formatRelayData(event), finalised: event.blockNumber <= lastFinalisedBlock, }; diff --git a/packages/indexer/src/main.ts b/packages/indexer/src/main.ts index dc93b8aa..c98b08dc 100644 --- a/packages/indexer/src/main.ts +++ b/packages/indexer/src/main.ts @@ -23,6 +23,8 @@ import { import { SpokePoolIndexerDataHandler } from "./services/SpokePoolIndexerDataHandler"; import { SpokePoolProcessor } from "./services/spokePoolProcessor"; import { BundleRepository } from "./database/BundleRepository"; +import { IndexerQueuesService } from "./messaging/service"; +import { IntegratorIdWorker } from "./messaging/IntegratorIdWorker"; async function initializeRedis( config: parseEnv.RedisConfig, @@ -87,6 +89,10 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { spokePoolClientFactory, }); + const indexerQueuesService = new IndexerQueuesService(redis); + // Set up message workers + new IntegratorIdWorker(redis, postgres, logger, retryProvidersFactory); + const spokePoolIndexers = spokePoolChainsEnabled.map((chainId) => { const spokePoolIndexerDataHandler = new SpokePoolIndexerDataHandler( logger, @@ -98,6 +104,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { spokePoolClientFactory, new SpokePoolRepository(postgres, logger), new SpokePoolProcessor(postgres, logger, chainId), + indexerQueuesService, ); const spokePoolIndexer = new Indexer( { diff --git a/packages/indexer/src/messaging/IntegratorIdWorker.ts b/packages/indexer/src/messaging/IntegratorIdWorker.ts new file mode 100644 index 00000000..fceceb84 --- /dev/null +++ b/packages/indexer/src/messaging/IntegratorIdWorker.ts @@ -0,0 +1,72 @@ +import Redis from "ioredis"; +import winston from "winston"; +import { Job, Worker } from "bullmq"; +import { DataSource, entities } from "@repo/indexer-database"; +import { IndexerQueues } from "./service"; +import { getIntegratorId } from "../utils"; +import { RetryProvidersFactory } from "../web3/RetryProvidersFactory"; + +export type IntegratorIdMessage = { + relayHash: string; +}; + +/** + * This worker listens to the `IntegratorId` queue and processes each job by: + * - Retrieving the deposit information from the database based on the provided relay hash. + * - Checking if the deposit record already has an integrator ID. + * - If the integrator ID is not set, the worker fetches it from the transaction data. + * - If found, the integrator ID is saved back into the deposit record in the database. + */ +export class IntegratorIdWorker { + public worker: Worker; + constructor( + private redis: Redis, + private postgres: DataSource, + private logger: winston.Logger, + private providerFactory: RetryProvidersFactory, + ) { + this.setWorker(); + } + + public setWorker() { + this.worker = new Worker( + IndexerQueues.IntegratorId, + async (job: Job) => { + const { relayHash } = job.data; + const repository = this.postgres.getRepository( + entities.V3FundsDeposited, + ); + const deposit = await repository.findOne({ + where: { relayHash }, + }); + if (!deposit) { + this.logger.warn({ + at: "IntegratorIdWorker", + message: `Skipping deposit with relay hash ${relayHash}. Not found in the database.`, + }); + return; + } + if (deposit.integratorId !== null) { + this.logger.info({ + at: "IntegratorIdWorker", + message: `Skipping deposit with relay hash ${relayHash}. IntegratorId field already populated.`, + }); + return; + } + const provider = this.providerFactory.getProviderForChainId( + deposit.originChainId, + ); + const integratorId = await getIntegratorId( + provider, + deposit.quoteTimestamp, + deposit.transactionHash, + ); + if (integratorId) { + await repository.update({ relayHash }, { integratorId }); + } + return; + }, + { connection: this.redis, concurrency: 10 }, + ); + } +} diff --git a/packages/indexer/src/messaging/service.ts b/packages/indexer/src/messaging/service.ts index 085335ae..9f23abfe 100644 --- a/packages/indexer/src/messaging/service.ts +++ b/packages/indexer/src/messaging/service.ts @@ -1,10 +1,12 @@ import Redis from "ioredis"; -import { Queue, JobsOptions } from "bullmq"; +import { Queue, JobsOptions, BulkJobOptions } from "bullmq"; -export enum IndexerQueues {} +export enum IndexerQueues { + IntegratorId = "IntegratorId", +} export class IndexerQueuesService { - private queues = {} as Record; + private queues = {} as Record; constructor(private connection: Redis) { this.initializeQueues(); @@ -16,6 +18,10 @@ export class IndexerQueuesService { (queueName) => (this.queues[queueName] = new Queue(queueName, { connection: this.connection, + defaultJobOptions: { + attempts: Number.MAX_SAFE_INTEGER, + removeOnComplete: true, + }, })), ); } @@ -36,10 +42,13 @@ export class IndexerQueuesService { queue: IndexerQueues, jobName: string, messages: T[], + options: BulkJobOptions = {}, ) { const q = this.queues[queue]; if (q) { - await q.addBulk(messages.map((m) => ({ name: jobName, data: m }))); + await q.addBulk( + messages.map((m) => ({ name: jobName, data: m, opts: options })), + ); } } } diff --git a/packages/indexer/src/parseEnv.ts b/packages/indexer/src/parseEnv.ts index 62b815cb..80e4dfa6 100644 --- a/packages/indexer/src/parseEnv.ts +++ b/packages/indexer/src/parseEnv.ts @@ -12,6 +12,7 @@ export type Config = { export type RedisConfig = { host: string; port: number; + maxRetriesPerRequest: null; }; export type ProviderConfig = [providerUrl: string, chainId: number]; @@ -24,6 +25,8 @@ export function parseRedisConfig(env: Env): RedisConfig { return { host: env.REDIS_HOST, port, + // @dev: this retry config is needed for bullmq workers + maxRetriesPerRequest: null, }; } diff --git a/packages/indexer/src/services/SpokePoolIndexerDataHandler.ts b/packages/indexer/src/services/SpokePoolIndexerDataHandler.ts index 9695a3c6..75ecf8e1 100644 --- a/packages/indexer/src/services/SpokePoolIndexerDataHandler.ts +++ b/packages/indexer/src/services/SpokePoolIndexerDataHandler.ts @@ -4,19 +4,19 @@ import { getDeployedAddress, getDeployedBlockNumber, } from "@across-protocol/contracts"; +import { entities } from "@repo/indexer-database"; import { BlockRange } from "../data-indexing/model"; import { IndexerDataHandler } from "../data-indexing/service/IndexerDataHandler"; import * as utils from "../utils"; -import { providers } from "@across-protocol/sdk"; import { SpokePoolRepository } from "../database/SpokePoolRepository"; import { SpokePoolProcessor } from "./spokePoolProcessor"; +import { IndexerQueues, IndexerQueuesService } from "../messaging/service"; +import { IntegratorIdMessage } from "../messaging/IntegratorIdWorker"; type FetchEventsResult = { - v3FundsDepositedEvents: (across.interfaces.DepositWithBlock & { - integratorId: string | undefined; - })[]; + v3FundsDepositedEvents: utils.V3FundsDepositedWithIntegradorId[]; filledV3RelayEvents: across.interfaces.FillWithBlock[]; requestedV3SlowFillEvents: across.interfaces.SlowFillRequestWithBlock[]; requestedSpeedUpV3Events: { @@ -38,12 +38,13 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler { private logger: Logger, private chainId: number, private hubPoolChainId: number, - private provider: providers.RetryProvider, + private provider: across.providers.RetryProvider, private configStoreFactory: utils.ConfigStoreClientFactory, private hubPoolFactory: utils.HubPoolClientFactory, private spokePoolFactory: utils.SpokePoolClientFactory, private spokePoolClientRepository: SpokePoolRepository, private spokePoolProcessor: SpokePoolProcessor, + private indexerQueuesService: IndexerQueuesService, ) { this.isInitialized = false; } @@ -94,7 +95,20 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler { blockRange, identifier: this.getDataIdentifier(), }); + + // Fetch integratorId synchronously when there are fewer than 1K deposit events + // For larger sets, use the IntegratorId queue for asynchronous processing + const fetchIntegratorIdSync = events.v3FundsDepositedEvents.length < 1000; + if (fetchIntegratorIdSync) { + this.appendIntegratorIdToDeposits(events.v3FundsDepositedEvents); + } + const storedEvents = await this.storeEvents(events, lastFinalisedBlock); + + if (!fetchIntegratorIdSync) { + await this.publishIntegratorIdMessages(storedEvents.deposits); + } + await this.spokePoolProcessor.process(storedEvents); } @@ -122,14 +136,6 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler { fromBlock: blockRange.from, toBlock: blockRange.to, }); - const v3FundsDepositedWithIntegradorId = await Promise.all( - v3FundsDepositedEvents.map(async (deposit) => { - return { - ...deposit, - integratorId: await this.getIntegratorId(deposit), - }; - }), - ); const filledV3RelayEvents = spokePoolClient.getFills(); const requestedV3SlowFillEvents = spokePoolClient.getSlowFillRequestsForOriginChain(this.chainId); @@ -140,7 +146,7 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler { const tokensBridgedEvents = spokePoolClient.getTokensBridged(); return { - v3FundsDepositedEvents: v3FundsDepositedWithIntegradorId, + v3FundsDepositedEvents, filledV3RelayEvents, requestedV3SlowFillEvents, requestedSpeedUpV3Events, @@ -217,18 +223,34 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler { ); } - private async getIntegratorId(deposit: across.interfaces.DepositWithBlock) { - const INTEGRATOR_DELIMITER = "1dc0de"; - const INTEGRATOR_ID_LENGTH = 4; // Integrator ids are 4 characters long - let integratorId = undefined; - const txn = await this.provider.getTransaction(deposit.transactionHash); - const txnData = txn.data; - if (txnData.includes(INTEGRATOR_DELIMITER)) { - integratorId = txnData - .split(INTEGRATOR_DELIMITER) - .pop() - ?.substring(0, INTEGRATOR_ID_LENGTH); - } - return integratorId; + private async appendIntegratorIdToDeposits( + deposits: utils.V3FundsDepositedWithIntegradorId[], + ) { + await across.utils.forEachAsync( + deposits, + async (deposit, index, deposits) => { + const integratorId = await utils.getIntegratorId( + this.provider, + new Date(deposit.quoteTimestamp * 1000), + deposit.transactionHash, + ); + deposits[index] = { ...deposit, integratorId }; + }, + ); + } + + private async publishIntegratorIdMessages( + deposits: entities.V3FundsDeposited[], + ) { + const messages: IntegratorIdMessage[] = deposits.map((deposit) => { + return { + relayHash: deposit.relayHash, + }; + }); + await this.indexerQueuesService.publishMessagesBulk( + IndexerQueues.IntegratorId, + IndexerQueues.IntegratorId, // Use queue name as job name + messages, + ); } } diff --git a/packages/indexer/src/utils/index.ts b/packages/indexer/src/utils/index.ts index 99d2ab96..1faf5dc2 100644 --- a/packages/indexer/src/utils/index.ts +++ b/packages/indexer/src/utils/index.ts @@ -1,3 +1,4 @@ export * from "./contractUtils"; export * from "./contractFactoryUtils"; export * from "./bundleBuilderUtils"; +export * from "./spokePoolUtils"; diff --git a/packages/indexer/src/utils/spokePoolUtils.ts b/packages/indexer/src/utils/spokePoolUtils.ts new file mode 100644 index 00000000..c03f0157 --- /dev/null +++ b/packages/indexer/src/utils/spokePoolUtils.ts @@ -0,0 +1,39 @@ +import { interfaces, providers } from "@across-protocol/sdk"; + +export type V3FundsDepositedWithIntegradorId = interfaces.DepositWithBlock & { + integratorId?: string | undefined; +}; + +/** + * Retrieves the 4-character integrator ID from the transaction data + * associated with the provided transaction hash, if present. + * The integrator ID is expected to be found after the delimiter "1dc0de" in the transaction data. + * @async + * @param provider The provider to fetch transaction details from. + * @param depositDate + * @param txHash The transaction hash to retrieve the input data of. + * @returns The 4-character integrator ID if found, otherwise undefined. + */ +export async function getIntegratorId( + provider: providers.RetryProvider, + depositDate: Date, + txHash: string, +) { + // If deposit was made before integratorId implementation, skip request + const INTEGRATOR_ID_IMPLEMENTATION_DATE = new Date(1718274000 * 1000); + if (depositDate < INTEGRATOR_ID_IMPLEMENTATION_DATE) { + return; + } + const INTEGRATOR_DELIMITER = "1dc0de"; + const INTEGRATOR_ID_LENGTH = 4; // Integrator ids are 4 characters long + let integratorId = undefined; + const txn = await provider.getTransaction(txHash); + const txnData = txn.data; + if (txnData.includes(INTEGRATOR_DELIMITER)) { + integratorId = txnData + .split(INTEGRATOR_DELIMITER) + .pop() + ?.substring(0, INTEGRATOR_ID_LENGTH); + } + return integratorId; +}