diff --git a/packages/transport-webrtc/src/muxer.ts b/packages/transport-webrtc/src/muxer.ts index 991d130230..ea8ed34fc6 100644 --- a/packages/transport-webrtc/src/muxer.ts +++ b/packages/transport-webrtc/src/muxer.ts @@ -1,5 +1,5 @@ import { createStream } from './stream.js' -import { nopSink, nopSource } from './util.js' +import { drainAndClose, nopSink, nopSource } from './util.js' import type { DataChannelOpts } from './stream.js' import type { Stream } from '@libp2p/interface/connection' import type { CounterGroup } from '@libp2p/interface/metrics' @@ -9,6 +9,7 @@ import type { Source, Sink } from 'it-stream-types' import type { Uint8ArrayList } from 'uint8arraylist' const PROTOCOL = '/webrtc' +const DEFAULT_CHANNEL_CLOSE_DELAY = 30000 export interface DataChannelMuxerFactoryInit { /** @@ -131,6 +132,7 @@ export class DataChannelMuxer implements StreamMuxer { direction: 'inbound', dataChannelOptions: this.dataChannelOptions, onEnd: () => { + drainAndClose(channel, this.dataChannelOptions?.closeDelay ?? DEFAULT_CHANNEL_CLOSE_DELAY) this.streams = this.streams.filter(s => s.id !== stream.id) this.metrics?.increment({ stream_end: true }) init?.onStreamEnd?.(stream) @@ -158,7 +160,7 @@ export class DataChannelMuxer implements StreamMuxer { direction: 'outbound', dataChannelOptions: this.dataChannelOptions, onEnd: () => { - channel.close() // Stream initiator is responsible for closing the channel + drainAndClose(channel, this.dataChannelOptions?.closeDelay ?? DEFAULT_CHANNEL_CLOSE_DELAY) this.streams = this.streams.filter(s => s.id !== stream.id) this.metrics?.increment({ stream_end: true }) this.init?.onStreamEnd?.(stream) diff --git a/packages/transport-webrtc/src/private-to-private/transport.ts b/packages/transport-webrtc/src/private-to-private/transport.ts index 1dd1804140..caaa8c1858 100644 --- a/packages/transport-webrtc/src/private-to-private/transport.ts +++ b/packages/transport-webrtc/src/private-to-private/transport.ts @@ -105,7 +105,7 @@ export class WebRTCTransport implements Transport, Startable { * /p2p//p2p-circuit/webrtc/p2p/ */ async dial (ma: Multiaddr, options: DialOptions): Promise { - log.trace('dialing address: ', ma) + log.trace('dialing address: %a', ma) const { baseAddr, peerId } = splitAddr(ma) if (options.signal == null) { diff --git a/packages/transport-webrtc/src/private-to-public/transport.ts b/packages/transport-webrtc/src/private-to-public/transport.ts index b0c37a7306..b5508a73af 100644 --- a/packages/transport-webrtc/src/private-to-public/transport.ts +++ b/packages/transport-webrtc/src/private-to-public/transport.ts @@ -56,6 +56,7 @@ export interface WebRTCMetrics { export interface WebRTCTransportDirectInit { dataChannel?: Partial + dataChannelCloseDelay?: number } export class WebRTCDirectTransport implements Transport { @@ -80,7 +81,7 @@ export class WebRTCDirectTransport implements Transport { */ async dial (ma: Multiaddr, options: WebRTCDialOptions): Promise { const rawConn = await this._connect(ma, options) - log(`dialing address - ${ma.toString()}`) + log.trace('dialing address: %a', ma) return rawConn } diff --git a/packages/transport-webrtc/src/stream.ts b/packages/transport-webrtc/src/stream.ts index 1e9bb0c3c7..f2bd035f5c 100644 --- a/packages/transport-webrtc/src/stream.ts +++ b/packages/transport-webrtc/src/stream.ts @@ -12,6 +12,7 @@ export interface DataChannelOpts { maxMessageSize: number maxBufferedAmount: number bufferedAmountLowEventTimeout: number + closeDelay?: number } export interface WebRTCStreamInit extends AbstractStreamInit { diff --git a/packages/transport-webrtc/src/util.ts b/packages/transport-webrtc/src/util.ts index e26e64dd5f..36af653733 100644 --- a/packages/transport-webrtc/src/util.ts +++ b/packages/transport-webrtc/src/util.ts @@ -1,4 +1,8 @@ +import { logger } from '@libp2p/logger' import { detect } from 'detect-browser' +import pDefer from 'p-defer' + +const log = logger('libp2p:webrtc:utils') const browser = detect() export const isFirefox = ((browser != null) && browser.name === 'firefox') @@ -6,3 +10,50 @@ export const isFirefox = ((browser != null) && browser.name === 'firefox') export const nopSource = async function * nop (): AsyncGenerator {} export const nopSink = async (_: any): Promise => {} + +export function drainAndClose (channel: RTCDataChannel, channelCloseDelay: number): void { + if (channel.readyState !== 'open') { + return + } + + void Promise.resolve() + .then(async () => { + // wait for bufferedAmount to become zero + if (channel.bufferedAmount > 0) { + const deferred = pDefer() + + channel.bufferedAmountLowThreshold = 0 + channel.addEventListener('bufferedamountlow', () => { + deferred.resolve() + }) + + await deferred.promise + } + }) + .then(async () => { + const deferred = pDefer() + + // event if bufferedAmount is zero there can still be unsent bytes + const timeout = setTimeout(() => { + try { + // only close if the channel is still open + if (channel.readyState === 'open') { + channel.close() + } + + deferred.resolve() + } catch (err: any) { + deferred.reject(err) + } + }, channelCloseDelay) + + if (timeout.unref != null) { + timeout.unref() + } + + await deferred.promise + }) + .catch(err => { + log.error('error closing outbound stream', err) + }) +}