Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Use broadcast stream to prevent broken pipe. #41

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 20 additions & 22 deletions tools/serverpod_cli/test_e2e/lib/src/keyword_search_in_stream.dart
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
import 'dart:async';

/// Searches for keywords in a stream of strings.
/// The class expects a stream that is split by lines and utf8 decoded.
/// Searches the [onData] string for the [keywords].
///
/// The class will listen to the stream and set the [_found] flag to true
/// The class will search for the keywords and set the [_found] flag to true
/// if the keyword is found. If the keyword is not found within the timeout
/// period, the [_found] flag will be set to false.
///
/// The user can call [keywordFound] to wait for the keyword to be
/// found or the timeout to occur.
///
/// The user needs to call [startListen] to start listening to the stream.
/// The user needs to call [close] to stop listening to the stream.
/// The user needs to call [cancel] once search is completed to remove any
/// active timeouts and prevent future onces from being initialized.
class KeywordSearchInStream {
bool? _found;
final List<String> keywords;
final Stream<String> _stream;
late final StreamSubscription<String> _subscription;
final Duration timeout;
Timer? _timer;
bool _searching = true;

KeywordSearchInStream(
this._stream, {
KeywordSearchInStream({
this.timeout = const Duration(seconds: 30),
required this.keywords,
});
Expand All @@ -39,22 +36,23 @@ class KeywordSearchInStream {
return value;
}

void close() {
_timer?.cancel();
_subscription.cancel();
}
void onData(String data) {
print(data);

KeywordSearchInStream startListen() {
_subscription = _stream.listen((String output) {
if (keywords.contains(output.trim())) {
_found = true;
}
if (!_searching) {
return;
}

print(output);
_scheduleTimeout();
});
if (keywords.contains(data.trim())) {
_found = true;
}

return this;
_scheduleTimeout();
}

void cancel() {
_searching = false;
_timer?.cancel();
}

void _scheduleTimeout() {
Expand Down
71 changes: 34 additions & 37 deletions tools/serverpod_cli/test_e2e/test/generate_watch_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ void main() async {

late Process createProcess;
Process? generateWatch;
KeywordSearchInStream? generateStreamSearch;
KeywordSearchInStream generateStreamSearch = KeywordSearchInStream(
keywords: generateWatchCompletionKeywords,
);
setUp(() async {
// Create project
createProcess = await Process.start(
Expand All @@ -65,7 +67,7 @@ void main() async {
tearDown(() async {
createProcess.kill();
generateWatch?.kill();
generateStreamSearch?.close();
generateStreamSearch.cancel();

await Process.run(
'docker',
Expand All @@ -88,21 +90,20 @@ void main() async {
},
);

var stdoutStream = generateWatch!.stdout
generateStreamSearch = KeywordSearchInStream(
keywords: generateWatchCompletionKeywords,
);
generateWatch!.stdout
.transform(const Utf8Decoder())
.transform(const LineSplitter());
.transform(const LineSplitter())
.listen(generateStreamSearch.onData);
generateWatch!.stderr
.transform(const Utf8Decoder())
.transform(const LineSplitter())
.listen(print);

generateStreamSearch = KeywordSearchInStream(
stdoutStream,
keywords: generateWatchCompletionKeywords,
).startListen();

await expectLater(
generateStreamSearch?.keywordFound,
generateStreamSearch.keywordFound,
completion(isTrue),
reason:
'Initial code generation did not complete before timeout was reached.',
Expand All @@ -126,7 +127,7 @@ fields:
''', flush: true);

await expectLater(
generateStreamSearch?.keywordFound,
generateStreamSearch.keywordFound,
completion(isTrue),
reason:
'Incremental code generation did not complete before timeout was reached.',
Expand Down Expand Up @@ -168,7 +169,7 @@ fields:
''', flush: true);

await expectLater(
generateStreamSearch?.keywordFound,
generateStreamSearch.keywordFound,
completion(isTrue),
reason:
'Incremental code generation did not complete before timeout was reached.',
Expand All @@ -191,7 +192,7 @@ fields:
// Remove model file
protocolFile.deleteSync();
await expectLater(
generateStreamSearch?.keywordFound,
generateStreamSearch.keywordFound,
completion(isTrue),
reason:
'Incremental code generation did not complete before timeout was reached.',
Expand All @@ -214,7 +215,9 @@ fields:

late Process createProcess;
Process? generateWatch;
KeywordSearchInStream? generateStreamSearch;
KeywordSearchInStream generateStreamSearch = KeywordSearchInStream(
keywords: generateWatchCompletionKeywords,
);
setUp(() async {
// Create project
createProcess = await Process.start(
Expand All @@ -239,7 +242,7 @@ fields:
tearDown(() async {
createProcess.kill();
generateWatch?.kill();
generateStreamSearch?.close();
generateStreamSearch.cancel();

await Process.run(
'docker',
Expand All @@ -261,21 +264,17 @@ fields:
},
);

var stdoutStream = generateWatch!.stdout
generateWatch!.stdout
.transform(const Utf8Decoder())
.transform(const LineSplitter());
.transform(const LineSplitter())
.listen(generateStreamSearch.onData);
generateWatch!.stderr
.transform(const Utf8Decoder())
.transform(const LineSplitter())
.listen(print);

generateStreamSearch = KeywordSearchInStream(
stdoutStream,
keywords: generateWatchCompletionKeywords,
).startListen();

await expectLater(
generateStreamSearch?.keywordFound,
generateStreamSearch.keywordFound,
completion(isTrue),
reason:
'Initial code generation did not complete before timeout was reached.',
Expand Down Expand Up @@ -303,7 +302,7 @@ class TestEndpoint extends Endpoint {
''', flush: true);

await expectLater(
generateStreamSearch?.keywordFound,
generateStreamSearch.keywordFound,
completion(isTrue),
reason:
'Incremental code generation did not complete before timeout was reached.',
Expand Down Expand Up @@ -351,7 +350,7 @@ class TestEndpoint extends Endpoint {
''', flush: true);

await expectLater(
generateStreamSearch?.keywordFound,
generateStreamSearch.keywordFound,
completion(isTrue),
reason:
'Incremental code generation did not complete before timeout was reached.',
Expand All @@ -371,7 +370,7 @@ class TestEndpoint extends Endpoint {
endpointFile.deleteSync();

await expectLater(
generateStreamSearch?.keywordFound,
generateStreamSearch.keywordFound,
completion(isTrue),
reason:
'Incremental code generation did not complete before timeout was reached.',
Expand All @@ -396,7 +395,9 @@ class TestEndpoint extends Endpoint {

late Process createProcess;
Process? generateWatch;
KeywordSearchInStream? generateStreamSearch;
KeywordSearchInStream generateStreamSearch = KeywordSearchInStream(
keywords: generateWatchCompletionKeywords,
);
setUp(() async {
// Create project
createProcess = await Process.start(
Expand All @@ -421,7 +422,7 @@ class TestEndpoint extends Endpoint {
tearDown(() async {
createProcess.kill();
generateWatch?.kill();
generateStreamSearch?.close();
generateStreamSearch.cancel();

await Process.run(
'docker',
Expand Down Expand Up @@ -474,21 +475,17 @@ fields:
},
);

var stdoutStream = generateWatch!.stdout
generateWatch!.stdout
.transform(const Utf8Decoder())
.transform(const LineSplitter());
.transform(const LineSplitter())
.listen(generateStreamSearch.onData);
generateWatch!.stderr
.transform(const Utf8Decoder())
.transform(const LineSplitter())
.listen(print);

generateStreamSearch = KeywordSearchInStream(
stdoutStream,
keywords: generateWatchCompletionKeywords,
).startListen();

await expectLater(
generateStreamSearch?.keywordFound,
generateStreamSearch.keywordFound,
completion(isTrue),
reason:
'Initial code generation did not complete before timeout was reached.',
Expand Down Expand Up @@ -541,7 +538,7 @@ fields:
));

await expectLater(
generateStreamSearch?.keywordFound,
generateStreamSearch.keywordFound,
completion(isTrue),
reason:
'Incremental code generation did not complete before timeout was reached.',
Expand Down
Loading