Skip to content

Commit

Permalink
Update sync status during historical processing in job-runner
Browse files Browse the repository at this point in the history
  • Loading branch information
nikugogoi committed Nov 7, 2023
1 parent 8f9fb0d commit 7008eea
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 26 deletions.
12 changes: 0 additions & 12 deletions packages/util/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';
import PgBoss from 'pg-boss';
import { constants } from 'ethers';

import { JobQueue } from './job-queue';
import { BlockProgressInterface, EventInterface, IndexerInterface, EthClient, EventsJobData, EventsQueueJobKind } from './types';
Expand Down Expand Up @@ -228,17 +227,6 @@ export class EventWatcher {
// endBlock exists if isComplete is true
assert(batchEndBlockNumber);

const [block] = await this._indexer.getBlocks({ blockNumber: batchEndBlockNumber });
const batchEndBlockHash = block ? block.blockHash : constants.AddressZero;

// Update sync status chain head and canonical block to end block of historical processing
const [syncStatus] = await Promise.all([
this._indexer.updateSyncStatusCanonicalBlock(batchEndBlockHash, batchEndBlockNumber, true),
this._indexer.updateSyncStatusIndexedBlock(batchEndBlockHash, batchEndBlockNumber, true),
this._indexer.updateSyncStatusChainHead(batchEndBlockHash, batchEndBlockNumber, true)
]);
log(`Sync status canonical block updated to ${syncStatus.latestCanonicalBlockNumber}`);

const nextBatchStartBlockNumber = batchEndBlockNumber + 1;
log(`Historical block processing completed for block range: ${blockNumber} to ${batchEndBlockNumber}`);

Expand Down
54 changes: 40 additions & 14 deletions packages/util/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import assert from 'assert';
import debug from 'debug';
import { ethers } from 'ethers';
import { constants, ethers } from 'ethers';
import { DeepPartial, In } from 'typeorm';
import PgBoss from 'pg-boss';

Expand Down Expand Up @@ -191,20 +191,46 @@ export class JobRunner {
endBlock
);

// Push event processing job for each block
const pushJobForBlockPromises = blocks.map(async block => {
const eventsProcessingJob: EventsJobData = {
kind: EventsQueueJobKind.EVENTS,
blockHash: block.blockHash,
isRetryAttempt: false,
// Avoid publishing GQL subscription event in historical processing
// Publishing when realtime processing is listening to events will cause problems
publish: false
};
this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob);
});
let batchEndBlockHash = constants.AddressZero;
const blocksLength = blocks.length;

if (blocksLength) {
// Push event processing job for each block
const pushJobForBlockPromises = blocks.map(async block => {
const eventsProcessingJob: EventsJobData = {
kind: EventsQueueJobKind.EVENTS,
blockHash: block.blockHash,
isRetryAttempt: false,
// Avoid publishing GQL subscription event in historical processing
// Publishing when realtime processing is listening to events will cause problems
publish: false
};
this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob);
});

if (blocks[blocksLength - 1].blockNumber === endBlock) {
// If in blocks returned end block is same as the batch end block, set batchEndBlockHash
batchEndBlockHash = blocks[blocksLength - 1].blockHash;
} else {
// Else fetch block hash from upstream for batch end block
const [block] = await this._indexer.getBlocks({ blockNumber: endBlock });

if (block) {
batchEndBlockHash = block.blockHash;
}
}

await Promise.all(pushJobForBlockPromises);
}

// Update sync status canonical, indexed and chain head block to end block
await Promise.all([
this._indexer.updateSyncStatusCanonicalBlock(batchEndBlockHash, endBlock, true),
this._indexer.updateSyncStatusIndexedBlock(batchEndBlockHash, endBlock, true),
this._indexer.updateSyncStatusChainHead(batchEndBlockHash, endBlock, true)
]);
log(`Sync status canonical, indexed and chain head block updated to ${endBlock}`);

await Promise.all(pushJobForBlockPromises);
this._historicalProcessingCompletedUpto = endBlock;

await this.jobQueue.markComplete(
Expand Down

0 comments on commit 7008eea

Please sign in to comment.