diff --git a/packages/indexer/src/database/HubPoolRepository.ts b/packages/indexer/src/database/HubPoolRepository.ts index 909d397..48e8630 100644 --- a/packages/indexer/src/database/HubPoolRepository.ts +++ b/packages/indexer/src/database/HubPoolRepository.ts @@ -3,6 +3,8 @@ import * as across from "@across-protocol/sdk"; import { DataSource, entities, utils } from "@repo/indexer-database"; export class HubPoolRepository extends utils.BaseRepository { + private chunkSize = 1000; + constructor(postgres: DataSource, logger: winston.Logger) { super(postgres, logger, true); } @@ -23,12 +25,18 @@ export class HubPoolRepository extends utils.BaseRepository { finalised: event.blockNumber <= lastFinalisedBlock, }; }); - await this.postgres - .createQueryBuilder(entities.ProposedRootBundle, "b") - .insert() - .values(formattedEvents) - .orUpdate(["finalised"], ["transactionHash"]) - .execute(); + + const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + await Promise.all( + chunkedEvents.map((eventsChunk) => + this.postgres + .createQueryBuilder(entities.ProposedRootBundle, "b") + .insert() + .values(eventsChunk) + .orUpdate(["finalised"], ["transactionHash"]) + .execute(), + ), + ); } public async formatAndSaveRootBundleDisputedEvents( @@ -42,12 +50,17 @@ export class HubPoolRepository extends utils.BaseRepository { finalised: event.blockNumber <= lastFinalisedBlock, }; }); - await this.postgres - .createQueryBuilder(entities.RootBundleDisputed, "b") - .insert() - .values(formattedEvents) - .orUpdate(["finalised"], ["transactionHash"]) - .execute(); + const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + await Promise.all( + chunkedEvents.map((eventsChunk) => + this.postgres + .createQueryBuilder(entities.RootBundleDisputed, "b") + .insert() + .values(eventsChunk) + .orUpdate(["finalised"], ["transactionHash"]) + .execute(), + ), + ); } public async formatAndSaveRootBundleCanceledEvents( @@ -62,12 +75,18 @@ export class HubPoolRepository extends utils.BaseRepository { finalised: event.blockNumber <= lastFinalisedBlock, }; }); - await this.postgres - .createQueryBuilder(entities.RootBundleCanceled, "b") - .insert() - .values(formattedEvents) - .orUpdate(["finalised"], ["transactionHash"]) - .execute(); + + const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + await Promise.all( + chunkedEvents.map((eventsChunk) => + this.postgres + .createQueryBuilder(entities.RootBundleCanceled, "b") + .insert() + .values(eventsChunk) + .orUpdate(["finalised"], ["transactionHash"]) + .execute(), + ), + ); } public async formatAndSaveRootBundleExecutedEvents( @@ -85,14 +104,13 @@ export class HubPoolRepository extends utils.BaseRepository { finalised: event.blockNumber <= lastFinalisedBlock, }; }); - // Split the events into chunks of 1000 to avoid exceeding the max query length - const chunks = across.utils.chunk(formattedEvents, 1000); + const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); await Promise.all( - chunks.map((chunk) => + chunkedEvents.map((eventsChunk) => this.postgres .createQueryBuilder(entities.RootBundleExecuted, "b") .insert() - .values(chunk) + .values(eventsChunk) .orUpdate( ["finalised"], ["chainId", "leafId", "groupIndex", "transactionHash"], @@ -117,15 +135,21 @@ export class HubPoolRepository extends utils.BaseRepository { finalised: event.blockNumber <= lastFinalisedBlock, }; }); - await this.postgres - .createQueryBuilder(entities.SetPoolRebalanceRoute, "b") - .insert() - .values(formattedEvents) - .orUpdate( - ["finalised"], - ["transactionHash", "transactionIndex", "logIndex"], - ) - .execute(); + + const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + await Promise.all( + chunkedEvents.map((eventsChunk) => + this.postgres + .createQueryBuilder(entities.SetPoolRebalanceRoute, "b") + .insert() + .values(eventsChunk) + .orUpdate( + ["finalised"], + ["transactionHash", "transactionIndex", "logIndex"], + ) + .execute(), + ), + ); } /**