Skip to content

Commit

Permalink
fix: Introduce end of stream message to close stream gracefully. (ser…
Browse files Browse the repository at this point in the history
  • Loading branch information
SandPod authored Dec 6, 2024
1 parent 98a2674 commit 0ecfd23
Showing 1 changed file with 27 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -351,32 +351,43 @@ class MethodStreamManager {
await session.close();
});

var streamKey = _buildStreamKey(
endpoint: methodStreamCallContext.fullEndpointPath,
method: methodStreamCallContext.method.name,
methodStreamId: methodStreamId,
);

late StreamSubscription subscription;
subscription = outputController.stream.listen(
(value) {
(value) async {
_onOutputStreamValue?.call(
methodStreamId, value, methodStreamCallContext);
},
onError: (e, s) async {
onError: (e, s) {
// All method calls that return futures are unawaited to ensure that
// the calls are invoked synchronously. If an 'await' is added
// here, processing new messages might be initiated before the
// subscription is canceled.
if (e is _StreamComplete) {
_updateCloseReason(streamKey, CloseReason.done);
unawaited(subscription.cancel());
return;
}

_onOutputStreamError?.call(
methodStreamId, e, s, methodStreamCallContext);

var streamKey = _buildStreamKey(
endpoint: methodStreamCallContext.fullEndpointPath,
method: methodStreamCallContext.method.name,
methodStreamId: methodStreamId,
);
_updateCloseReason(streamKey, CloseReason.error);

await session.close(error: e, stackTrace: s);
unawaited(session.close(error: e, stackTrace: s));

/// Required to close stream when error occurs.
/// This will also close the input streams.
/// We can't use the "cancelOnError" option
/// for the listen method because this cancels
/// the stream before the onError callback has
/// been called.
await subscription.cancel();
unawaited(subscription.cancel());
},
);

Expand Down Expand Up @@ -571,21 +582,13 @@ class MethodStreamManager {
}

outputController.addStream(methodStream).whenComplete(
() async {
var streamKey = _buildStreamKey(
endpoint: methodStreamCallContext.fullEndpointPath,
method: methodStreamCallContext.method.name,
methodStreamId: methodStreamId,
// The stream complete message is sent as an error to circumvent
// branching when passing along stream events to the the handler.
() => outputController.addError(_StreamComplete()),
);

var closeReasonIsNotAlreadySetToError =
_outputStreamContexts[streamKey]?.closeReason != CloseReason.error;
if (closeReasonIsNotAlreadySetToError) {
_updateCloseReason(streamKey, CloseReason.done);
}

await subscription.cancel();
},
);
}
}

/// Passed as the last message on a stream to indicate that the stream is
/// complete and no more messages will be sent from the endpoint.
class _StreamComplete {}

0 comments on commit 0ecfd23

Please sign in to comment.