Skip to content

Commit

Permalink
fix: close webrtc streams
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Sep 23, 2023
1 parent 972b10a commit 30b2513
Show file tree
Hide file tree
Showing 19 changed files with 897 additions and 425 deletions.
16 changes: 12 additions & 4 deletions packages/transport-webrtc/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ export default {
before: async () => {
const { createLibp2p } = await import('libp2p')
const { circuitRelayServer } = await import('libp2p/circuit-relay')
const { identifyService } = await import('libp2p/identify')
const { webSockets } = await import('@libp2p/websockets')
const { noise } = await import('@chainsafe/libp2p-noise')
const { yamux } = await import('@chainsafe/libp2p-yamux')
Expand All @@ -34,11 +33,20 @@ export default {
reservations: {
maxReservations: Infinity
}
}),
identify: identifyService()
})
},
connectionManager: {
minConnections: 0
minConnections: 0,
inboundConnectionThreshold: Infinity
},
connectionGater: {
denyDialMultiaddr: (ma) => {
if (ma.toOptions().family === 6) {
return true
}

return false
}
}
})

Expand Down
2 changes: 1 addition & 1 deletion packages/transport-webrtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
"@multiformats/mafmt": "^12.1.2",
"@multiformats/multiaddr": "^12.1.5",
"@multiformats/multiaddr-matcher": "^1.0.1",
"abortable-iterator": "^5.0.1",
"detect-browser": "^5.3.0",
"it-length-prefixed": "^9.0.1",
"it-pipe": "^3.0.1",
Expand All @@ -65,6 +64,7 @@
"node-datachannel": "^0.4.3",
"p-defer": "^4.0.0",
"p-event": "^6.0.0",
"p-timeout": "^6.1.2",
"protons-runtime": "^5.0.0",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.6"
Expand Down
34 changes: 34 additions & 0 deletions packages/transport-webrtc/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,40 @@ import { WebRTCDirectTransport, type WebRTCTransportDirectInit, type WebRTCDirec
import type { WebRTCTransportComponents, WebRTCTransportInit } from './private-to-private/transport.js'
import type { Transport } from '@libp2p/interface/transport'

export interface DataChannelOptions {
/**
* The maximum message size sendable over the channel
*/
maxMessageSize?: number

/**
* If the channel's `bufferedAmount` grows over this amount in bytes, wait
* for it to drain before sending more data (default: 16MB)
*/
maxBufferedAmount?: number

/**
* When `bufferedAmount` is above `maxBufferedAmount`, we pause sending until
* the `bufferedAmountLow` event fires - this controls how long we wait for
* that event in ms (default: 30s)
*/
bufferedAmountLowEventTimeout?: number

/**
* When closing a stream, we wait for `bufferedAmount` to become 0 before
* closing the underlying RTCDataChannel - this controls how long we wait
* (default: 30s)
*/
drainTimeout?: number

/**
* When closing a stream we first send a FIN flag to the remote and wait
* for a FIN_ACK reply before closing the underlying RTCDataChannel - this
* controls how long we wait for the acknowledgement (default: 5s)
*/
closeTimeout?: number
}

/**
* @param {WebRTCTransportDirectInit} init - WebRTC direct transport configuration
* @param init.dataChannel - DataChannel configurations
Expand Down
12 changes: 10 additions & 2 deletions packages/transport-webrtc/src/maconn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { CounterGroup } from '@libp2p/interface/metrics'
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
import type { Source, Sink } from 'it-stream-types'

const log = logger('libp2p:webrtc:connection')
const log = logger('libp2p:webrtc:maconn')

interface WebRTCMultiaddrConnectionInit {
/**
Expand Down Expand Up @@ -65,8 +65,16 @@ export class WebRTCMultiaddrConnection implements MultiaddrConnection {
this.timeline = init.timeline
this.peerConnection = init.peerConnection

const initialState = this.peerConnection.connectionState

this.peerConnection.onconnectionstatechange = () => {
if (this.peerConnection.connectionState === 'closed' || this.peerConnection.connectionState === 'disconnected' || this.peerConnection.connectionState === 'failed') {
log.trace('peer connection state change', this.peerConnection.connectionState, 'initial state', initialState)

if (this.peerConnection.connectionState === 'disconnected') {
// attempt to reconnect
this.peerConnection.restartIce()
} else if (this.peerConnection.connectionState === 'closed') {
// nothing else to do but close the connection
this.timeline.close = Date.now()
}
}
Expand Down
101 changes: 58 additions & 43 deletions packages/transport-webrtc/src/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import { logger } from '@libp2p/logger'
import { createStream } from './stream.js'
import { nopSink, nopSource } from './util.js'
import type { DataChannelOpts } from './stream.js'
import { drainAndClose, nopSink, nopSource } from './util.js'
import type { DataChannelOptions } from './index.js'
import type { Stream } from '@libp2p/interface/connection'
import type { CounterGroup } from '@libp2p/interface/metrics'
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer'
import type { AbortOptions } from '@multiformats/multiaddr'
import type { Source, Sink } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p:webrtc:muxer')

const PROTOCOL = '/webrtc'

export interface DataChannelMuxerFactoryInit {
Expand All @@ -17,19 +20,16 @@ export interface DataChannelMuxerFactoryInit {
peerConnection: RTCPeerConnection

/**
* Optional metrics for this data channel muxer
* The protocol to use
*/
metrics?: CounterGroup
protocol?: string

/**
* Data channel options
* Optional metrics for this data channel muxer
*/
dataChannelOptions?: Partial<DataChannelOpts>
metrics?: CounterGroup

/**
* The protocol to use
*/
protocol?: string
dataChannelOptions?: DataChannelOptions
}

