Skip to content

Commit

Permalink
Fix switch from historical to realtime processing when template creat…
Browse files Browse the repository at this point in the history
…e block is near head
  • Loading branch information
nikugogoi committed Nov 10, 2023
1 parent 2ac30eb commit ee54ce7
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
29 changes: 15 additions & 14 deletions packages/util/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ export class EventWatcher {
this._indexer.getSyncStatus()
]);

// Wait for events job queue to be empty before starting historical or realtime processing
await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING);
const historicalProcessingQueueSize = await this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed');

// Stop if there are active or pending historical processing jobs
// Might be created on encountering template create in events processing
if (historicalProcessingQueueSize > 0) {
return;
}

const latestCanonicalBlockNumber = latestBlock.number - MAX_REORG_DEPTH;
let startBlockNumber = latestBlock.number;

Expand All @@ -116,9 +126,6 @@ export class EventWatcher {
}

async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise<void> {
// Wait for events job queue to be empty so that historical processing does not move far ahead
await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING);

this._historicalProcessingEndBlockNumber = endBlockNumber;
log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`);

Expand All @@ -133,14 +140,14 @@ export class EventWatcher {
}

async startRealtimeBlockProcessing (startBlockNumber: number): Promise<void> {
log(`Starting realtime block processing from block ${startBlockNumber}`);
await processBlockByNumber(this._jobQueue, startBlockNumber);

// Check if realtime processing already started and avoid resubscribing to block progress event
if (this._realtimeProcessingStarted) {
return;
}

log(`Starting realtime block processing from block ${startBlockNumber}`);
await processBlockByNumber(this._jobQueue, startBlockNumber);

this._realtimeProcessingStarted = true;

// Creating an AsyncIterable from AsyncIterator to iterate over the values.
Expand Down Expand Up @@ -223,14 +230,8 @@ export class EventWatcher {

// Check if historical processing end block is reached
if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) {
const historicalProcessingQueueSize = await this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed');

// Check that there are no active or pending historical processing jobs
// Might be created on encountering template create in events processing
if (historicalProcessingQueueSize === 0) {
// Start next batch of historical processing or realtime processing
this.startBlockProcessing();
}
// Start next batch of historical processing or realtime processing
this.startBlockProcessing();

return;
}
Expand Down
2 changes: 2 additions & 0 deletions packages/util/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ export class JobRunner {
const { data: { blockNumber: startBlock, processingEndBlockNumber } } = job;

if (this._historicalProcessingCompletedUpto) {
// Check if historical processing start is for a previous block which happens incase of template create
if (startBlock < this._historicalProcessingCompletedUpto) {
// Delete any pending historical processing jobs
await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING);

// Wait for events queue to be empty
Expand Down

0 comments on commit ee54ce7

Please sign in to comment.