Skip to content

Commit

Permalink
Add new job queue for historical blocks processing (#442)
Browse files Browse the repository at this point in the history
* Add TODOs for historical blocks processing

* Add new job for historical blocks processing

* Handle historical job completion

* Fetch latest block in chain and start historical block processing

* Fix starting realtime block processing from latest canonical block

* Refactor historical block processing method and add logs

* Add dummy indexer methods in graph-node to pass test

* Changes in codegen for historical processing in generated watcher
  • Loading branch information
nikugogoi authored Nov 1, 2023
1 parent 15ee523 commit 6c17662
Show file tree
Hide file tree
Showing 14 changed files with 307 additions and 52 deletions.
12 changes: 12 additions & 0 deletions packages/codegen/src/templates/database-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber, force);
}

async forceUpdateSyncStatus (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);

return this._baseDatabase.forceUpdateSyncStatus(repo, blockHash, blockNumber);
}

async getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatus | undefined> {
const repo = queryRunner.manager.getRepository(SyncStatus);

Expand All @@ -271,6 +277,12 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlocksAtHeight(repo, height, isPruned);
}

async getLatestProcessedBlockProgress (isPruned: boolean): Promise<BlockProgress | undefined> {
const repo = this._conn.getRepository(BlockProgress);

return this._baseDatabase.getLatestProcessedBlockProgress(repo, isPruned);
}

async markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgress[]): Promise<void> {
const repo = queryRunner.manager.getRepository(BlockProgress);

Expand Down
10 changes: 9 additions & 1 deletion packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ export class Indexer implements IndexerInterface {
if (!this._serverConfig.enableState) {
return;
}

const dbTx = await this._db.createTransactionRunner();
let res;

Expand Down Expand Up @@ -637,6 +637,10 @@ export class Indexer implements IndexerInterface {
return syncStatus;
}

async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.forceUpdateSyncStatus(blockHash, blockNumber);
}

async getEvent (id: string): Promise<Event | undefined> {
return this._baseIndexer.getEvent(id);
}
Expand All @@ -653,6 +657,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}

async getLatestProcessedBlockProgress (isPruned: boolean): Promise<BlockProgress | undefined> {
return this._db.getLatestProcessedBlockProgress(isPruned);
}

