feat: migrate HubPoolIndexer to new Indexer (#52)
Co-authored-by: Alexandru Matei <alexandrumatei3693@gmail.com>
amateima and alexandrumatei36 authored Oct 3, 2024
1 parent b50f8ac commit c9e7da4
Showing 15 changed files with 364 additions and 260 deletions.
1 change: 1 addition & 0 deletions .dockerignore
@@ -1,5 +1,6 @@
Dockerfile
docker-compose.yml
out
volumes
**/dist
**/node_modules
@@ -48,6 +48,9 @@ export class ProposedRootBundle {
@Column()
blockNumber: number;

@Column()
finalised: boolean;

@CreateDateColumn()
createdAt: Date;
}
@@ -30,6 +30,9 @@ export class RootBundleCanceled {
@Column()
blockNumber: number;

@Column()
finalised: boolean;

@CreateDateColumn()
createdAt: Date;
}
@@ -30,6 +30,9 @@ export class RootBundleDisputed {
@Column()
blockNumber: number;

@Column()
finalised: boolean;

@CreateDateColumn()
createdAt: Date;
}
@@ -53,6 +53,9 @@ export class RootBundleExecuted {
@Column()
blockNumber: number;

@Column()
finalised: boolean;

@CreateDateColumn()
createdAt: Date;
}
@@ -7,7 +7,7 @@ import {
} from "typeorm";

@Entity({ schema: "evm" })
@Unique("UK_setPoolRebalanceRoute_transactionHash_transactionIndex_logIndex", [
@Unique("UK_spr_transactionHash_transactionIndex_logIndex", [
"transactionHash",
"transactionIndex",
"logIndex",
@@ -37,6 +37,9 @@ export class SetPoolRebalanceRoute {
@Column({ nullable: false })
logIndex: number;

@Column()
finalised: boolean;

@CreateDateColumn()
createdAt: Date;
}
@@ -0,0 +1,53 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class HubPoolFinalised1727686818331 implements MigrationInterface {
name = "HubPoolFinalised1727686818331";

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "evm"."set_pool_rebalance_route" DROP CONSTRAINT "UK_setPoolRebalanceRoute_transactionHash_transactionIndex_logIn"`,
);
await queryRunner.query(
`ALTER TABLE "evm"."set_pool_rebalance_route" ADD CONSTRAINT "UK_spr_transactionHash_transactionIndex_logIndex" UNIQUE ("transactionHash", "transactionIndex", "logIndex")`,
);
await queryRunner.query(
`ALTER TABLE "evm"."proposed_root_bundle" ADD "finalised" boolean NOT NULL`,
);
await queryRunner.query(
`ALTER TABLE "evm"."root_bundle_disputed" ADD "finalised" boolean NOT NULL`,
);
await queryRunner.query(
`ALTER TABLE "evm"."root_bundle_executed" ADD "finalised" boolean NOT NULL`,
);
await queryRunner.query(
`ALTER TABLE "evm"."root_bundle_canceled" ADD "finalised" boolean NOT NULL`,
);
await queryRunner.query(
`ALTER TABLE "evm"."set_pool_rebalance_route" ADD "finalised" boolean NOT NULL`,
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "evm"."set_pool_rebalance_route" DROP CONSTRAINT "UK_spr_transactionHash_transactionIndex_logIndex"`,
);
await queryRunner.query(
`ALTER TABLE "evm"."set_pool_rebalance_route" ADD CONSTRAINT "UK_setPoolRebalanceRoute_transactionHash_transactionIndex_logIn" UNIQUE ("transactionHash", "transactionIndex", "logIndex")`,
);
await queryRunner.query(
`ALTER TABLE "evm"."set_pool_rebalance_route" DROP COLUMN "finalised"`,
);
await queryRunner.query(
`ALTER TABLE "evm"."root_bundle_executed" DROP COLUMN "finalised"`,
);
await queryRunner.query(
`ALTER TABLE "evm"."root_bundle_disputed" DROP COLUMN "finalised"`,
);
await queryRunner.query(
`ALTER TABLE "evm"."proposed_root_bundle" DROP COLUMN "finalised"`,
);
await queryRunner.query(
`ALTER TABLE "evm"."root_bundle_canceled" DROP COLUMN "finalised"`,
);
}
}
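
For reference, a minimal sketch of applying this migration programmatically with TypeORM. The connection options and the migration import path below are placeholders (the migration file name is not shown in this diff), not the project's actual configuration.

```ts
import { DataSource } from "typeorm";
// Illustrative path; the real migration file name is not shown in this diff.
import { HubPoolFinalised1727686818331 } from "./migrations/1727686818331-HubPoolFinalised";

// Assumed connection settings; the indexer's real DataSource is configured elsewhere.
const dataSource = new DataSource({
  type: "postgres",
  url: process.env.DATABASE_URL,
  migrations: [HubPoolFinalised1727686818331],
});

async function migrate(): Promise<void> {
  await dataSource.initialize();
  // Applies any pending migrations, including HubPoolFinalised1727686818331,
  // which adds the "finalised" columns and renames the unique constraint.
  await dataSource.runMigrations();
  await dataSource.destroy();
}

migrate().catch((error) => {
  console.error(error);
  process.exit(1);
});
```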
74 changes: 54 additions & 20 deletions packages/indexer/src/database/HubPoolRepository.ts
@@ -3,17 +3,13 @@ import * as across from "@across-protocol/sdk";
import { DataSource, entities, utils } from "@repo/indexer-database";

export class HubPoolRepository extends utils.BaseRepository {
constructor(
postgres: DataSource,
logger: winston.Logger,
throwError: boolean,
) {
super(postgres, logger, throwError);
constructor(postgres: DataSource, logger: winston.Logger) {
super(postgres, logger, true);
}

public async formatAndSaveProposedRootBundleEvents(
proposedRootBundleEvents: across.interfaces.ProposedRootBundle[],
throwError?: boolean,
lastFinalisedBlock: number,
) {
const formattedEvents = proposedRootBundleEvents.map((event) => {
return {
@@ -24,41 +24,59 @@ export class HubPoolRepository extends utils.BaseRepository {
bundleEvaluationBlockNumbers: event.bundleEvaluationBlockNumbers.map(
(blockNumber) => parseInt(blockNumber.toString()),
),
finalised: event.blockNumber <= lastFinalisedBlock,
};
});
await this.insert(entities.ProposedRootBundle, formattedEvents, throwError);
await this.postgres
.createQueryBuilder(entities.ProposedRootBundle, "b")
.insert()
.values(formattedEvents)
.orUpdate(["finalised"], ["transactionHash"])
.execute();
}

public async formatAndSaveRootBundleDisputedEvents(
rootBundleDisputedEvents: across.interfaces.DisputedRootBundle[],
throwError?: boolean,
lastFinalisedBlock: number,
) {
const formattedEvents = rootBundleDisputedEvents.map((event) => {
return {
...event,
requestTime: new Date(event.requestTime * 1000),
finalised: event.blockNumber <= lastFinalisedBlock,
};
});
await this.insert(entities.RootBundleDisputed, formattedEvents, throwError);
await this.postgres
.createQueryBuilder(entities.RootBundleDisputed, "b")
.insert()
.values(formattedEvents)
.orUpdate(["finalised"], ["transactionHash"])
.execute();
}

public async formatAndSaveRootBundleCanceledEvents(
rootBundleCanceledEvents: across.interfaces.CancelledRootBundle[],
throwError?: boolean,
lastFinalisedBlock: number,
) {
const formattedEvents = rootBundleCanceledEvents.map((event) => {
return {
...event,
caller: event.disputer,
requestTime: new Date(event.requestTime * 1000),
finalised: event.blockNumber <= lastFinalisedBlock,
};
});
await this.insert(entities.RootBundleCanceled, formattedEvents, throwError);
await this.postgres
.createQueryBuilder(entities.RootBundleCanceled, "b")
.insert()
.values(formattedEvents)
.orUpdate(["finalised"], ["transactionHash"])
.execute();
}

public async formatAndSaveRootBundleExecutedEvents(
rootBundleExecutedEvents: across.interfaces.ExecutedRootBundle[],
throwError?: boolean,
lastFinalisedBlock: number,
) {
const formattedEvents = rootBundleExecutedEvents.map((event) => {
return {
@@ -68,30 +82,50 @@ export class HubPoolRepository extends utils.BaseRepository {
runningBalances: event.runningBalances.map((balance) =>
balance.toString(),
),
finalised: event.blockNumber <= lastFinalisedBlock,
};
});
await this.insert(entities.RootBundleExecuted, formattedEvents, throwError);
// Split the events into chunks of 1000 to avoid exceeding the max query length
const chunks = across.utils.chunk(formattedEvents, 1000);
await Promise.all(
chunks.map((chunk) =>
this.postgres
.createQueryBuilder(entities.RootBundleExecuted, "b")
.insert()
.values(chunk)
.orUpdate(
["finalised"],
["chainId", "leafId", "groupIndex", "transactionHash"],
)
.execute(),
),
);
}

public async formatAndSaveSetPoolRebalanceRouteEvents(
setPoolRebalanceRouteEvents: (across.interfaces.DestinationTokenWithBlock & {
l2ChainId: number;
})[],
throwError?: boolean,
lastFinalisedBlock: number,
) {
const formattedEvents = setPoolRebalanceRouteEvents.map((event) => {
return {
...event,
destinationChainId: event.l2ChainId,
destinationToken: event.l2Token,
l1Token: event.l1Token,
finalised: event.blockNumber <= lastFinalisedBlock,
};
});
await this.insert(
entities.SetPoolRebalanceRoute,
formattedEvents,
throwError,
);
await this.postgres
.createQueryBuilder(entities.SetPoolRebalanceRoute, "b")
.insert()
.values(formattedEvents)
.orUpdate(
["finalised"],
["transactionHash", "transactionIndex", "logIndex"],
)
.execute();
}

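To illustrate how the new `lastFinalisedBlock` parameter replaces the old `throwError` flag, here is a hypothetical caller. The buffer constant, import paths, and function name are assumptions for the example, not code from this commit.

```ts
import winston from "winston";
import { DataSource } from "@repo/indexer-database";
import { HubPoolRepository } from "./HubPoolRepository"; // illustrative path

// Assumed finality buffer; the indexer derives this per chain via
// getFinalisedBlockBufferDistance.
const FINALISED_BLOCK_BUFFER = 64;

export async function saveProposedBundles(
  postgres: DataSource,
  logger: winston.Logger,
  events: Parameters<
    HubPoolRepository["formatAndSaveProposedRootBundleEvents"]
  >[0],
  latestBlockNumber: number,
): Promise<void> {
  const repository = new HubPoolRepository(postgres, logger);
  // Events at or below this block are stored with finalised = true; newer
  // events are upserted again on a later pass, which flips the flag.
  const lastFinalisedBlock = latestBlockNumber - FINALISED_BLOCK_BUFFER;
  await repository.formatAndSaveProposedRootBundleEvents(
    events,
    lastFinalisedBlock,
  );
}
```

Because each save method now uses `orUpdate` against the entity's unique key, re-indexing the same event is idempotent: the insert resolves to an update of the `finalised` column rather than a unique-constraint violation.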
51 changes: 41 additions & 10 deletions packages/indexer/src/main.ts
@@ -2,10 +2,22 @@ import * as services from "./services";
import winston from "winston";
import Redis from "ioredis";
import * as across from "@across-protocol/sdk";
import * as acrossConstants from "@across-protocol/constants";
import { providers } from "ethers";

import { connectToDatabase } from "./database/database.provider";
import * as parseEnv from "./parseEnv";
import { RetryProvidersFactory } from "./web3/RetryProvidersFactory";
import { RedisCache } from "./redis/redisCache";
import { DatabaseConfig } from "@repo/indexer-database";
import { HubPoolIndexerDataHandler } from "./services/HubPoolIndexerDataHandler";
import * as utils from "./utils";
import {
getFinalisedBlockBufferDistance,
getLoopWaitTimeSeconds,
Indexer,
} from "./data-indexing/service";
import { HubPoolRepository } from "./database/HubPoolRepository";

async function initializeRedis(
config: parseEnv.RedisConfig,
@@ -29,22 +41,17 @@ async function initializeRedis(
}

export async function Main(config: parseEnv.Config, logger: winston.Logger) {
const { redisConfig, postgresConfig, hubConfig, spokeConfigs } = config;
const { redisConfig, postgresConfig, spokeConfigs } = config;
const redis = await initializeRedis(redisConfig, logger);
const redisCache = new RedisCache(redis);
const retryProvidersFactory = new RetryProvidersFactory(redisCache, logger);
retryProvidersFactory.initializeProviders();
const postgres = await connectToDatabase(postgresConfig, logger);
const bundleProcessor = new services.bundles.Processor({
logger,
redis,
postgres,
});
const hubPoolIndexer = new services.hubPoolIndexer.Indexer({
logger,
redis,
postgres,
...hubConfig,
});
const spokePoolIndexers = spokeConfigs.map((spokeConfig) => {
return new services.spokePoolIndexer.Indexer({
logger,
@@ -54,14 +61,40 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
});
});

const hubPoolIndexerDataHandler = new HubPoolIndexerDataHandler(
logger,
acrossConstants.CHAIN_IDs.MAINNET,
retryProvidersFactory.getProviderForChainId(
acrossConstants.CHAIN_IDs.MAINNET,
),
new HubPoolRepository(postgres, logger),
);
const hubPoolIndexer = new Indexer(
{
loopWaitTimeSeconds: getLoopWaitTimeSeconds(
acrossConstants.CHAIN_IDs.MAINNET,
),
finalisedBlockBufferDistance: getFinalisedBlockBufferDistance(
acrossConstants.CHAIN_IDs.MAINNET,
),
},
hubPoolIndexerDataHandler,
retryProvidersFactory.getProviderForChainId(
acrossConstants.CHAIN_IDs.MAINNET,
),
new RedisCache(redis),
logger,
);
await hubPoolIndexer.start();

let exitRequested = false;
process.on("SIGINT", () => {
if (!exitRequested) {
logger.info(
"\nWait for shutdown, or press Ctrl+C again to forcefully exit.",
);
spokePoolIndexers.map((s) => s.stop());
hubPoolIndexer.stop();
hubPoolIndexer.stopGracefully();
} else {
logger.info("\nForcing exit...");
redis?.quit();
@@ -79,7 +112,6 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
const [bundleResults, hubPoolResult, ...spokeResults] =
await Promise.allSettled([
bundleProcessor.start(10),
hubPoolIndexer.start(10),
...spokePoolIndexers.map((s) => s.start(10)),
]);

@@ -91,7 +123,6 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
(r) => r.status === "fulfilled",
),
bundleProcessorRunSuccess: bundleResults.status === "fulfilled",
hubPoolRunSuccess: hubPoolResult.status === "fulfilled",
},
});

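The helpers `getLoopWaitTimeSeconds` and `getFinalisedBlockBufferDistance` are imported from `./data-indexing/service`, but their bodies are not part of this diff. A per-chain lookup of the following shape is one plausible form; the values and defaults below are invented for illustration.

```ts
// Hypothetical sketch only; the actual implementations live in
// packages/indexer/src/data-indexing/service and are not shown in this diff.
const LOOP_WAIT_TIME_SECONDS: Record<number, number> = {
  1: 30, // mainnet: poll every 30 seconds (assumed)
};
const FINALISED_BLOCK_BUFFER_DISTANCE: Record<number, number> = {
  1: 64, // mainnet: treat blocks at least 64 behind the tip as finalised (assumed)
};

export function getLoopWaitTimeSeconds(chainId: number): number {
  return LOOP_WAIT_TIME_SECONDS[chainId] ?? 60;
}

export function getFinalisedBlockBufferDistance(chainId: number): number {
  return FINALISED_BLOCK_BUFFER_DISTANCE[chainId] ?? 120;
}
```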
5 changes: 0 additions & 5 deletions packages/indexer/src/parseEnv.ts
@@ -12,10 +12,6 @@ export type Config = {
services.spokePoolIndexer.Config,
"logger" | "redis" | "postgres"
>[];
hubConfig: Omit<
services.hubPoolIndexer.Config,
"logger" | "redis" | "postgres"
>;
};
export type RedisConfig = {
host: string;
@@ -236,7 +232,6 @@ export function envToConfig(env: Env): Config {
return {
redisConfig,
postgresConfig,
hubConfig,
spokeConfigs,
};
}