Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: close webrtc streams without data loss #2073

Merged
merged 17 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/interface/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,15 @@
"it-stream-types": "^2.0.1",
"multiformats": "^12.0.1",
"p-defer": "^4.0.0",
"race-signal": "^1.0.0",
"uint8arraylist": "^2.4.3"
},
"devDependencies": {
"@types/sinon": "^10.0.15",
"aegir": "^40.0.8",
"delay": "^6.0.0",
"it-all": "^3.0.3",
"it-drain": "^3.0.3",
"sinon": "^16.0.0",
"sinon-ts": "^1.0.0"
}
Expand Down
68 changes: 45 additions & 23 deletions packages/interface/src/stream-muxer/stream.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { abortableSource } from 'abortable-iterator'
import { type Pushable, pushable } from 'it-pushable'
import defer, { type DeferredPromise } from 'p-defer'
import { raceSignal } from 'race-signal'
import { Uint8ArrayList } from 'uint8arraylist'
import { CodeError } from '../errors.js'
import type { Direction, ReadStatus, Stream, StreamStatus, StreamTimeline, WriteStatus } from '../connection/index.js'
import type { AbortOptions } from '../index.js'
import type { Source } from 'it-stream-types'

// copied from @libp2p/logger to break a circular dependency
interface Logger {
(formatter: any, ...args: any[]): void
error: (formatter: any, ...args: any[]) => void
Expand All @@ -16,6 +18,7 @@ interface Logger {

const ERR_STREAM_RESET = 'ERR_STREAM_RESET'
const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE'
const DEFAULT_SEND_CLOSE_WRITE_TIMEOUT = 5000

export interface AbstractStreamInit {
/**
Expand Down Expand Up @@ -68,6 +71,12 @@ export interface AbstractStreamInit {
* connection when closing the writable end of the stream. (default: 500)
*/
closeTimeout?: number

/**
* After the stream sink has closed, a limit on how long it takes to send
* a close-write message to the remote peer.
*/
sendCloseWriteTimeout?: number
}

function isPromise (res?: any): res is Promise<void> {
Expand All @@ -94,6 +103,7 @@ export abstract class AbstractStream implements Stream {
private readonly onCloseWrite?: () => void
private readonly onReset?: () => void
private readonly onAbort?: (err: Error) => void
private readonly sendCloseWriteTimeout: number

protected readonly log: Logger

Expand All @@ -113,6 +123,7 @@ export abstract class AbstractStream implements Stream {
this.timeline = {
open: Date.now()
}
this.sendCloseWriteTimeout = init.sendCloseWriteTimeout ?? DEFAULT_SEND_CLOSE_WRITE_TIMEOUT

this.onEnd = init.onEnd
this.onCloseRead = init?.onCloseRead
Expand All @@ -128,7 +139,6 @@ export abstract class AbstractStream implements Stream {
this.log.trace('source ended')
}

this.readStatus = 'closed'
this.onSourceEnd(err)
}
})
Expand Down Expand Up @@ -173,11 +183,19 @@ export abstract class AbstractStream implements Stream {
}
}

this.log.trace('sink finished reading from source')
this.writeStatus = 'done'
this.log.trace('sink finished reading from source, write status is "%s"', this.writeStatus)

if (this.writeStatus === 'writing') {
this.writeStatus = 'closing'

this.log.trace('send close write to remote')
await this.sendCloseWrite({
signal: AbortSignal.timeout(this.sendCloseWriteTimeout)
})

this.writeStatus = 'closed'
}

this.log.trace('sink calling closeWrite')
await this.closeWrite(options)
this.onSinkEnd()
} catch (err: any) {
this.log.trace('sink ended with error, calling abort with error', err)
Expand All @@ -196,6 +214,7 @@ export abstract class AbstractStream implements Stream {
}

this.timeline.closeRead = Date.now()
this.readStatus = 'closed'

if (err != null && this.endErr == null) {
this.endErr = err
Expand All @@ -207,6 +226,10 @@ export abstract class AbstractStream implements Stream {
this.log.trace('source and sink ended')
this.timeline.close = Date.now()

if (this.status !== 'aborted' && this.status !== 'reset') {
this.status = 'closed'
}

if (this.onEnd != null) {
this.onEnd(this.endErr)
}
Expand All @@ -221,6 +244,7 @@ export abstract class AbstractStream implements Stream {
}

this.timeline.closeWrite = Date.now()
this.writeStatus = 'closed'

if (err != null && this.endErr == null) {
this.endErr = err
Expand All @@ -232,6 +256,10 @@ export abstract class AbstractStream implements Stream {
this.log.trace('sink and source ended')
this.timeline.close = Date.now()

if (this.status !== 'aborted' && this.status !== 'reset') {
this.status = 'closed'
}

if (this.onEnd != null) {
this.onEnd(this.endErr)
}
Expand Down Expand Up @@ -266,16 +294,16 @@ export abstract class AbstractStream implements Stream {
const readStatus = this.readStatus
this.readStatus = 'closing'

if (readStatus === 'ready') {
this.log.trace('ending internal source queue')
this.streamSource.end()
}

if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) {
this.log.trace('send close read to remote')
await this.sendCloseRead(options)
}

if (readStatus === 'ready') {
this.log.trace('ending internal source queue')
this.streamSource.end()
}

this.log.trace('closed readable end of stream')
}

Expand All @@ -286,33 +314,26 @@ export abstract class AbstractStream implements Stream {

this.log.trace('closing writable end of stream with starting write status "%s"', this.writeStatus)

const writeStatus = this.writeStatus

if (this.writeStatus === 'ready') {
this.log.trace('sink was never sunk, sink an empty array')
await this.sink([])
}

this.writeStatus = 'closing'
await raceSignal(this.sink([]), options.signal)
}

if (writeStatus === 'writing') {
if (this.writeStatus === 'writing') {
// stop reading from the source passed to `.sink` in the microtask queue
// - this lets any data queued by the user in the current tick get read
// before we exit
await new Promise((resolve, reject) => {
queueMicrotask(() => {
this.log.trace('aborting source passed to .sink')
this.sinkController.abort()
this.sinkEnd.promise.then(resolve, reject)
raceSignal(this.sinkEnd.promise, options.signal)
.then(resolve, reject)
})
})
}

if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeWrite == null) {
this.log.trace('send close write to remote')
await this.sendCloseWrite(options)
}

this.writeStatus = 'closed'

this.log.trace('closed writable end of stream')
Expand Down Expand Up @@ -357,6 +378,7 @@ export abstract class AbstractStream implements Stream {
const err = new CodeError('stream reset', ERR_STREAM_RESET)

this.status = 'reset'
this.timeline.reset = Date.now()
this._closeSinkAndSource(err)
this.onReset?.()
}
Expand Down Expand Up @@ -423,7 +445,7 @@ export abstract class AbstractStream implements Stream {
return
}

this.log.trace('muxer destroyed')
this.log.trace('stream destroyed')

this._closeSinkAndSource()
}
Expand Down
16 changes: 16 additions & 0 deletions packages/interface/test/fixtures/logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// copied from @libp2p/logger to break a circular dependency
interface Logger {
(): void
error: () => void
trace: () => void
enabled: boolean
}

export function logger (): Logger {
const output = (): void => {}
output.trace = (): void => {}
output.error = (): void => {}
output.enabled = false

return output
}
Loading