Skip to content

Commit

Permalink
fix: Correct type mismatch in onTimeout callback for subscription can…
Browse files Browse the repository at this point in the history
…cellation in MethodStreamManager closeAllStreams() (serverpod#2805)

Co-authored-by: Hampus Lavin <hampus.lavin@gmail.com>
  • Loading branch information
KristijanZic and hampuslavin authored Oct 4, 2024
1 parent d93a323 commit 830108f
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,12 @@ class MethodStreamManager {
var closeSubscriptionFutures = outboundStreamContexts.map(
(c) => c.subscription.cancel().timeout(
_closeTimeout,
onTimeout: () => c.controller.onCancel?.call(),
onTimeout: () async {
await c.controller.onCancel?.call();
return null;
// This type case is needed to avoid a runtime exception
// Filed as bug on dart-lang/sdk: https://github.com/dart-lang/sdk/issues/56846
} as Future<Null> Function()?,
),
);

Expand Down
4 changes: 4 additions & 0 deletions packages/serverpod_test/lib/src/test_stream_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ class TestStreamManager<OutputStreamType> {
);

GlobalStreamManager.add(_streamManager);

outputStreamController.onCancel = () async {
await _streamManager.closeAllStreams();
};
}

/// Inititates a stream method call which opens all needed streams.
Expand Down
22 changes: 22 additions & 0 deletions tests/serverpod_test_client/lib/src/protocol/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1517,6 +1517,28 @@ class EndpointMethodStreaming extends _i1.EndpointRef {
{'stream': stream},
);

_i2.Stream<int?> getBroadcastStream() =>
caller.callStreamingServerEndpoint<_i2.Stream<int?>, int?>(
'methodStreaming',
'getBroadcastStream',
{},
{},
);

_i2.Future<bool> wasBroadcastStreamCanceled() =>
caller.callServerEndpoint<bool>(
'methodStreaming',
'wasBroadcastStreamCanceled',
{},
);

_i2.Future<bool> wasSessionWillCloseListenerCalled() =>
caller.callServerEndpoint<bool>(
'methodStreaming',
'wasSessionWillCloseListenerCalled',
{},
);

_i2.Stream<int> intStreamFromValue(int value) =>
caller.callStreamingServerEndpoint<_i2.Stream<int>, int>(
'methodStreaming',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,38 @@ class MethodStreaming extends Endpoint {
return stream.first;
}

static const cancelStreamChannelName = 'cancelStreamChannel';
static const sessionClosedChannelName = 'sessionClosedChannel';
Stream<int?> getBroadcastStream(Session session) {
session.addWillCloseListener((localSession) {
localSession.messages
.postMessage(sessionClosedChannelName, SimpleData(num: 1));
});
var stream = StreamController<int?>.broadcast(
onCancel: () {
session.messages
.postMessage(cancelStreamChannelName, SimpleData(num: 1));
},
);
return stream.stream;
}

Future<bool> wasBroadcastStreamCanceled(Session session) async {
var streamWasCanceled = Completer<bool>();
session.messages.addListener(cancelStreamChannelName, (data) {
streamWasCanceled.complete(true);
});
return streamWasCanceled.future;
}

Future<bool> wasSessionWillCloseListenerCalled(Session session) async {
var sessionWillCloseListenerWasCalled = Completer<bool>();
session.messages.addListener(sessionClosedChannelName, (data) {
sessionWillCloseListenerWasCalled.complete(true);
});
return sessionWillCloseListenerWasCalled.future;
}

Stream<int> intStreamFromValue(Session session, int value) async* {
for (var i in List.generate(value, (index) => index)) {
yield i;
Expand Down
33 changes: 33 additions & 0 deletions tests/serverpod_test_server/lib/src/generated/endpoints.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3345,6 +3345,26 @@ class Endpoints extends _i1.EndpointDispatch {
(endpoints['methodStreaming'] as _i23.MethodStreaming)
.methodCallEndpoint(session),
),
'wasBroadcastStreamCanceled': _i1.MethodConnector(
name: 'wasBroadcastStreamCanceled',
params: {},
call: (
_i1.Session session,
Map<String, dynamic> params,
) async =>
(endpoints['methodStreaming'] as _i23.MethodStreaming)
.wasBroadcastStreamCanceled(session),
),
'wasSessionWillCloseListenerCalled': _i1.MethodConnector(
name: 'wasSessionWillCloseListenerCalled',
params: {},
call: (
_i1.Session session,
Map<String, dynamic> params,
) async =>
(endpoints['methodStreaming'] as _i23.MethodStreaming)
.wasSessionWillCloseListenerCalled(session),
),
'simpleEndpoint': _i1.MethodConnector(
name: 'simpleEndpoint',
params: {},
Expand Down Expand Up @@ -3499,6 +3519,19 @@ class Endpoints extends _i1.EndpointDispatch {
streamParams['stream']!.cast<int?>(),
),
),
'getBroadcastStream': _i1.MethodStreamConnector(
name: 'getBroadcastStream',
params: {},
streamParams: {},
returnType: _i1.MethodStreamReturnType.streamType,
call: (
_i1.Session session,
Map<String, dynamic> params,
Map<String, Stream> streamParams,
) =>
(endpoints['methodStreaming'] as _i23.MethodStreaming)
.getBroadcastStream(session),
),
'intStreamFromValue': _i1.MethodStreamConnector(
name: 'intStreamFromValue',
params: {
Expand Down
3 changes: 3 additions & 0 deletions tests/serverpod_test_server/lib/src/generated/protocol.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ methodStreaming:
- methodCallEndpoint:
- intReturnFromStream:
- nullableIntReturnFromStream:
- getBroadcastStream:
- wasBroadcastStreamCanceled:
- wasSessionWillCloseListenerCalled:
- intStreamFromValue:
- intEchoStream:
- dynamicEchoStream:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import 'package:serverpod/serverpod.dart';
import 'package:test/test.dart';

import 'test_tools/serverpod_test_tools.dart';

void main() {
withServerpod(
'Given call to MethodStreamingEndpoint',
(endpoints, session) {
test(
'when calling an endpoint returning a non-broadcast stream and cancelling '
'then will cancel', () async {
var stream = endpoints.methodStreaming.intStreamFromValue(session, 1);
var subscription = stream.listen((event) {});

await expectLater(subscription.cancel(), completes);
});

test(
'when calling an endpoint returning a broadcast stream and cancelling '
'then it should cancel and trigger the onCancel hook on the stream controller',
() async {
var wasStreamCancelled =
endpoints.methodStreaming.wasBroadcastStreamCanceled(session);

var stream = endpoints.methodStreaming.getBroadcastStream(session);
// Wait for the stream to be created, otherwise cancel is called before creation
await flushMicrotasks();

var subscription = stream.listen((event) {});
await subscription.cancel();

await expectLater(
wasStreamCancelled,
completion(true),
);
});

test(
'when calling an endpoint returning a broadcast stream and cancelling '
'then it should close the session and call its will close listener',
() async {
var wasSessionWillCloseListenerCalled = endpoints.methodStreaming
.wasSessionWillCloseListenerCalled(session);

var stream = endpoints.methodStreaming.getBroadcastStream(session);
// Wait for the stream to be created, otherwise cancel is called before creation
await flushMicrotasks();

var subscription = stream.listen((event) {});
await subscription.cancel();

await expectLater(
wasSessionWillCloseListenerCalled,
completion(true),
);
});
},
runMode: ServerpodRunMode.production,
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4398,6 +4398,76 @@ class _MethodStreaming {
});
}

_i3.Stream<int?> getBroadcastStream(_i1.TestSession session) {
var _localTestStreamManager = _i1.TestStreamManager<int?>();
_i1.callStreamFunctionAndHandleExceptions(
() async {
var _localUniqueSession =
(await (session as _i1.InternalTestSession).copyWith(
endpoint: 'methodStreaming',
method: 'getBroadcastStream',
) as _i1.InternalTestSession);
var _localCallContext =
await _endpointDispatch.getMethodStreamCallContext(
createSessionCallback: (_) => _localUniqueSession.serverpodSession,
endpointPath: 'methodStreaming',
methodName: 'getBroadcastStream',
arguments: {},
requestedInputStreams: [],
serializationManager: _serializationManager,
);
await _localTestStreamManager.callStreamMethod(
_localCallContext,
_localUniqueSession.serverpodSession,
{},
);
},
_localTestStreamManager.outputStreamController,
);
return _localTestStreamManager.outputStreamController.stream;
}

_i3.Future<bool> wasBroadcastStreamCanceled(_i1.TestSession session) async {
return _i1.callAwaitableFunctionAndHandleExceptions(() async {
var _localUniqueSession = ((session as _i1.InternalTestSession).copyWith(
endpoint: 'methodStreaming',
method: 'wasBroadcastStreamCanceled',
) as _i1.InternalTestSession);
var _localCallContext = await _endpointDispatch.getMethodCallContext(
createSessionCallback: (_) => _localUniqueSession.serverpodSession,
endpointPath: 'methodStreaming',
methodName: 'wasBroadcastStreamCanceled',
parameters: {},
serializationManager: _serializationManager,
);
return (_localCallContext.method.call(
_localUniqueSession.serverpodSession,
_localCallContext.arguments,
) as _i3.Future<bool>);
});
}

_i3.Future<bool> wasSessionWillCloseListenerCalled(
_i1.TestSession session) async {
return _i1.callAwaitableFunctionAndHandleExceptions(() async {
var _localUniqueSession = ((session as _i1.InternalTestSession).copyWith(
endpoint: 'methodStreaming',
method: 'wasSessionWillCloseListenerCalled',
) as _i1.InternalTestSession);
var _localCallContext = await _endpointDispatch.getMethodCallContext(
createSessionCallback: (_) => _localUniqueSession.serverpodSession,
endpointPath: 'methodStreaming',
methodName: 'wasSessionWillCloseListenerCalled',
parameters: {},
serializationManager: _serializationManager,
);
return (_localCallContext.method.call(
_localUniqueSession.serverpodSession,
_localCallContext.arguments,
) as _i3.Future<bool>);
});
}

_i3.Stream<int> intStreamFromValue(
_i1.TestSession session,
int value,
Expand Down

0 comments on commit 830108f

Please sign in to comment.