Unverified Commit 1531ef60 authored by Andrew Davies's avatar Andrew Davies Committed by GitHub

[frdp] Add configurable timeouts for VM. (#22893)

This adds configurable timeouts for the Dart VM. Due to some testing
machines running things quite slowly, this is becoming more necessary.
parent 9c01c2b8
...@@ -18,9 +18,12 @@ const Duration _kRpcTimeout = Duration(seconds: 5); ...@@ -18,9 +18,12 @@ const Duration _kRpcTimeout = Duration(seconds: 5);
final Logger _log = Logger('DartVm'); final Logger _log = Logger('DartVm');
/// Signature of an asynchronous function for astablishing a JSON RPC-2 /// Signature of an asynchronous function for establishing a JSON RPC-2
/// connection to a [Uri]. /// connection to a [Uri].
typedef RpcPeerConnectionFunction = Future<json_rpc.Peer> Function(Uri uri); typedef RpcPeerConnectionFunction = Future<json_rpc.Peer> Function(
Uri uri, {
Duration timeout,
});
/// [DartVm] uses this function to connect to the Dart VM on Fuchsia. /// [DartVm] uses this function to connect to the Dart VM on Fuchsia.
/// ///
...@@ -30,16 +33,18 @@ RpcPeerConnectionFunction fuchsiaVmServiceConnectionFunction = _waitAndConnect; ...@@ -30,16 +33,18 @@ RpcPeerConnectionFunction fuchsiaVmServiceConnectionFunction = _waitAndConnect;
/// Attempts to connect to a Dart VM service. /// Attempts to connect to a Dart VM service.
/// ///
/// Gives up after `_kConnectTimeout` has elapsed. /// Gives up after `timeout` has elapsed.
Future<json_rpc.Peer> _waitAndConnect(Uri uri) async { Future<json_rpc.Peer> _waitAndConnect(
Uri uri, {
Duration timeout = _kConnectTimeout,
}) async {
final Stopwatch timer = Stopwatch()..start(); final Stopwatch timer = Stopwatch()..start();
Future<json_rpc.Peer> attemptConnection(Uri uri) async { Future<json_rpc.Peer> attemptConnection(Uri uri) async {
WebSocket socket; WebSocket socket;
json_rpc.Peer peer; json_rpc.Peer peer;
try { try {
socket = socket = await WebSocket.connect(uri.toString()).timeout(timeout);
await WebSocket.connect(uri.toString()).timeout(_kConnectTimeout);
peer = json_rpc.Peer(IOWebSocketChannel(socket).cast())..listen(); peer = json_rpc.Peer(IOWebSocketChannel(socket).cast())..listen();
return peer; return peer;
} on HttpException catch (e) { } on HttpException catch (e) {
...@@ -53,7 +58,7 @@ Future<json_rpc.Peer> _waitAndConnect(Uri uri) async { ...@@ -53,7 +58,7 @@ Future<json_rpc.Peer> _waitAndConnect(Uri uri) async {
// Other unknown errors will be handled with reconnects. // Other unknown errors will be handled with reconnects.
await peer?.close(); await peer?.close();
await socket?.close(); await socket?.close();
if (timer.elapsed < _kConnectTimeout) { if (timer.elapsed < timeout) {
_log.info('Attempting to reconnect'); _log.info('Attempting to reconnect');
await Future<void>.delayed(_kReconnectAttemptInterval); await Future<void>.delayed(_kReconnectAttemptInterval);
return attemptConnection(uri); return attemptConnection(uri);
...@@ -105,11 +110,15 @@ class DartVm { ...@@ -105,11 +110,15 @@ class DartVm {
/// Attempts to connect to the given [Uri]. /// Attempts to connect to the given [Uri].
/// ///
/// Throws an error if unable to connect. /// Throws an error if unable to connect.
static Future<DartVm> connect(Uri uri) async { static Future<DartVm> connect(
Uri uri, {
Duration timeout = _kConnectTimeout,
}) async {
if (uri.scheme == 'http') { if (uri.scheme == 'http') {
uri = uri.replace(scheme: 'ws', path: '/ws'); uri = uri.replace(scheme: 'ws', path: '/ws');
} }
final json_rpc.Peer peer = await fuchsiaVmServiceConnectionFunction(uri); final json_rpc.Peer peer =
await fuchsiaVmServiceConnectionFunction(uri, timeout: timeout);
if (peer == null) { if (peer == null) {
return null; return null;
} }
......
...@@ -20,6 +20,8 @@ const ProcessManager _processManager = LocalProcessManager(); ...@@ -20,6 +20,8 @@ const ProcessManager _processManager = LocalProcessManager();
const Duration _kIsolateFindTimeout = Duration(minutes: 1); const Duration _kIsolateFindTimeout = Duration(minutes: 1);
const Duration _kDartVmConnectionTimeout = Duration(seconds: 9);
const Duration _kVmPollInterval = Duration(milliseconds: 1500); const Duration _kVmPollInterval = Duration(milliseconds: 1500);
final Logger _log = Logger('FuchsiaRemoteConnection'); final Logger _log = Logger('FuchsiaRemoteConnection');
...@@ -246,6 +248,7 @@ class FuchsiaRemoteConnection { ...@@ -246,6 +248,7 @@ class FuchsiaRemoteConnection {
Future<List<IsolateRef>> _waitForMainIsolatesByPattern([ Future<List<IsolateRef>> _waitForMainIsolatesByPattern([
Pattern pattern, Pattern pattern,
Duration timeout = _kIsolateFindTimeout, Duration timeout = _kIsolateFindTimeout,
Duration vmConnectionTimeout = _kDartVmConnectionTimeout,
]) async { ]) async {
final Completer<List<IsolateRef>> completer = Completer<List<IsolateRef>>(); final Completer<List<IsolateRef>> completer = Completer<List<IsolateRef>>();
_onDartVmEvent.listen( _onDartVmEvent.listen(
...@@ -253,7 +256,8 @@ class FuchsiaRemoteConnection { ...@@ -253,7 +256,8 @@ class FuchsiaRemoteConnection {
if (event.eventType == DartVmEventType.started) { if (event.eventType == DartVmEventType.started) {
_log.fine('New VM found on port: ${event.servicePort}. Searching ' _log.fine('New VM found on port: ${event.servicePort}. Searching '
'for Isolate: $pattern'); 'for Isolate: $pattern');
final DartVm vmService = await _getDartVm(event.uri.port); final DartVm vmService = await _getDartVm(event.uri.port,
timeout: _kDartVmConnectionTimeout);
// If the VM service is null, set the result to the empty list. // If the VM service is null, set the result to the empty list.
final List<IsolateRef> result = await vmService final List<IsolateRef> result = await vmService
?.getMainIsolatesByPattern(pattern, timeout: timeout) ?? ?.getMainIsolatesByPattern(pattern, timeout: timeout) ??
...@@ -284,29 +288,33 @@ class FuchsiaRemoteConnection { ...@@ -284,29 +288,33 @@ class FuchsiaRemoteConnection {
/// either `timeout` is reached, or a Dart VM starts up with a name that /// either `timeout` is reached, or a Dart VM starts up with a name that
/// matches `pattern`. /// matches `pattern`.
Future<List<IsolateRef>> getMainIsolatesByPattern( Future<List<IsolateRef>> getMainIsolatesByPattern(
Pattern pattern, [ Pattern pattern, {
Duration timeout = _kIsolateFindTimeout, Duration timeout = _kIsolateFindTimeout,
]) async { Duration vmConnectionTimeout = _kDartVmConnectionTimeout,
}) async {
// If for some reason there are no Dart VM's that are alive, wait for one to // If for some reason there are no Dart VM's that are alive, wait for one to
// start with the Isolate in question. // start with the Isolate in question.
if (_dartVmPortMap.isEmpty) { if (_dartVmPortMap.isEmpty) {
_log.fine('No live Dart VMs found. Awaiting new VM startup'); _log.fine('No live Dart VMs found. Awaiting new VM startup');
return _waitForMainIsolatesByPattern(pattern, timeout); return _waitForMainIsolatesByPattern(
pattern, timeout, vmConnectionTimeout);
} }
// Accumulate a list of eventual IsolateRef lists so that they can be loaded // Accumulate a list of eventual IsolateRef lists so that they can be loaded
// simultaneously via Future.wait. // simultaneously via Future.wait.
final List<Future<List<IsolateRef>>> isolates = final List<Future<List<IsolateRef>>> isolates =
<Future<List<IsolateRef>>>[]; <Future<List<IsolateRef>>>[];
for (PortForwarder fp in _dartVmPortMap.values) { for (PortForwarder fp in _dartVmPortMap.values) {
final DartVm vmService = await _getDartVm(fp.port).timeout(timeout); final DartVm vmService =
await _getDartVm(fp.port, timeout: vmConnectionTimeout);
if (vmService == null) { if (vmService == null) {
continue; continue;
} }
isolates.add(vmService.getMainIsolatesByPattern(pattern)); isolates.add(vmService.getMainIsolatesByPattern(pattern));
} }
final List<IsolateRef> result = await Future.wait<List<IsolateRef>>(isolates) final List<IsolateRef> result =
.timeout(timeout) await Future.wait<List<IsolateRef>>(isolates)
.then<List<IsolateRef>>((List<List<IsolateRef>> listOfLists) { .timeout(timeout)
.then<List<IsolateRef>>((List<List<IsolateRef>> listOfLists) {
final List<List<IsolateRef>> mutableListOfLists = final List<List<IsolateRef>> mutableListOfLists =
List<List<IsolateRef>>.from(listOfLists) List<List<IsolateRef>>.from(listOfLists)
..retainWhere((List<IsolateRef> list) => list.isNotEmpty); ..retainWhere((List<IsolateRef> list) => list.isNotEmpty);
...@@ -330,7 +338,8 @@ class FuchsiaRemoteConnection { ...@@ -330,7 +338,8 @@ class FuchsiaRemoteConnection {
// TODO(awdavies): Set this up to handle multiple Isolates per Dart VM. // TODO(awdavies): Set this up to handle multiple Isolates per Dart VM.
if (result.isEmpty) { if (result.isEmpty) {
_log.fine('No instance of the Isolate found. Awaiting new VM startup'); _log.fine('No instance of the Isolate found. Awaiting new VM startup');
return _waitForMainIsolatesByPattern(pattern, timeout); return _waitForMainIsolatesByPattern(
pattern, timeout, vmConnectionTimeout);
} }
return result; return result;
} }
...@@ -407,13 +416,17 @@ class FuchsiaRemoteConnection { ...@@ -407,13 +416,17 @@ class FuchsiaRemoteConnection {
/// ///
/// Returns null if either there is an [HttpException] or a /// Returns null if either there is an [HttpException] or a
/// [TimeoutException], else a [DartVm] instance. /// [TimeoutException], else a [DartVm] instance.
Future<DartVm> _getDartVm(int port) async { Future<DartVm> _getDartVm(
int port, {
Duration timeout = _kDartVmConnectionTimeout,
}) async {
if (!_dartVmCache.containsKey(port)) { if (!_dartVmCache.containsKey(port)) {
// When raising an HttpException this means that there is no instance of // When raising an HttpException this means that there is no instance of
// the Dart VM to communicate with. The TimeoutException is raised when // the Dart VM to communicate with. The TimeoutException is raised when
// the Dart VM instance is shut down in the middle of communicating. // the Dart VM instance is shut down in the middle of communicating.
try { try {
final DartVm dartVm = await DartVm.connect(_getDartVmUri(port)); final DartVm dartVm =
await DartVm.connect(_getDartVmUri(port), timeout: timeout);
_dartVmCache[port] = dartVm; _dartVmCache[port] = dartVm;
} on HttpException { } on HttpException {
_log.warning('HTTP Exception encountered connecting to new VM'); _log.warning('HTTP Exception encountered connecting to new VM');
...@@ -460,7 +473,7 @@ class FuchsiaRemoteConnection { ...@@ -460,7 +473,7 @@ class FuchsiaRemoteConnection {
(DartVm vmService) async { (DartVm vmService) async {
final Map<String, dynamic> res = final Map<String, dynamic> res =
await vmService.invokeRpc('getVersion'); await vmService.invokeRpc('getVersion');
_log.fine('DartVM version check result: $res'); _log.fine('DartVM(${vmService.uri}) version check result: $res');
return res; return res;
}, },
queueEvents, queueEvents,
...@@ -475,7 +488,8 @@ class FuchsiaRemoteConnection { ...@@ -475,7 +488,8 @@ class FuchsiaRemoteConnection {
await stop(); await stop();
final List<int> servicePorts = await getDeviceServicePorts(); final List<int> servicePorts = await getDeviceServicePorts();
final List<PortForwarder> forwardedVmServicePorts = final List<PortForwarder> forwardedVmServicePorts =
await Future.wait<PortForwarder>(servicePorts.map<Future<PortForwarder>>((int deviceServicePort) { await Future.wait<PortForwarder>(
servicePorts.map<Future<PortForwarder>>((int deviceServicePort) {
return fuchsiaPortForwardingFunction( return fuchsiaPortForwardingFunction(
_sshCommandRunner.address, _sshCommandRunner.address,
deviceServicePort, deviceServicePort,
......
...@@ -22,8 +22,7 @@ void main() { ...@@ -22,8 +22,7 @@ void main() {
mockRunner = MockSshCommandRunner(); mockRunner = MockSshCommandRunner();
// Adds some extra junk to make sure the strings will be cleaned up. // Adds some extra junk to make sure the strings will be cleaned up.
when(mockRunner.run(any)).thenAnswer((_) => when(mockRunner.run(any)).thenAnswer((_) =>
Future<List<String>>.value( Future<List<String>>.value(<String>['123\n\n\n', '456 ', '789']));
<String>['123\n\n\n', '456 ', '789']));
const String address = 'fe80::8eae:4cff:fef4:9247'; const String address = 'fe80::8eae:4cff:fef4:9247';
const String interface = 'eno1'; const String interface = 'eno1';
when(mockRunner.address).thenReturn(address); when(mockRunner.address).thenReturn(address);
...@@ -86,7 +85,10 @@ void main() { ...@@ -86,7 +85,10 @@ void main() {
mockPeerConnections = <MockPeer>[]; mockPeerConnections = <MockPeer>[];
uriConnections = <Uri>[]; uriConnections = <Uri>[];
Future<json_rpc.Peer> mockVmConnectionFunction(Uri uri) { Future<json_rpc.Peer> mockVmConnectionFunction(
Uri uri, {
Duration timeout,
}) {
return Future<json_rpc.Peer>(() async { return Future<json_rpc.Peer>(() async {
final MockPeer mp = MockPeer(); final MockPeer mp = MockPeer();
mockPeerConnections.add(mp); mockPeerConnections.add(mp);
......
...@@ -17,7 +17,10 @@ void main() { ...@@ -17,7 +17,10 @@ void main() {
}); });
test('null connector', () async { test('null connector', () async {
Future<json_rpc.Peer> mockServiceFunction(Uri uri) { Future<json_rpc.Peer> mockServiceFunction(
Uri uri, {
Duration timeout,
}) {
return Future<json_rpc.Peer>(() => null); return Future<json_rpc.Peer>(() => null);
} }
...@@ -28,7 +31,10 @@ void main() { ...@@ -28,7 +31,10 @@ void main() {
test('disconnect closes peer', () async { test('disconnect closes peer', () async {
final MockPeer peer = MockPeer(); final MockPeer peer = MockPeer();
Future<json_rpc.Peer> mockServiceFunction(Uri uri) { Future<json_rpc.Peer> mockServiceFunction(
Uri uri, {
Duration timeout,
}) {
return Future<json_rpc.Peer>(() => peer); return Future<json_rpc.Peer>(() => peer);
} }
...@@ -84,7 +90,10 @@ void main() { ...@@ -84,7 +90,10 @@ void main() {
], ],
}; };
Future<json_rpc.Peer> mockVmConnectionFunction(Uri uri) { Future<json_rpc.Peer> mockVmConnectionFunction(
Uri uri, {
Duration timeout,
}) {
when(mockPeer.sendRequest(any, any)).thenAnswer((_) => when(mockPeer.sendRequest(any, any)).thenAnswer((_) =>
Future<Map<String, dynamic>>(() => flutterViewCannedResponses)); Future<Map<String, dynamic>>(() => flutterViewCannedResponses));
return Future<json_rpc.Peer>(() => mockPeer); return Future<json_rpc.Peer>(() => mockPeer);
...@@ -139,7 +148,10 @@ void main() { ...@@ -139,7 +148,10 @@ void main() {
], ],
}; };
Future<json_rpc.Peer> mockVmConnectionFunction(Uri uri) { Future<json_rpc.Peer> mockVmConnectionFunction(
Uri uri, {
Duration timeout,
}) {
when(mockPeer.sendRequest(any, any)).thenAnswer((_) => when(mockPeer.sendRequest(any, any)).thenAnswer((_) =>
Future<Map<String, dynamic>>(() => flutterViewCannedResponses)); Future<Map<String, dynamic>>(() => flutterViewCannedResponses));
return Future<json_rpc.Peer>(() => mockPeer); return Future<json_rpc.Peer>(() => mockPeer);
...@@ -186,7 +198,10 @@ void main() { ...@@ -186,7 +198,10 @@ void main() {
] ]
}; };
Future<json_rpc.Peer> mockVmConnectionFunction(Uri uri) { Future<json_rpc.Peer> mockVmConnectionFunction(
Uri uri, {
Duration timeout,
}) {
when(mockPeer.sendRequest(any, any)).thenAnswer((_) => when(mockPeer.sendRequest(any, any)).thenAnswer((_) =>
Future<Map<String, dynamic>>( Future<Map<String, dynamic>>(
() => flutterViewCannedResponseMissingId)); () => flutterViewCannedResponseMissingId));
...@@ -239,7 +254,10 @@ void main() { ...@@ -239,7 +254,10 @@ void main() {
], ],
}; };
Future<json_rpc.Peer> mockVmConnectionFunction(Uri uri) { Future<json_rpc.Peer> mockVmConnectionFunction(
Uri uri, {
Duration timeout,
}) {
when(mockPeer.sendRequest(any, any)).thenAnswer( when(mockPeer.sendRequest(any, any)).thenAnswer(
(_) => Future<Map<String, dynamic>>(() => vmCannedResponse)); (_) => Future<Map<String, dynamic>>(() => vmCannedResponse));
return Future<json_rpc.Peer>(() => mockPeer); return Future<json_rpc.Peer>(() => mockPeer);
...@@ -278,7 +296,10 @@ void main() { ...@@ -278,7 +296,10 @@ void main() {
], ],
}; };
Future<json_rpc.Peer> mockVmConnectionFunction(Uri uri) { Future<json_rpc.Peer> mockVmConnectionFunction(
Uri uri, {
Duration timeout,
}) {
when(mockPeer.sendRequest(any, any)).thenAnswer((_) => when(mockPeer.sendRequest(any, any)).thenAnswer((_) =>
Future<Map<String, dynamic>>( Future<Map<String, dynamic>>(
() => flutterViewCannedResponseMissingIsolateName)); () => flutterViewCannedResponseMissingIsolateName));
...@@ -311,7 +332,10 @@ void main() { ...@@ -311,7 +332,10 @@ void main() {
test('verify timeout fires', () async { test('verify timeout fires', () async {
const Duration timeoutTime = Duration(milliseconds: 100); const Duration timeoutTime = Duration(milliseconds: 100);
Future<json_rpc.Peer> mockVmConnectionFunction(Uri uri) { Future<json_rpc.Peer> mockVmConnectionFunction(
Uri uri, {
Duration timeout,
}) {
// Return a command that will never complete. // Return a command that will never complete.
when(mockPeer.sendRequest(any, any)) when(mockPeer.sendRequest(any, any))
.thenAnswer((_) => Completer<Map<String, dynamic>>().future); .thenAnswer((_) => Completer<Map<String, dynamic>>().future);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment