diff --git a/evm/evm-processor/src/ds-archive/portal.ts b/evm/evm-processor/src/ds-archive/portal.ts index 09393927c..14d87b2d3 100644 --- a/evm/evm-processor/src/ds-archive/portal.ts +++ b/evm/evm-processor/src/ds-archive/portal.ts @@ -43,7 +43,7 @@ export class EvmPortal implements DataSource { fromBlock: req.range.from, toBlock: req.range.to, ...req.request - })) { + }, stopOnHead)) { assert(batch.length > 0, 'boundary blocks are expected to be included') let lastBlock = last(batch).header.number assert(lastBlock >= beg) @@ -66,8 +66,6 @@ export class EvmPortal implements DataSource { } top = await height.get() - - if (top < beg && stopOnHead) return } } } diff --git a/evm/evm-processor/src/processor.ts b/evm/evm-processor/src/processor.ts index 759800620..ab66d34be 100644 --- a/evm/evm-processor/src/processor.ts +++ b/evm/evm-processor/src/processor.ts @@ -114,6 +114,8 @@ export interface PortalSettings { requestTimeout?: number bufferThreshold?: number + + newBlockTimeout?: number } @@ -538,6 +540,7 @@ export class EvmBatchProcessor { url: archive.url, queryTimeout: archive.requestTimeout, bufferThreshold: archive.bufferThreshold, + newBlockTimeout: archive.newBlockTimeout, log, }) ) diff --git a/test/erc20-transfers/src/processor.ts b/test/erc20-transfers/src/processor.ts index f03ac16f0..9533af597 100644 --- a/test/erc20-transfers/src/processor.ts +++ b/test/erc20-transfers/src/processor.ts @@ -9,12 +9,13 @@ const CONTRACT = '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'.toLowerCase() const processor = new EvmBatchProcessor() .setPortal({ - url: 'http://localhost:8000/datasets/ethereum-mainnet', - bufferThreshold: 100 * 1024 * 1024 + url: 'https://portal.sqd.dev/datasets/ethereum-mainnet', + bufferThreshold: 100 * 1024 * 1024, + newBlockTimeout: 5000, }) .setRpcEndpoint('https://rpc.ankr.com/eth') .setFinalityConfirmation(500) - .setBlockRange({from: 0}) + .setBlockRange({from: 20801368}) .setFields({ block: {size: true}, log: {transactionHash: true}, @@ -45,6 +46,6 @@ processor.run(new TypeormDatabase({supportHotBlocks: true}), async ctx => { } } - // ctx.log.info(`found ${transfers.length} transfers`) + ctx.log.info(`found ${transfers.length} transfers`) // await ctx.store.insert(transfers) }) diff --git a/util/portal-client/package.json b/util/portal-client/package.json index 4e822a9c6..c838d681b 100644 --- a/util/portal-client/package.json +++ b/util/portal-client/package.json @@ -18,7 +18,8 @@ "dependencies": { "@subsquid/util-internal": "^3.2.0", "@subsquid/util-internal-range": "^0.3.0", - "@subsquid/util-internal-archive-layout": "^1.0.0" + "@subsquid/util-internal-archive-layout": "^1.0.0", + "@subsquid/util-timeout": "^2.3.2" }, "peerDependencies": { "@subsquid/http-client": "^1.5.0", diff --git a/util/portal-client/src/client.ts b/util/portal-client/src/client.ts index 93380b9d8..7cc7c5351 100644 --- a/util/portal-client/src/client.ts +++ b/util/portal-client/src/client.ts @@ -2,15 +2,15 @@ import {HttpClient} from '@subsquid/http-client' import type {Logger} from '@subsquid/logger' import {AsyncQueue, ensureError, last, wait, withErrorContext} from '@subsquid/util-internal' import {splitLines} from '@subsquid/util-internal-archive-layout' +import {addTimeout, TimeoutError} from '@subsquid/util-timeout' import assert from 'assert' - +import {Readable} from 'stream' export interface PortalQuery { fromBlock: number toBlock?: number } - export interface Block { header: { number: number @@ -18,32 +18,34 @@ export interface Block { } } - export interface Metadata { isRealTime: boolean } - export interface PortalClientOptions { url: string - http?: HttpClient + http: HttpClient log?: Logger queryTimeout?: number bufferThreshold?: number + newBlockTimeout?: number } - export class PortalClient { private url: URL private http: HttpClient private queryTimeout: number private bufferThreshold: number + private newBlockTimeout: number + private log?: Logger constructor(options: PortalClientOptions) { this.url = new URL(options.url) - this.http = options.http || new HttpClient({log: options.log}) + this.log = options.log + this.http = options.http this.queryTimeout = options.queryTimeout ?? 180_000 this.bufferThreshold = options.bufferThreshold ?? 10 * 1024 * 1024 + this.newBlockTimeout = options.newBlockTimeout ?? 120_000 } private getDatasetUrl(path: string): string { @@ -72,7 +74,7 @@ export class PortalClient { httpTimeout: 10_000, }) return { - isRealTime: !!res.real_time + isRealTime: !!res.real_time, } } @@ -99,7 +101,10 @@ export class PortalClient { }) } - async *stream(query: Q): AsyncIterable { + async *stream( + query: Q, + stopOnHead = false + ): AsyncIterable { let queue = new AsyncQueue(1) const ingest = async () => { @@ -111,7 +116,7 @@ export class PortalClient { let archiveQuery = {...query, fromBlock} let res = await this.http - .request('POST', this.getDatasetUrl(`stream`), { + .request('POST', this.getDatasetUrl(`stream`), { json: archiveQuery, retryAttempts: 3, httpTimeout: this.queryTimeout, @@ -123,35 +128,52 @@ export class PortalClient { }) ) - for await (let lines of splitLines(res.body as AsyncIterable)) { - let batch = queue.peek() - if (batch instanceof Error) return - - if (!batch) { - bufferSize = 0 - } - - let blocks = lines.map((line) => { - bufferSize += line.length - return JSON.parse(line) as B - }) - - if (batch) { - // FIXME: won't it overflow stack? - batch.push(...blocks) - if (bufferSize > this.bufferThreshold) { - await queue.wait() - } - } else { - await queue.put(blocks) - } - - fromBlock = last(blocks).header.number + 1 - } - // no blocks left if (res.status == 204) { + if (stopOnHead) return await wait(1000) + } else { + try { + let stream = splitLines(res.body) + + while (true) { + let lines = await addTimeout(stream.next(), this.newBlockTimeout) + if (lines.done) break + + let batch = queue.peek() + if (batch instanceof Error) return + + if (!batch) { + bufferSize = 0 + } + + let blocks = lines.value.map((line) => { + bufferSize += line.length + return JSON.parse(line) as B + }) + + if (batch) { + // FIXME: won't it overflow stack? + batch.push(...blocks) + if (bufferSize > this.bufferThreshold) { + await queue.wait() + } + } else { + await queue.put(blocks) + } + + fromBlock = last(blocks).header.number + 1 + } + } catch (err) { + if (err instanceof TimeoutError) { + this.log?.warn( + `resetting stream, because we haven't seen a new blocks for ${this.newBlockTimeout} ms` + ) + res.body.destroy() + } else { + throw err + } + } } } }