Skip to content

Commit

Permalink
feat: fetch integrator id asynchronously when backfilling (#83)
Browse files Browse the repository at this point in the history
  • Loading branch information
melisaguevara authored Oct 28, 2024
1 parent 58fda7f commit 323be1c
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 40 deletions.
16 changes: 7 additions & 9 deletions packages/indexer/src/database/SpokePoolRepository.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import winston from "winston";
import * as across from "@across-protocol/sdk";
import { getRelayHashFromEvent } from "@across-protocol/sdk/dist/cjs/utils/SpokeUtils";
import { DataSource, entities, utils } from "@repo/indexer-database";
import { DataSource, entities, utils as dbUtils } from "@repo/indexer-database";
import * as utils from "../utils";

export class SpokePoolRepository extends utils.BaseRepository {
export class SpokePoolRepository extends dbUtils.BaseRepository {
constructor(
postgres: DataSource,
logger: winston.Logger,
Expand All @@ -30,15 +30,13 @@ export class SpokePoolRepository extends utils.BaseRepository {
}

public async formatAndSaveV3FundsDepositedEvents(
v3FundsDepositedEvents: (across.interfaces.DepositWithBlock & {
integratorId: string | undefined;
})[],
v3FundsDepositedEvents: utils.V3FundsDepositedWithIntegradorId[],
lastFinalisedBlock: number,
) {
const formattedEvents = v3FundsDepositedEvents.map((event) => {
return {
...event,
relayHash: getRelayHashFromEvent(event),
relayHash: across.utils.getRelayHashFromEvent(event),
...this.formatRelayData(event),
quoteTimestamp: new Date(event.quoteTimestamp * 1000),
finalised: event.blockNumber <= lastFinalisedBlock,
Expand Down Expand Up @@ -73,7 +71,7 @@ export class SpokePoolRepository extends utils.BaseRepository {
},
{} as { [key: string]: any },
),
relayHash: getRelayHashFromEvent(event),
relayHash: across.utils.getRelayHashFromEvent(event),
...this.formatRelayData(event),
updatedRecipient: event.relayExecutionInfo.updatedRecipient,
updatedOutputAmount:
Expand Down Expand Up @@ -104,7 +102,7 @@ export class SpokePoolRepository extends utils.BaseRepository {
const formattedEvents = requestedV3SlowFillEvents.map((event) => {
return {
...event,
relayHash: getRelayHashFromEvent(event),
relayHash: across.utils.getRelayHashFromEvent(event),
...this.formatRelayData(event),
finalised: event.blockNumber <= lastFinalisedBlock,
};
Expand Down
7 changes: 7 additions & 0 deletions packages/indexer/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import {
import { SpokePoolIndexerDataHandler } from "./services/SpokePoolIndexerDataHandler";
import { SpokePoolProcessor } from "./services/spokePoolProcessor";
import { BundleRepository } from "./database/BundleRepository";
import { IndexerQueuesService } from "./messaging/service";
import { IntegratorIdWorker } from "./messaging/IntegratorIdWorker";

async function initializeRedis(
config: parseEnv.RedisConfig,
Expand Down Expand Up @@ -87,6 +89,10 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
spokePoolClientFactory,
});

const indexerQueuesService = new IndexerQueuesService(redis);
// Set up message workers
new IntegratorIdWorker(redis, postgres, logger, retryProvidersFactory);

const spokePoolIndexers = spokePoolChainsEnabled.map((chainId) => {
const spokePoolIndexerDataHandler = new SpokePoolIndexerDataHandler(
logger,
Expand All @@ -98,6 +104,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
spokePoolClientFactory,
new SpokePoolRepository(postgres, logger),
new SpokePoolProcessor(postgres, logger, chainId),
indexerQueuesService,
);
const spokePoolIndexer = new Indexer(
{
Expand Down
72 changes: 72 additions & 0 deletions packages/indexer/src/messaging/IntegratorIdWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import Redis from "ioredis";
import winston from "winston";
import { Job, Worker } from "bullmq";
import { DataSource, entities } from "@repo/indexer-database";
import { IndexerQueues } from "./service";
import { getIntegratorId } from "../utils";
import { RetryProvidersFactory } from "../web3/RetryProvidersFactory";

export type IntegratorIdMessage = {
relayHash: string;
};

/**
* This worker listens to the `IntegratorId` queue and processes each job by:
* - Retrieving the deposit information from the database based on the provided relay hash.
* - Checking if the deposit record already has an integrator ID.
* - If the integrator ID is not set, the worker fetches it from the transaction data.
* - If found, the integrator ID is saved back into the deposit record in the database.
*/
export class IntegratorIdWorker {
public worker: Worker;
constructor(
private redis: Redis,
private postgres: DataSource,
private logger: winston.Logger,
private providerFactory: RetryProvidersFactory,
) {
this.setWorker();
}

public setWorker() {
this.worker = new Worker(
IndexerQueues.IntegratorId,
async (job: Job<IntegratorIdMessage>) => {
const { relayHash } = job.data;
const repository = this.postgres.getRepository(
entities.V3FundsDeposited,
);
const deposit = await repository.findOne({
where: { relayHash },
});
if (!deposit) {
this.logger.warn({
at: "IntegratorIdWorker",
message: `Skipping deposit with relay hash ${relayHash}. Not found in the database.`,
});
return;
}
if (deposit.integratorId !== null) {
this.logger.info({
at: "IntegratorIdWorker",
message: `Skipping deposit with relay hash ${relayHash}. IntegratorId field already populated.`,
});
return;
}
const provider = this.providerFactory.getProviderForChainId(
deposit.originChainId,
);
const integratorId = await getIntegratorId(
provider,
deposit.quoteTimestamp,
deposit.transactionHash,
);
if (integratorId) {
await repository.update({ relayHash }, { integratorId });
}
return;
},
{ connection: this.redis, concurrency: 10 },
);
}
}
17 changes: 13 additions & 4 deletions packages/indexer/src/messaging/service.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import Redis from "ioredis";
import { Queue, JobsOptions } from "bullmq";
import { Queue, JobsOptions, BulkJobOptions } from "bullmq";

export enum IndexerQueues {}
export enum IndexerQueues {
IntegratorId = "IntegratorId",
}

export class IndexerQueuesService {
private queues = {} as Record<string, Queue>;
private queues = {} as Record<IndexerQueues, Queue>;

constructor(private connection: Redis) {
this.initializeQueues();
Expand All @@ -16,6 +18,10 @@ export class IndexerQueuesService {
(queueName) =>
(this.queues[queueName] = new Queue(queueName, {
connection: this.connection,
defaultJobOptions: {
attempts: Number.MAX_SAFE_INTEGER,
removeOnComplete: true,
},
})),
);
}
Expand All @@ -36,10 +42,13 @@ export class IndexerQueuesService {
queue: IndexerQueues,
jobName: string,
messages: T[],
options: BulkJobOptions = {},
) {
const q = this.queues[queue];
if (q) {
await q.addBulk(messages.map((m) => ({ name: jobName, data: m })));
await q.addBulk(
messages.map((m) => ({ name: jobName, data: m, opts: options })),
);
}
}
}
3 changes: 3 additions & 0 deletions packages/indexer/src/parseEnv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export type Config = {
export type RedisConfig = {
host: string;
port: number;
maxRetriesPerRequest: null;
};
export type ProviderConfig = [providerUrl: string, chainId: number];

Expand All @@ -24,6 +25,8 @@ export function parseRedisConfig(env: Env): RedisConfig {
return {
host: env.REDIS_HOST,
port,
// @dev: this retry config is needed for bullmq workers
maxRetriesPerRequest: null,
};
}

Expand Down
76 changes: 49 additions & 27 deletions packages/indexer/src/services/SpokePoolIndexerDataHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ import {
getDeployedAddress,
getDeployedBlockNumber,
} from "@across-protocol/contracts";
import { entities } from "@repo/indexer-database";

import { BlockRange } from "../data-indexing/model";
import { IndexerDataHandler } from "../data-indexing/service/IndexerDataHandler";

import * as utils from "../utils";
import { providers } from "@across-protocol/sdk";
import { SpokePoolRepository } from "../database/SpokePoolRepository";
import { SpokePoolProcessor } from "./spokePoolProcessor";
import { IndexerQueues, IndexerQueuesService } from "../messaging/service";
import { IntegratorIdMessage } from "../messaging/IntegratorIdWorker";

type FetchEventsResult = {
v3FundsDepositedEvents: (across.interfaces.DepositWithBlock & {
integratorId: string | undefined;
})[];
v3FundsDepositedEvents: utils.V3FundsDepositedWithIntegradorId[];
filledV3RelayEvents: across.interfaces.FillWithBlock[];
requestedV3SlowFillEvents: across.interfaces.SlowFillRequestWithBlock[];
requestedSpeedUpV3Events: {
Expand All @@ -38,12 +38,13 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler {
private logger: Logger,
private chainId: number,
private hubPoolChainId: number,
private provider: providers.RetryProvider,
private provider: across.providers.RetryProvider,
private configStoreFactory: utils.ConfigStoreClientFactory,
private hubPoolFactory: utils.HubPoolClientFactory,
private spokePoolFactory: utils.SpokePoolClientFactory,
private spokePoolClientRepository: SpokePoolRepository,
private spokePoolProcessor: SpokePoolProcessor,
private indexerQueuesService: IndexerQueuesService,
) {
this.isInitialized = false;
}
Expand Down Expand Up @@ -94,7 +95,20 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler {
blockRange,
identifier: this.getDataIdentifier(),
});

// Fetch integratorId synchronously when there are fewer than 1K deposit events
// For larger sets, use the IntegratorId queue for asynchronous processing
const fetchIntegratorIdSync = events.v3FundsDepositedEvents.length < 1000;
if (fetchIntegratorIdSync) {
this.appendIntegratorIdToDeposits(events.v3FundsDepositedEvents);
}

const storedEvents = await this.storeEvents(events, lastFinalisedBlock);

if (!fetchIntegratorIdSync) {
await this.publishIntegratorIdMessages(storedEvents.deposits);
}

await this.spokePoolProcessor.process(storedEvents);
}

Expand Down Expand Up @@ -122,14 +136,6 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler {
fromBlock: blockRange.from,
toBlock: blockRange.to,
});
const v3FundsDepositedWithIntegradorId = await Promise.all(
v3FundsDepositedEvents.map(async (deposit) => {
return {
...deposit,
integratorId: await this.getIntegratorId(deposit),
};
}),
);
const filledV3RelayEvents = spokePoolClient.getFills();
const requestedV3SlowFillEvents =
spokePoolClient.getSlowFillRequestsForOriginChain(this.chainId);
Expand All @@ -140,7 +146,7 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler {
const tokensBridgedEvents = spokePoolClient.getTokensBridged();

return {
v3FundsDepositedEvents: v3FundsDepositedWithIntegradorId,
v3FundsDepositedEvents,
filledV3RelayEvents,
requestedV3SlowFillEvents,
requestedSpeedUpV3Events,
Expand Down Expand Up @@ -217,18 +223,34 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler {
);
}

private async getIntegratorId(deposit: across.interfaces.DepositWithBlock) {
const INTEGRATOR_DELIMITER = "1dc0de";
const INTEGRATOR_ID_LENGTH = 4; // Integrator ids are 4 characters long
let integratorId = undefined;
const txn = await this.provider.getTransaction(deposit.transactionHash);
const txnData = txn.data;
if (txnData.includes(INTEGRATOR_DELIMITER)) {
integratorId = txnData
.split(INTEGRATOR_DELIMITER)
.pop()
?.substring(0, INTEGRATOR_ID_LENGTH);
}
return integratorId;
private async appendIntegratorIdToDeposits(
deposits: utils.V3FundsDepositedWithIntegradorId[],
) {
await across.utils.forEachAsync(
deposits,
async (deposit, index, deposits) => {
const integratorId = await utils.getIntegratorId(
this.provider,
new Date(deposit.quoteTimestamp * 1000),
deposit.transactionHash,
);
deposits[index] = { ...deposit, integratorId };
},
);
}

private async publishIntegratorIdMessages(
deposits: entities.V3FundsDeposited[],
) {
const messages: IntegratorIdMessage[] = deposits.map((deposit) => {
return {
relayHash: deposit.relayHash,
};
});
await this.indexerQueuesService.publishMessagesBulk(
IndexerQueues.IntegratorId,
IndexerQueues.IntegratorId, // Use queue name as job name
messages,
);
}
}
1 change: 1 addition & 0 deletions packages/indexer/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./contractUtils";
export * from "./contractFactoryUtils";
export * from "./bundleBuilderUtils";
export * from "./spokePoolUtils";
39 changes: 39 additions & 0 deletions packages/indexer/src/utils/spokePoolUtils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { interfaces, providers } from "@across-protocol/sdk";

export type V3FundsDepositedWithIntegradorId = interfaces.DepositWithBlock & {
integratorId?: string | undefined;
};

/**
* Retrieves the 4-character integrator ID from the transaction data
* associated with the provided transaction hash, if present.
* The integrator ID is expected to be found after the delimiter "1dc0de" in the transaction data.
* @async
* @param provider The provider to fetch transaction details from.
* @param depositDate
* @param txHash The transaction hash to retrieve the input data of.
* @returns The 4-character integrator ID if found, otherwise undefined.
*/
export async function getIntegratorId(
provider: providers.RetryProvider,
depositDate: Date,
txHash: string,
) {
// If deposit was made before integratorId implementation, skip request
const INTEGRATOR_ID_IMPLEMENTATION_DATE = new Date(1718274000 * 1000);
if (depositDate < INTEGRATOR_ID_IMPLEMENTATION_DATE) {
return;
}
const INTEGRATOR_DELIMITER = "1dc0de";
const INTEGRATOR_ID_LENGTH = 4; // Integrator ids are 4 characters long
let integratorId = undefined;
const txn = await provider.getTransaction(txHash);
const txnData = txn.data;
if (txnData.includes(INTEGRATOR_DELIMITER)) {
integratorId = txnData
.split(INTEGRATOR_DELIMITER)
.pop()
?.substring(0, INTEGRATOR_ID_LENGTH);
}
return integratorId;
}

0 comments on commit 323be1c

Please sign in to comment.