Skip to content

Commit

Permalink
add stream timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
belopash committed Nov 6, 2024
1 parent 4494d21 commit 2808f0a
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 44 deletions.
4 changes: 1 addition & 3 deletions evm/evm-processor/src/ds-archive/portal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class EvmPortal implements DataSource<Block, DataRequest> {
fromBlock: req.range.from,
toBlock: req.range.to,
...req.request
})) {
}, stopOnHead)) {
assert(batch.length > 0, 'boundary blocks are expected to be included')
let lastBlock = last(batch).header.number
assert(lastBlock >= beg)
Expand All @@ -66,8 +66,6 @@ export class EvmPortal implements DataSource<Block, DataRequest> {
}

top = await height.get()

if (top < beg && stopOnHead) return
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions evm/evm-processor/src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ export interface PortalSettings {
requestTimeout?: number

bufferThreshold?: number

newBlockTimeout?: number
}


Expand Down Expand Up @@ -538,6 +540,7 @@ export class EvmBatchProcessor<F extends FieldSelection = {}> {
url: archive.url,
queryTimeout: archive.requestTimeout,
bufferThreshold: archive.bufferThreshold,
newBlockTimeout: archive.newBlockTimeout,
log,
})
)
Expand Down
9 changes: 5 additions & 4 deletions test/erc20-transfers/src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ const CONTRACT = '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'.toLowerCase()

const processor = new EvmBatchProcessor()
.setPortal({
url: 'http://localhost:8000/datasets/ethereum-mainnet',
bufferThreshold: 100 * 1024 * 1024
url: 'https://portal.sqd.dev/datasets/ethereum-mainnet',
bufferThreshold: 100 * 1024 * 1024,
newBlockTimeout: 5000,
})
.setRpcEndpoint('https://rpc.ankr.com/eth')
.setFinalityConfirmation(500)
.setBlockRange({from: 0})
.setBlockRange({from: 20801368})
.setFields({
block: {size: true},
log: {transactionHash: true},
Expand Down Expand Up @@ -45,6 +46,6 @@ processor.run(new TypeormDatabase({supportHotBlocks: true}), async ctx => {
}
}

// ctx.log.info(`found ${transfers.length} transfers`)
ctx.log.info(`found ${transfers.length} transfers`)
// await ctx.store.insert(transfers)
})
3 changes: 2 additions & 1 deletion util/portal-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"dependencies": {
"@subsquid/util-internal": "^3.2.0",
"@subsquid/util-internal-range": "^0.3.0",
"@subsquid/util-internal-archive-layout": "^1.0.0"
"@subsquid/util-internal-archive-layout": "^1.0.0",
"@subsquid/util-timeout": "^2.3.2"
},
"peerDependencies": {
"@subsquid/http-client": "^1.5.0",
Expand Down
94 changes: 58 additions & 36 deletions util/portal-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,50 @@ import {HttpClient} from '@subsquid/http-client'
import type {Logger} from '@subsquid/logger'
import {AsyncQueue, ensureError, last, wait, withErrorContext} from '@subsquid/util-internal'
import {splitLines} from '@subsquid/util-internal-archive-layout'
import {addTimeout, TimeoutError} from '@subsquid/util-timeout'
import assert from 'assert'

import {Readable} from 'stream'

export interface PortalQuery {
fromBlock: number
toBlock?: number
}


export interface Block {
header: {
number: number
hash: string
}
}


export interface Metadata {
isRealTime: boolean
}


export interface PortalClientOptions {
url: string
http?: HttpClient
http: HttpClient
log?: Logger
queryTimeout?: number
bufferThreshold?: number
newBlockTimeout?: number
}


export class PortalClient {
private url: URL
private http: HttpClient
private queryTimeout: number
private bufferThreshold: number
private newBlockTimeout: number
private log?: Logger

constructor(options: PortalClientOptions) {
this.url = new URL(options.url)
this.http = options.http || new HttpClient({log: options.log})
this.log = options.log
this.http = options.http
this.queryTimeout = options.queryTimeout ?? 180_000
this.bufferThreshold = options.bufferThreshold ?? 10 * 1024 * 1024
this.newBlockTimeout = options.newBlockTimeout ?? 120_000
}

private getDatasetUrl(path: string): string {
Expand Down Expand Up @@ -72,7 +74,7 @@ export class PortalClient {
httpTimeout: 10_000,
})
return {
isRealTime: !!res.real_time
isRealTime: !!res.real_time,
}
}

Expand All @@ -99,7 +101,10 @@ export class PortalClient {
})
}

async *stream<B extends Block = Block, Q extends PortalQuery = PortalQuery>(query: Q): AsyncIterable<B[]> {
async *stream<B extends Block = Block, Q extends PortalQuery = PortalQuery>(
query: Q,
stopOnHead = false
): AsyncIterable<B[]> {
let queue = new AsyncQueue<B[] | Error>(1)

const ingest = async () => {
Expand All @@ -111,7 +116,7 @@ export class PortalClient {
let archiveQuery = {...query, fromBlock}

let res = await this.http
.request<NodeJS.ReadableStream>('POST', this.getDatasetUrl(`stream`), {
.request<Readable>('POST', this.getDatasetUrl(`stream`), {
json: archiveQuery,
retryAttempts: 3,
httpTimeout: this.queryTimeout,
Expand All @@ -123,35 +128,52 @@ export class PortalClient {
})
)

for await (let lines of splitLines(res.body as AsyncIterable<Buffer>)) {
let batch = queue.peek()
if (batch instanceof Error) return

if (!batch) {
bufferSize = 0
}

let blocks = lines.map((line) => {
bufferSize += line.length
return JSON.parse(line) as B
})

if (batch) {
// FIXME: won't it overflow stack?
batch.push(...blocks)
if (bufferSize > this.bufferThreshold) {
await queue.wait()
}
} else {
await queue.put(blocks)
}

fromBlock = last(blocks).header.number + 1
}

// no blocks left
if (res.status == 204) {
if (stopOnHead) return
await wait(1000)
} else {
try {
let stream = splitLines(res.body)

while (true) {
let lines = await addTimeout(stream.next(), this.newBlockTimeout)
if (lines.done) break

let batch = queue.peek()
if (batch instanceof Error) return

if (!batch) {
bufferSize = 0
}

let blocks = lines.value.map((line) => {
bufferSize += line.length
return JSON.parse(line) as B
})

if (batch) {
// FIXME: won't it overflow stack?
batch.push(...blocks)
if (bufferSize > this.bufferThreshold) {
await queue.wait()
}
} else {
await queue.put(blocks)
}

fromBlock = last(blocks).header.number + 1
}
} catch (err) {
if (err instanceof TimeoutError) {
this.log?.warn(
`resetting stream, because we haven't seen a new blocks for ${this.newBlockTimeout} ms`
)
res.body.destroy()
} else {
throw err
}
}
}
}
}
Expand Down

0 comments on commit 2808f0a

Please sign in to comment.