async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgress>[]): Promise<{ blockProgress: BlockProgress, events: DeepPartial<Event>[] }[]> {
return this._baseIndexer.fetchEventsAndSaveBlocks(blocks, this._eventSignaturesMap, this.parseEventNameAndArgs.bind(this));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export const main = async (): Promise<any> => {

await jobRunnerCmd.exec(async (jobRunner: JobRunner): Promise<void> => {
await jobRunner.subscribeBlockProcessingQueue();
await jobRunner.subscribeHistoricalProcessingQueue();
await jobRunner.subscribeEventProcessingQueue();
await jobRunner.subscribeBlockCheckpointQueue();
await jobRunner.subscribeHooksQueue();
Expand Down
13 changes: 13 additions & 0 deletions packages/graph-node/test/utils/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ export class Indexer implements IndexerInterface {
return [];
}

async getLatestProcessedBlockProgress (isPruned: boolean): Promise<BlockProgressInterface | undefined> {
assert(isPruned);

return undefined;
}

async getBlockEvents (blockHash: string): Promise<Array<EventInterface>> {
assert(blockHash);

Expand Down Expand Up @@ -150,6 +156,13 @@ export class Indexer implements IndexerInterface {
return {} as SyncStatusInterface;
}

async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
assert(blockNumber);
assert(blockHash);

return {} as SyncStatusInterface;
}

async markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void> {
assert(blocks);

Expand Down
9 changes: 6 additions & 3 deletions packages/util/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ export const fetchBlocksAtHeight = async (
cid,
blockHash,
parentHash,
blockTimestamp: timestamp
blockTimestamp: Number(timestamp)
});
}

Expand Down Expand Up @@ -182,6 +182,9 @@ export const _fetchBatchBlocks = async (
while (true) {
console.time('time:common#fetchBatchBlocks-getBlocks');

// TODO: Fetch logs by filter before fetching blocks
// TODO: Fetch only blocks needed for returned logs
// TODO: Save blocks and logs to DB
const blockPromises = blockNumbers.map(async blockNumber => indexer.getBlocks({ blockNumber }));
const settledResults = await Promise.allSettled(blockPromises);

Expand Down Expand Up @@ -238,7 +241,7 @@ export const _fetchBatchBlocks = async (
}

blocks.forEach(block => {
block.blockTimestamp = block.timestamp;
block.blockTimestamp = Number(block.timestamp);
block.blockNumber = Number(block.blockNumber);
});

Expand All @@ -265,7 +268,7 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block

if (indexer.processBlockAfterEvents) {
if (!dbBlock.isComplete) {
await indexer.processBlockAfterEvents(block.blockHash, block.blockNumber);
await indexer.processBlockAfterEvents(dbBlock.blockHash, dbBlock.blockNumber);
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/util/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export const MAX_REORG_DEPTH = 16;
export const DIFF_MERGE_BATCH_SIZE = 10000;

export const QUEUE_BLOCK_PROCESSING = 'block-processing';
export const QUEUE_HISTORICAL_PROCESSING = 'historical-processing';
export const QUEUE_EVENT_PROCESSING = 'event-processing';
export const QUEUE_CHAIN_PRUNING = 'chain-pruning';
export const QUEUE_BLOCK_CHECKPOINT = 'block-checkpoint';
Expand Down
28 changes: 28 additions & 0 deletions packages/util/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,26 @@ export class Database {
return await repo.save(entity);
}

async forceUpdateSyncStatus (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
let entity = await repo.findOne();

if (!entity) {
entity = repo.create({
initialIndexedBlockHash: blockHash,
initialIndexedBlockNumber: blockNumber
});
}

entity.chainHeadBlockHash = blockHash;
entity.chainHeadBlockNumber = blockNumber;
entity.latestCanonicalBlockHash = blockHash;
entity.latestCanonicalBlockNumber = blockNumber;
entity.latestIndexedBlockHash = blockHash;
entity.latestIndexedBlockNumber = blockNumber;

return await repo.save(entity);
}

async getBlockProgress (repo: Repository<BlockProgressInterface>, blockHash: string): Promise<BlockProgressInterface | undefined> {
return repo.findOne({ where: { blockHash } });
}
Expand All @@ -182,6 +202,14 @@ export class Database {
.getMany();
}

async getLatestProcessedBlockProgress (repo: Repository<BlockProgressInterface>, isPruned: boolean): Promise<BlockProgressInterface | undefined> {
return repo.createQueryBuilder('block_progress')
.where('is_pruned = :isPruned AND is_complete = :isComplete', { isPruned, isComplete: true })
.orderBy('block_number', 'DESC')
.limit(1)
.getOne();
}

async saveBlockProgress (repo: Repository<BlockProgressInterface>, block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface> {
blockProgressCount.inc(1);

Expand Down
117 changes: 108 additions & 9 deletions packages/util/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';
import PgBoss from 'pg-boss';
import { constants } from 'ethers';

import { JobQueue } from './job-queue';
import { BlockProgressInterface, EventInterface, IndexerInterface, EthClient } from './types';
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants';
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_HISTORICAL_PROCESSING } from './constants';
import { createPruningJob, processBlockByNumber } from './common';
import { OrderDirection } from './database';
import { HISTORICAL_BLOCKS_BATCH_SIZE, HistoricalJobData } from './job-runner';

const EVENT = 'event';

Expand All @@ -26,6 +29,7 @@ export class EventWatcher {

_shutDown = false;
_signalCount = 0;
_historicalProcessingEndBlockNumber = 0;

constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient;
Expand All @@ -44,6 +48,7 @@ export class EventWatcher {

async start (): Promise<void> {
await this.initBlockProcessingOnCompleteHandler();
await this.initHistoricalProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler();

this.startBlockProcessing();
Expand All @@ -57,24 +62,57 @@ export class EventWatcher {
});
}

async initHistoricalProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_HISTORICAL_PROCESSING, async (job) => {
await this.historicalProcessingCompleteHandler(job);
});
}

async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
await this.eventProcessingCompleteHandler(job);
});
}

async startBlockProcessing (): Promise<void> {
const syncStatus = await this._indexer.getSyncStatus();
let startBlockNumber: number;

if (!syncStatus) {
// Get latest block in chain.
const { block: currentBlock } = await this._ethClient.getBlockByHash();
startBlockNumber = currentBlock.number;
} else {
// Get latest block in chain and sync status from DB.
const [{ block: latestBlock }, syncStatus] = await Promise.all([
this._ethClient.getBlockByHash(),
this._indexer.getSyncStatus()
]);

const latestCanonicalBlockNumber = latestBlock.number - MAX_REORG_DEPTH;
let startBlockNumber = latestBlock.number;

if (syncStatus) {
startBlockNumber = syncStatus.chainHeadBlockNumber + 1;
}

// Check if starting block for watcher is before latest canonical block
if (startBlockNumber < latestCanonicalBlockNumber) {
await this.startHistoricalBlockProcessing(startBlockNumber, latestCanonicalBlockNumber);

return;
}

await this.startRealtimeBlockProcessing(startBlockNumber);
}

async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise<void> {
this._historicalProcessingEndBlockNumber = endBlockNumber;
log(`Starting historical block processing up to block ${this._historicalProcessingEndBlockNumber}`);

// Push job for historical block processing
await this._jobQueue.pushJob(
QUEUE_HISTORICAL_PROCESSING,
{
blockNumber: startBlockNumber
}
);
}

async startRealtimeBlockProcessing (startBlockNumber: number): Promise<void> {
log(`Starting realtime block processing from block ${startBlockNumber}`);
await processBlockByNumber(this._jobQueue, startBlockNumber);

// Creating an AsyncIterable from AsyncIterator to iterate over the values.
Expand Down Expand Up @@ -139,6 +177,67 @@ export class EventWatcher {
}
}

async historicalProcessingCompleteHandler (job: PgBoss.Job<any>): Promise<void> {
const { id, data: { failed, request: { data } } } = job;
const { blockNumber }: HistoricalJobData = data;

if (failed) {
log(`Job ${id} for queue ${QUEUE_HISTORICAL_PROCESSING} failed`);
return;
}

// TODO: Get batch size from config
const nextBatchStartBlockNumber = blockNumber + HISTORICAL_BLOCKS_BATCH_SIZE + 1;
log(`Historical block processing completed for block range: ${blockNumber} to ${nextBatchStartBlockNumber}`);

// Check if historical processing endBlock / latest canonical block is reached
if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) {
let newSyncStatusBlock: {
blockNumber: number;
blockHash: string;
} | undefined;

// Fetch latest processed block from DB
const latestProcessedBlock = await this._indexer.getLatestProcessedBlockProgress(false);

if (latestProcessedBlock) {
if (latestProcessedBlock.blockNumber > this._historicalProcessingEndBlockNumber) {
// Set new sync status to latest processed block
newSyncStatusBlock = {
blockHash: latestProcessedBlock.blockHash,
blockNumber: latestProcessedBlock.blockNumber
};
}
}

if (!newSyncStatusBlock) {
const [block] = await this._indexer.getBlocks({ blockNumber: this._historicalProcessingEndBlockNumber });

newSyncStatusBlock = {
// At latestCanonicalBlockNumber height null block might be returned in case of FEVM
blockHash: block ? block.blockHash : constants.AddressZero,
blockNumber: this._historicalProcessingEndBlockNumber
};
}

// Update sync status to max of latest processed block or latest canonical block
const syncStatus = await this._indexer.forceUpdateSyncStatus(newSyncStatusBlock.blockHash, newSyncStatusBlock.blockNumber);
log(`Sync status canonical block updated to ${syncStatus.latestCanonicalBlockNumber}`);
// Start realtime processing
this.startBlockProcessing();

return;
}

// Push job for next batch of blocks
await this._jobQueue.pushJob(
QUEUE_HISTORICAL_PROCESSING,
{
blockNumber: nextBatchStartBlockNumber
}
);
}

async eventProcessingCompleteHandler (job: any): Promise<void> {
const { id, data: { request, failed, state, createdOn } } = job;

Expand Down
8 changes: 7 additions & 1 deletion packages/util/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,13 @@ const prefetchBlocks = async (
const blockProgress = await indexer.getBlockProgress(blockHash);

if (!blockProgress) {
await indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
await indexer.saveBlockAndFetchEvents({
cid,
blockHash,
blockNumber: Number(blockNumber),
parentHash,
blockTimestamp: Number(timestamp)
});
}
});

Expand Down
3 changes: 2 additions & 1 deletion packages/util/src/index-block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ export const indexBlock = async (
const blocks = await indexer.getBlocks({ blockNumber: argv.block });

blockProgressEntities = blocks.map((block: any): Partial<BlockProgressInterface> => {
block.blockTimestamp = block.timestamp;
block.blockTimestamp = Number(block.timestamp);
block.blockNumber = Number(block.blockNumber);

return block;
});
Expand Down
17 changes: 17 additions & 0 deletions packages/util/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,23 @@ export class Indexer {
return res;
}

async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
const dbTx = await this._db.createTransactionRunner();
let res;

try {
res = await this._db.forceUpdateSyncStatus(dbTx, blockHash, blockNumber);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}

return res;
}

async getBlocks (blockFilter: { blockNumber?: number, blockHash?: string }): Promise<any> {
assert(blockFilter.blockHash || blockFilter.blockNumber);
const result = await this._ethClient.getBlocks(blockFilter);
Expand Down
Loading

0 comments on commit 6c17662

Please sign in to comment.