Unverified Commit c8b38b20 authored by Lau Ching Jun's avatar Lau Ching Jun Committed by GitHub

Debounce the proxied connection over proxied devices. (#124540)

Debounce the proxied connection over proxied devices.
parent 697a8494
......@@ -25,6 +25,7 @@ import '../emulator.dart';
import '../features.dart';
import '../globals.dart' as globals;
import '../project.dart';
import '../proxied_devices/debounce_data_stream.dart';
import '../proxied_devices/file_transfer.dart';
import '../resident_runner.dart';
import '../run_cold.dart';
......@@ -1463,7 +1464,7 @@ class ProxyDomain extends Domain {
}
_forwardedConnections[id] = socket;
socket.listen((List<int> data) {
debounceDataStream(socket).listen((List<int> data) {
sendEvent('proxy.data.$id', null, data);
}, onError: (Object error, StackTrace stackTrace) {
// Socket error, probably disconnected.
......
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'dart:typed_data';
/// Merges the values in a stream that are sent less than [duration] apart.
///
/// To minimize latency, the merged stream will always emit the first value that
/// is sent after a pause of at least [duration] long. After the first message,
/// all values that are sent within [duration] will be merged into one.
Stream<Uint8List> debounceDataStream(Stream<Uint8List> stream, [Duration duration = const Duration(milliseconds: 100)]) {
final StreamController<Uint8List> controller = StreamController<Uint8List>();
final BytesBuilder buffer = BytesBuilder(copy: false);
bool isDone = false;
Timer? timer;
// Called when timer triggers, sends out the buffered messages.
void onTimer() {
if (buffer.isNotEmpty) {
controller.add(buffer.toBytes());
buffer.clear();
if (isDone) {
controller.close();
} else {
// Start another timer even if we have nothing to send right now, so
// that outgoing messages are at least [duration] apart.
timer = Timer(duration, onTimer);
}
} else {
timer = null;
}
}
controller.onListen = () {
final StreamSubscription<Uint8List> subscription = stream.listen((Uint8List data) {
if (timer == null) {
controller.add(data);
// Start the timer to make sure that the next message is at least [duration] apart.
timer = Timer(duration, onTimer);
} else {
buffer.add(data);
}
}, onError: (Object error, StackTrace stackTrace) {
// Forward the error.
controller.addError(error, stackTrace);
}, onDone: () {
isDone = true;
// Delay closing the channel if we still have buffered data.
if (timer == null) {
controller.close();
}
});
controller.onCancel = () {
subscription.cancel();
};
};
return controller.stream;
}
......@@ -18,6 +18,7 @@ import '../daemon.dart';
import '../device.dart';
import '../device_port_forwarder.dart';
import '../project.dart';
import 'debounce_data_stream.dart';
import 'file_transfer.dart';
bool _isNullable<T>() => null is T;
......@@ -563,7 +564,7 @@ class ProxiedPortForwarder extends DevicePortForwarder {
// Do nothing here.
},
));
socket.listen((Uint8List data) {
debounceDataStream(socket).listen((Uint8List data) {
unawaited(connection.sendRequest('proxy.write', <String, Object>{
'id': id,
}, data).then(
......
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'dart:typed_data';
import 'package:fake_async/fake_async.dart';
import 'package:flutter_tools/src/proxied_devices/debounce_data_stream.dart';
import '../../src/common.dart';
void main() {
group('debounceDataStreams', () {
late FakeAsync fakeAsync;
late StreamController<Uint8List> source;
late Stream<Uint8List> output;
const Duration debounceDuration = Duration(seconds: 10);
const Duration smallDuration = Duration(milliseconds: 10);
void addToSource(int value) {
source.add(Uint8List.fromList(<int>[value]));
}
setUp(() {
fakeAsync = FakeAsync();
fakeAsync.run((FakeAsync time) {
source = StreamController<Uint8List>();
output = debounceDataStream(source.stream, debounceDuration);
});
});
testWithoutContext('does not listen if returned stream is not listened to', () {
expect(source.hasListener, false);
output.listen(dummy);
expect(source.hasListener, true);
});
testWithoutContext('forwards data normally is all data if longer than duration apart', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
output.listen(outputItems.add);
addToSource(1);
time.elapse(debounceDuration + smallDuration);
addToSource(2);
time.elapse(debounceDuration + smallDuration);
addToSource(3);
time.elapse(debounceDuration + smallDuration);
expect(outputItems, <List<int>>[
<int>[1],
<int>[2],
<int>[3],
]);
});
});
testWithoutContext('merge data after the first if sent within duration', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
output.listen(outputItems.add);
addToSource(1);
time.elapse(smallDuration);
addToSource(2);
time.elapse(smallDuration);
addToSource(3);
time.elapse(debounceDuration + smallDuration);
expect(outputItems, <List<int>>[
<int>[1],
<int>[2, 3],
]);
});
});
testWithoutContext('output data in separate chunks if time between them is longer than duration', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
output.listen(outputItems.add);
addToSource(1);
time.elapse(smallDuration);
addToSource(2);
time.elapse(smallDuration);
addToSource(3);
time.elapse(debounceDuration + smallDuration);
addToSource(4);
time.elapse(smallDuration);
addToSource(5);
time.elapse(debounceDuration + smallDuration);
expect(outputItems, <List<int>>[
<int>[1],
<int>[2, 3],
<int>[4, 5],
]);
});
});
testWithoutContext('sends the last chunk after debounce duration', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
output.listen(outputItems.add);
addToSource(1);
time.flushMicrotasks();
expect(outputItems, <List<int>>[<int>[1]]);
time.elapse(smallDuration);
addToSource(2);
time.elapse(smallDuration);
addToSource(3);
expect(outputItems, <List<int>>[<int>[1]]);
time.elapse(debounceDuration + smallDuration);
expect(outputItems, <List<int>>[
<int>[1],
<int>[2, 3],
]);
});
});
testWithoutContext('close if source stream is closed', () {
fakeAsync.run((FakeAsync time) {
bool isDone = false;
output.listen(dummy, onDone: () => isDone = true);
expect(isDone, false);
source.close();
time.flushMicrotasks();
expect(isDone, true);
});
});
testWithoutContext('delay close until after last chunk is sent', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
bool isDone = false;
output.listen(outputItems.add, onDone: () => isDone = true);
addToSource(1);
time.flushMicrotasks();
expect(outputItems, <List<int>>[<int>[1]]);
addToSource(2);
source.close();
time.elapse(smallDuration);
expect(isDone, false);
expect(outputItems, <List<int>>[<int>[1]]);
time.elapse(debounceDuration + smallDuration);
expect(outputItems, <List<int>>[
<int>[1],
<int>[2],
]);
expect(isDone, true);
});
});
testWithoutContext('close if returned stream is closed', () {
fakeAsync.run((FakeAsync time) {
bool isCancelled = false;
source.onCancel = () => isCancelled = true;
final StreamSubscription<Uint8List> subscription = output.listen(dummy);
expect(isCancelled, false);
subscription.cancel();
expect(isCancelled, true);
});
});
});
}
Uint8List dummy(Uint8List data) => data;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment