Unverified Commit 1d4e7cd5 authored by Jonah Williams's avatar Jonah Williams Committed by GitHub

[flutter_tools] do not add events to closed sink in throttle transform (#66468)

The throttle duration could delay past the point where the destination sink was closed. Check if it is closed before adding an event. Fixes a crash on dev: StateError: Bad State: Stream is already closed.
parent fc1e7642
...@@ -234,6 +234,7 @@ StreamTransformer<S, S> _throttle<S>({ ...@@ -234,6 +234,7 @@ StreamTransformer<S, S> _throttle<S>({
S latestLine; S latestLine;
int lastExecution; int lastExecution;
Future<void> throttleFuture; Future<void> throttleFuture;
bool done = false;
return StreamTransformer<S, S> return StreamTransformer<S, S>
.fromHandlers( .fromHandlers(
...@@ -249,14 +250,20 @@ StreamTransformer<S, S> _throttle<S>({ ...@@ -249,14 +250,20 @@ StreamTransformer<S, S> _throttle<S>({
final int nextExecutionTime = isFirstMessage || remainingTime > waitDuration.inMilliseconds final int nextExecutionTime = isFirstMessage || remainingTime > waitDuration.inMilliseconds
? 0 ? 0
: waitDuration.inMilliseconds - remainingTime; : waitDuration.inMilliseconds - remainingTime;
throttleFuture ??= Future<void> throttleFuture ??= Future<void>
.delayed(Duration(milliseconds: nextExecutionTime)) .delayed(Duration(milliseconds: nextExecutionTime))
.whenComplete(() { .whenComplete(() {
if (done) {
return;
}
sink.add(latestLine); sink.add(latestLine);
throttleFuture = null; throttleFuture = null;
lastExecution = DateTime.now().millisecondsSinceEpoch; lastExecution = DateTime.now().millisecondsSinceEpoch;
}); });
},
handleDone: (EventSink<S> sink) {
done = true;
sink.close();
} }
); );
} }
...@@ -195,6 +195,18 @@ void main() { ...@@ -195,6 +195,18 @@ void main() {
expect('$uri', 'http://127.0.0.1:12345/PTwjm8Ii8qg=/'); expect('$uri', 'http://127.0.0.1:12345/PTwjm8Ii8qg=/');
}); });
testUsingContext('protocol discovery does not crash if the log reader is closed while delaying', () async {
initialize(devicePort: 12346, throttleDuration: const Duration(milliseconds: 10));
final Future<List<Uri>> results = discoverer.uris.toList();
logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12346/PTwjm8Ii8qg=/');
logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12346/PTwjm8Ii8qg=/');
await logReader.dispose();
// Give time for throttle to finish.
await Future<void>.delayed(const Duration(milliseconds: 11));
expect(await results, isEmpty);
});
testUsingContext('uris in the stream are throttled', () async { testUsingContext('uris in the stream are throttled', () async {
const Duration kThrottleDuration = Duration(milliseconds: 10); const Duration kThrottleDuration = Duration(milliseconds: 10);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment