From a6deed9c2755db42e42aa6c19b34f5458e9a846d Mon Sep 17 00:00:00 2001 From: Nabarun Gogoi Date: Thu, 28 Dec 2023 15:06:47 +0530 Subject: [PATCH] Fix block processing during chain reorg (#498) * Fix block processing during chain reorg * Add new method in test dummy indexer * Add missing semicolon --- .../src/templates/indexer-template.handlebars | 4 ++++ packages/graph-node/test/utils/indexer.ts | 4 ++++ packages/util/src/common.ts | 16 ++++--------- packages/util/src/indexer.ts | 8 +++++-- packages/util/src/job-runner.ts | 23 +++++++++++++++++-- packages/util/src/types.ts | 1 + 6 files changed, 40 insertions(+), 16 deletions(-) diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 148b480c..8bb78f23 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -869,4 +869,8 @@ export class Indexer implements IndexerInterface { await dbTx.release(); } } + + async getFullTransactions (txHashList: string[]): Promise { + return this._baseIndexer.getFullTransactions(txHashList); + } } diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index c11c4a4c..1aec8ee3 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -334,4 +334,8 @@ export class Indexer implements IndexerInterface { async processStateCheckpoint (contractAddress: string, blockHash: string): Promise { return false; } + + async getFullTransactions (txHashList: string[]): Promise { + return []; + } } diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index e882b8b5..dfa5a5fa 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -23,7 +23,7 @@ const log = debug('vulcanize:common'); const JSONbigNative = JSONbig({ useNativeBigInt: true }); export interface PrefetchedBlock { - block: BlockProgressInterface; + block?: BlockProgressInterface; events: DeepPartial[]; ethFullBlock: EthFullBlock; ethFullTransactions: EthFullTransaction[]; @@ -64,15 +64,7 @@ export const fetchBlocksAtHeight = async ( jobQueueConfig: JobQueueConfig, blockAndEventsMap: Map ): Promise[]> => { - let blocks = []; - - // Try fetching blocks from the db. - const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false); - blocks = blockProgressEntities.map((block: any) => { - block.timestamp = block.blockTimestamp; - - return block; - }); + let blocks: EthFullBlock[] = []; // Try fetching blocks from eth-server until found. while (!blocks.length) { @@ -82,8 +74,8 @@ export const fetchBlocksAtHeight = async ( // Check if all blocks are null and increment blockNumber to index next block number if (ethFullBlocks.length > 0 && ethFullBlocks.every(block => block === null)) { - blockNumber++; log(`Block ${blockNumber} requested was null (FEVM); Fetching next block`); + blockNumber++; continue; } @@ -125,7 +117,7 @@ export const fetchBlocksAtHeight = async ( }); } - await indexer.updateSyncStatusChainHead(blocks[0].blockHash, blocks[0].blockNumber); + await indexer.updateSyncStatusChainHead(blocks[0].blockHash, Number(blocks[0].blockNumber)); return blocksToBeIndexed; }; diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 7825a466..292def97 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -530,11 +530,15 @@ export class Indexer { } async _fetchTxsFromLogs (logs: any[]): Promise { - const txHashes = Array.from([ + const txHashList = Array.from([ ...new Set(logs.map((log) => log.transaction.hash)) ]); - const ethFullTxPromises = txHashes.map(async txHash => { + return this.getFullTransactions(txHashList); + } + + async getFullTransactions (txHashList: string[]): Promise { + const ethFullTxPromises = txHashList.map(async txHash => { return this._ethClient.getFullTransaction(txHash); }); diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 7241baa3..163fd2cd 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -587,6 +587,9 @@ export class JobRunner { } } + const data = this._blockAndEventsMap.get(blockHash); + assert(data); + if (!blockProgress) { // Delay required to process block. const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig; @@ -598,9 +601,24 @@ export class JobRunner { [blockProgress, , ethFullTransactions] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp }); log(`_indexBlock#saveBlockAndFetchEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`); console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents'); - const data = this._blockAndEventsMap.get(blockHash); - assert(data); + this._blockAndEventsMap.set( + blockHash, + { + ...data, + block: blockProgress, + ethFullTransactions + }); + } else { + const events = await this._indexer.getBlockEvents(blockHash, {}, {}); + + const txHashList = Array.from([ + ...new Set(events.map((event) => event.txHash)) + ]); + + const ethFullTransactions = await this._indexer.getFullTransactions(txHashList); + + // const ethFullTransactions = this._blockAndEventsMap.set( blockHash, { @@ -627,6 +645,7 @@ export class JobRunner { const prefetchedBlock = this._blockAndEventsMap.get(blockHash); assert(prefetchedBlock); const { block, ethFullBlock, ethFullTransactions } = prefetchedBlock; + assert(block, 'BlockProgress not set in blockAndEvents map'); try { log(`Processing events for block ${block.blockNumber}`); diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 277406b8..0af2b4bc 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -233,6 +233,7 @@ export interface IndexerInterface { resetWatcherToBlock (blockNumber: number): Promise clearProcessedBlockData (block: BlockProgressInterface): Promise getResultEvent (event: EventInterface): any + getFullTransactions (txHashList: string[]): Promise } export interface DatabaseInterface {