export class DataChannelMuxerFactory implements StreamMuxerFactory {
Expand All @@ -41,23 +41,23 @@ export class DataChannelMuxerFactory implements StreamMuxerFactory {
private readonly peerConnection: RTCPeerConnection
private streamBuffer: Stream[] = []
private readonly metrics?: CounterGroup
private readonly dataChannelOptions?: Partial<DataChannelOpts>
private readonly dataChannelOptions?: DataChannelOptions

constructor (init: DataChannelMuxerFactoryInit) {
this.peerConnection = init.peerConnection
this.metrics = init.metrics
this.protocol = init.protocol ?? PROTOCOL
this.dataChannelOptions = init.dataChannelOptions
this.dataChannelOptions = init.dataChannelOptions ?? {}

// store any datachannels opened before upgrade has been completed
this.peerConnection.ondatachannel = ({ channel }) => {
const stream = createStream({
channel,
direction: 'inbound',
dataChannelOptions: init.dataChannelOptions,
onEnd: () => {
this.streamBuffer = this.streamBuffer.filter(s => s.id !== stream.id)
}
},
...this.dataChannelOptions
})
this.streamBuffer.push(stream)
}
Expand Down Expand Up @@ -90,34 +90,15 @@ export class DataChannelMuxer implements StreamMuxer {
public protocol: string

private readonly peerConnection: RTCPeerConnection
private readonly dataChannelOptions?: DataChannelOpts
private readonly dataChannelOptions: DataChannelOptions
private readonly metrics?: CounterGroup

/**
* Gracefully close all tracked streams and stop the muxer
*/
close: (options?: AbortOptions) => Promise<void> = async () => { }

/**
* Abort all tracked streams and stop the muxer
*/
abort: (err: Error) => void = () => { }

/**
* The stream source, a no-op as the transport natively supports multiplexing
*/
source: AsyncGenerator<Uint8Array, any, unknown> = nopSource()

/**
* The stream destination, a no-op as the transport natively supports multiplexing
*/
sink: Sink<Source<Uint8Array | Uint8ArrayList>, Promise<void>> = nopSink

constructor (readonly init: DataChannelMuxerInit) {
this.streams = init.streams
this.peerConnection = init.peerConnection
this.protocol = init.protocol ?? PROTOCOL
this.metrics = init.metrics
this.dataChannelOptions = init.dataChannelOptions ?? {}

/**
* Fired when a data channel has been added to the connection has been
Expand All @@ -129,19 +110,19 @@ export class DataChannelMuxer implements StreamMuxer {
const stream = createStream({
channel,
direction: 'inbound',
dataChannelOptions: this.dataChannelOptions,
onEnd: () => {
log.trace('stream %s %s %s onEnd', stream.direction, stream.id, stream.protocol)
drainAndClose(channel, `inbound ${stream.id} ${stream.protocol}`, this.dataChannelOptions.drainTimeout)
this.streams = this.streams.filter(s => s.id !== stream.id)
this.metrics?.increment({ stream_end: true })
init?.onStreamEnd?.(stream)
}
},
...this.dataChannelOptions
})

this.streams.push(stream)
if ((init?.onIncomingStream) != null) {
this.metrics?.increment({ incoming_stream: true })
init.onIncomingStream(stream)
}
this.metrics?.increment({ incoming_stream: true })
init?.onIncomingStream?.(stream)
}

const onIncomingStream = init?.onIncomingStream
Expand All @@ -150,19 +131,53 @@ export class DataChannelMuxer implements StreamMuxer {
}
}

/**
* Gracefully close all tracked streams and stop the muxer
*/
async close (options?: AbortOptions): Promise<void> {
try {
await Promise.all(
this.streams.map(async stream => stream.close(options))
)
} catch (err: any) {
this.abort(err)
}
}

/**
* Abort all tracked streams and stop the muxer
*/
abort (err: Error): void {
for (const stream of this.streams) {
stream.abort(err)
}
}

/**
* The stream source, a no-op as the transport natively supports multiplexing
*/
source: AsyncGenerator<Uint8Array, any, unknown> = nopSource()

/**
* The stream destination, a no-op as the transport natively supports multiplexing
*/
sink: Sink<Source<Uint8Array | Uint8ArrayList>, Promise<void>> = nopSink

newStream (): Stream {
// The spec says the label SHOULD be an empty string: https://github.com/libp2p/specs/blob/master/webrtc/README.md#rtcdatachannel-label
const channel = this.peerConnection.createDataChannel('')
const stream = createStream({
channel,
direction: 'outbound',
dataChannelOptions: this.dataChannelOptions,
onEnd: () => {
log.trace('stream %s %s %s onEnd', stream.direction, stream.id, stream.protocol)
drainAndClose(channel, `outbound ${stream.id} ${stream.protocol}`, this.dataChannelOptions.drainTimeout)
channel.close() // Stream initiator is responsible for closing the channel
this.streams = this.streams.filter(s => s.id !== stream.id)
this.metrics?.increment({ stream_end: true })
this.init?.onStreamEnd?.(stream)
}
},
...this.dataChannelOptions
})
this.streams.push(stream)
this.metrics?.increment({ outgoing_stream: true })
Expand Down
Loading

0 comments on commit 30b2513

Please sign in to comment.