diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index 2fb03c7ee..e41acd30f 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -6,7 +6,6 @@ import assert from 'assert'; import debug from 'debug'; import { PubSub } from 'graphql-subscriptions'; import PgBoss from 'pg-boss'; -import { constants } from 'ethers'; import { JobQueue } from './job-queue'; import { BlockProgressInterface, EventInterface, IndexerInterface, EthClient, EventsJobData, EventsQueueJobKind } from './types'; @@ -228,17 +227,6 @@ export class EventWatcher { // endBlock exists if isComplete is true assert(batchEndBlockNumber); - const [block] = await this._indexer.getBlocks({ blockNumber: batchEndBlockNumber }); - const batchEndBlockHash = block ? block.blockHash : constants.AddressZero; - - // Update sync status chain head and canonical block to end block of historical processing - const [syncStatus] = await Promise.all([ - this._indexer.updateSyncStatusCanonicalBlock(batchEndBlockHash, batchEndBlockNumber, true), - this._indexer.updateSyncStatusIndexedBlock(batchEndBlockHash, batchEndBlockNumber, true), - this._indexer.updateSyncStatusChainHead(batchEndBlockHash, batchEndBlockNumber, true) - ]); - log(`Sync status canonical block updated to ${syncStatus.latestCanonicalBlockNumber}`); - const nextBatchStartBlockNumber = batchEndBlockNumber + 1; log(`Historical block processing completed for block range: ${blockNumber} to ${batchEndBlockNumber}`); diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index c7bac2cce..7894b37ed 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -4,7 +4,7 @@ import assert from 'assert'; import debug from 'debug'; -import { ethers } from 'ethers'; +import { constants, ethers } from 'ethers'; import { DeepPartial, In } from 'typeorm'; import PgBoss from 'pg-boss'; @@ -191,20 +191,46 @@ export class JobRunner { endBlock ); - // Push event processing job for each block - const pushJobForBlockPromises = blocks.map(async block => { - const eventsProcessingJob: EventsJobData = { - kind: EventsQueueJobKind.EVENTS, - blockHash: block.blockHash, - isRetryAttempt: false, - // Avoid publishing GQL subscription event in historical processing - // Publishing when realtime processing is listening to events will cause problems - publish: false - }; - this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob); - }); + let batchEndBlockHash = constants.AddressZero; + const blocksLength = blocks.length; + + if (blocksLength) { + // Push event processing job for each block + const pushJobForBlockPromises = blocks.map(async block => { + const eventsProcessingJob: EventsJobData = { + kind: EventsQueueJobKind.EVENTS, + blockHash: block.blockHash, + isRetryAttempt: false, + // Avoid publishing GQL subscription event in historical processing + // Publishing when realtime processing is listening to events will cause problems + publish: false + }; + this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob); + }); + + if (blocks[blocksLength - 1].blockNumber === endBlock) { + // If in blocks returned end block is same as the batch end block, set batchEndBlockHash + batchEndBlockHash = blocks[blocksLength - 1].blockHash; + } else { + // Else fetch block hash from upstream for batch end block + const [block] = await this._indexer.getBlocks({ blockNumber: endBlock }); + + if (block) { + batchEndBlockHash = block.blockHash; + } + } + + await Promise.all(pushJobForBlockPromises); + } + + // Update sync status canonical, indexed and chain head block to end block + await Promise.all([ + this._indexer.updateSyncStatusCanonicalBlock(batchEndBlockHash, endBlock, true), + this._indexer.updateSyncStatusIndexedBlock(batchEndBlockHash, endBlock, true), + this._indexer.updateSyncStatusChainHead(batchEndBlockHash, endBlock, true) + ]); + log(`Sync status canonical, indexed and chain head block updated to ${endBlock}`); - await Promise.all(pushJobForBlockPromises); this._historicalProcessingCompletedUpto = endBlock; await this.jobQueue.markComplete(