From 0ecfd231c7fe91b97ea11f78c8cf49fb220416fb Mon Sep 17 00:00:00 2001 From: Alexander Sandor <137198655+SandPod@users.noreply.github.com> Date: Fri, 6 Dec 2024 12:35:05 +0100 Subject: [PATCH] fix: Introduce end of stream message to close stream gracefully. (#3025) --- .../helpers/method_stream_manager.dart | 51 ++++++++++--------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/packages/serverpod/lib/src/server/websocket_request_handlers/helpers/method_stream_manager.dart b/packages/serverpod/lib/src/server/websocket_request_handlers/helpers/method_stream_manager.dart index e02dbe820a..239765b55b 100644 --- a/packages/serverpod/lib/src/server/websocket_request_handlers/helpers/method_stream_manager.dart +++ b/packages/serverpod/lib/src/server/websocket_request_handlers/helpers/method_stream_manager.dart @@ -351,24 +351,35 @@ class MethodStreamManager { await session.close(); }); + var streamKey = _buildStreamKey( + endpoint: methodStreamCallContext.fullEndpointPath, + method: methodStreamCallContext.method.name, + methodStreamId: methodStreamId, + ); + late StreamSubscription subscription; subscription = outputController.stream.listen( - (value) { + (value) async { _onOutputStreamValue?.call( methodStreamId, value, methodStreamCallContext); }, - onError: (e, s) async { + onError: (e, s) { + // All method calls that return futures are unawaited to ensure that + // the calls are invoked synchronously. If an 'await' is added + // here, processing new messages might be initiated before the + // subscription is canceled. + if (e is _StreamComplete) { + _updateCloseReason(streamKey, CloseReason.done); + unawaited(subscription.cancel()); + return; + } + _onOutputStreamError?.call( methodStreamId, e, s, methodStreamCallContext); - var streamKey = _buildStreamKey( - endpoint: methodStreamCallContext.fullEndpointPath, - method: methodStreamCallContext.method.name, - methodStreamId: methodStreamId, - ); _updateCloseReason(streamKey, CloseReason.error); - await session.close(error: e, stackTrace: s); + unawaited(session.close(error: e, stackTrace: s)); /// Required to close stream when error occurs. /// This will also close the input streams. @@ -376,7 +387,7 @@ class MethodStreamManager { /// for the listen method because this cancels /// the stream before the onError callback has /// been called. - await subscription.cancel(); + unawaited(subscription.cancel()); }, ); @@ -571,21 +582,13 @@ class MethodStreamManager { } outputController.addStream(methodStream).whenComplete( - () async { - var streamKey = _buildStreamKey( - endpoint: methodStreamCallContext.fullEndpointPath, - method: methodStreamCallContext.method.name, - methodStreamId: methodStreamId, + // The stream complete message is sent as an error to circumvent + // branching when passing along stream events to the the handler. + () => outputController.addError(_StreamComplete()), ); - - var closeReasonIsNotAlreadySetToError = - _outputStreamContexts[streamKey]?.closeReason != CloseReason.error; - if (closeReasonIsNotAlreadySetToError) { - _updateCloseReason(streamKey, CloseReason.done); - } - - await subscription.cancel(); - }, - ); } } + +/// Passed as the last message on a stream to indicate that the stream is +/// complete and no more messages will be sent from the endpoint. +class _StreamComplete {}