Unverified Commit bccc9eac authored by Hannes Winkler's avatar Hannes Winkler Committed by GitHub

Fix a stream lifecycle bug in CustomDeviceLogReader (#94310)

parent e147fae8
......@@ -4,6 +4,7 @@
import 'dart:async';
import 'package:meta/meta.dart';
import 'package:process/process.dart';
import '../application_package.dart';
......@@ -53,7 +54,11 @@ class CustomDeviceLogReader extends DeviceLogReader {
@override
final String name;
final StreamController<String> _logLinesController = StreamController<String>.broadcast();
@visibleForTesting
final StreamController<String> logLinesController = StreamController<String>.broadcast();
@visibleForTesting
final List<StreamSubscription<String>> subscriptions = <StreamSubscription<String>>[];
/// Listen to [process]' stdout and stderr, decode them using [SystemEncoding]
/// and add each decoded line to [logLines].
......@@ -66,13 +71,17 @@ class CustomDeviceLogReader extends DeviceLogReader {
void listenToProcessOutput(Process process, {Encoding encoding = systemEncoding}) {
final Converter<List<int>, String> decoder = encoding.decoder;
process.stdout.transform<String>(decoder)
.transform<String>(const LineSplitter())
.listen(_logLinesController.add);
subscriptions.add(
process.stdout.transform<String>(decoder)
.transform<String>(const LineSplitter())
.listen(logLinesController.add),
);
process.stderr.transform<String>(decoder)
.transform<String>(const LineSplitter())
.listen(_logLinesController.add);
subscriptions.add(
process.stderr.transform<String>(decoder)
.transform<String>(const LineSplitter())
.listen(logLinesController.add)
);
}
/// Add all lines emitted by [lines] to this [CustomDeviceLogReader]s [logLines]
......@@ -83,18 +92,28 @@ class CustomDeviceLogReader extends DeviceLogReader {
///
/// Useful when you want to combine the contents of multiple log readers.
void listenToLinesStream(Stream<String> lines) {
_logLinesController.addStream(lines);
subscriptions.add(
lines.listen(logLinesController.add)
);
}
/// Dispose this log reader, freeing all associated resources and marking
/// [logLines] as done.
@override
void dispose() {
_logLinesController.close();
Future<void> dispose() async {
final List<Future<void>> futures = <Future<void>>[];
for (final StreamSubscription<String> subscription in subscriptions) {
futures.add(subscription.cancel());
}
futures.add(logLinesController.close());
await Future.wait(futures);
}
@override
Stream<String> get logLines => _logLinesController.stream;
Stream<String> get logLines => logLinesController.stream;
}
/// A [DevicePortForwarder] that uses commands to forward / unforward a port.
......@@ -170,8 +189,8 @@ class CustomDevicePortForwarder extends DevicePortForwarder {
}));
unawaited(completer.future.whenComplete(() {
logLinesSubscription.cancel();
reader.dispose();
unawaited(logLinesSubscription.cancel());
unawaited(reader.dispose());
}));
return completer.future;
......@@ -440,7 +459,7 @@ class CustomDeviceAppSession {
_process = null;
}
logReader.dispose();
unawaited(logReader.dispose());
}
}
......
......@@ -587,6 +587,54 @@ void main() {
expect(await device.targetPlatform, TargetPlatform.linux_x64);
});
testWithoutContext('CustomDeviceLogReader cancels subscriptions before closing logLines stream', () async {
final CustomDeviceLogReader logReader = CustomDeviceLogReader('testname');
final Iterable<List<int>> lines = Iterable<List<int>>.generate(5, (int _) => utf8.encode('test'));
logReader.listenToProcessOutput(
FakeProcess(
exitCode: Future<int>.value(0),
stdout: Stream<List<int>>.fromIterable(lines),
stderr: Stream<List<int>>.fromIterable(lines),
),
);
final List<MyFakeStreamSubscription<String>> subscriptions = <MyFakeStreamSubscription<String>>[];
bool logLinesStreamDone = false;
logReader.logLines.listen((_) {}, onDone: () {
expect(subscriptions, everyElement((MyFakeStreamSubscription<String> s) => s.canceled));
logLinesStreamDone = true;
});
logReader.subscriptions.replaceRange(
0,
logReader.subscriptions.length,
logReader.subscriptions.map(
(StreamSubscription<String> e) => MyFakeStreamSubscription<String>(e)
),
);
subscriptions.addAll(logReader.subscriptions.cast());
await logReader.dispose();
expect(logLinesStreamDone, true);
});
}
class MyFakeStreamSubscription<T> extends Fake implements StreamSubscription<T> {
MyFakeStreamSubscription(this.parent);
StreamSubscription<T> parent;
bool canceled = false;
@override
Future<void> cancel() {
canceled = true;
return parent.cancel();
}
}
class FakeBundleBuilder extends Fake implements BundleBuilder {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment