Unverified Commit 61de5014 authored by Andrew Davies's avatar Andrew Davies Committed by GitHub

[frdp] Ignore stale ports. (#16944)

When forwarding all ports, run a test connection to each Dart VM instance, stopping port forwarding if the connection fails.

Also does the same for any batched calls that hit all Dart VM instances.
parent 549acac6
...@@ -31,7 +31,8 @@ Future<Null> main(List<String> args) async { ...@@ -31,7 +31,8 @@ Future<Null> main(List<String> args) async {
final String interface = args.length > 1 ? args[1] : ''; final String interface = args.length > 1 ? args[1] : '';
// Example ssh config path for the Fuchsia device after having made a local // Example ssh config path for the Fuchsia device after having made a local
// build. // build.
const String sshConfigPath = '../../out/release-x86-64/ssh-keys/ssh_config'; const String sshConfigPath =
'../../../fuchsia/out/x64rel/ssh-keys/ssh_config';
final FuchsiaRemoteConnection connection = final FuchsiaRemoteConnection connection =
await FuchsiaRemoteConnection.connect(address, interface, sshConfigPath); await FuchsiaRemoteConnection.connect(address, interface, sshConfigPath);
print('On $address, the following Dart VM ports are running:'); print('On $address, the following Dart VM ports are running:');
......
...@@ -10,7 +10,7 @@ import 'package:web_socket_channel/io.dart'; ...@@ -10,7 +10,7 @@ import 'package:web_socket_channel/io.dart';
import '../common/logging.dart'; import '../common/logging.dart';
const Duration _kConnectTimeout = const Duration(seconds: 30); const Duration _kConnectTimeout = const Duration(seconds: 9);
const Duration _kReconnectAttemptInterval = const Duration(seconds: 3); const Duration _kReconnectAttemptInterval = const Duration(seconds: 3);
...@@ -41,7 +41,14 @@ Future<json_rpc.Peer> _waitAndConnect(Uri uri) async { ...@@ -41,7 +41,14 @@ Future<json_rpc.Peer> _waitAndConnect(Uri uri) async {
socket = await WebSocket.connect(uri.toString()); socket = await WebSocket.connect(uri.toString());
peer = new json_rpc.Peer(new IOWebSocketChannel(socket).cast())..listen(); peer = new json_rpc.Peer(new IOWebSocketChannel(socket).cast())..listen();
return peer; return peer;
} on HttpException catch (e) {
// This is a fine warning as this most likely means the port is stale.
_log.fine('$e: ${e.message}');
await peer?.close();
await socket?.close();
rethrow;
} catch (e) { } catch (e) {
// Other unknown errors will be handled with reconnects.
await peer?.close(); await peer?.close();
await socket?.close(); await socket?.close();
if (timer.elapsed < _kConnectTimeout) { if (timer.elapsed < _kConnectTimeout) {
......
...@@ -113,13 +113,13 @@ class FuchsiaRemoteConnection { ...@@ -113,13 +113,13 @@ class FuchsiaRemoteConnection {
/// those objects) will subsequently have its connection closed as well, so /// those objects) will subsequently have its connection closed as well, so
/// behavior for them will be undefined. /// behavior for them will be undefined.
Future<Null> stop() async { Future<Null> stop() async {
for (PortForwarder fp in _forwardedVmServicePorts) { for (PortForwarder pf in _forwardedVmServicePorts) {
// Closes VM service first to ensure that the connection is closed cleanly // Closes VM service first to ensure that the connection is closed cleanly
// on the target before shutting down the forwarding itself. // on the target before shutting down the forwarding itself.
final DartVm vmService = _dartVmCache[fp.port]; final DartVm vmService = _dartVmCache[pf.port];
_dartVmCache[fp.port] = null; _dartVmCache[pf.port] = null;
await vmService?.stop(); await vmService?.stop();
await fp.stop(); await pf.stop();
} }
_dartVmCache.clear(); _dartVmCache.clear();
_forwardedVmServicePorts.clear(); _forwardedVmServicePorts.clear();
...@@ -127,18 +127,46 @@ class FuchsiaRemoteConnection { ...@@ -127,18 +127,46 @@ class FuchsiaRemoteConnection {
/// Returns a list of [FlutterView] objects. /// Returns a list of [FlutterView] objects.
/// ///
/// This is run across all connected DartVM connections that this class is /// This is run across all connected Dart VM connections that this class is
/// managing. /// managing.
Future<List<FlutterView>> getFlutterViews() async { Future<List<FlutterView>> getFlutterViews() async {
final List<FlutterView> views = <FlutterView>[];
if (_forwardedVmServicePorts.isEmpty) { if (_forwardedVmServicePorts.isEmpty) {
return views; return <FlutterView>[];
} }
for (PortForwarder fp in _forwardedVmServicePorts) { final List<List<FlutterView>> flutterViewLists =
final DartVm vmService = await _getDartVm(fp.port); await _invokeForAllVms<List<FlutterView>>((DartVm vmService) async {
views.addAll(await vmService.getAllFlutterViews()); return await vmService.getAllFlutterViews();
});
final List<FlutterView> results = flutterViewLists.fold<List<FlutterView>>(
<FlutterView>[], (List<FlutterView> acc, List<FlutterView> element) {
acc.addAll(element);
return acc;
});
return new List<FlutterView>.unmodifiable(results);
}
// Calls all Dart VM's, returning a list of results.
//
// A side effect of this function is that internally tracked port forwarding
// will be updated in the event that ports are found to be broken/stale: they
// will be shut down and removed from tracking.
Future<List<E>> _invokeForAllVms<E>(
Future<E> vmFunction(DartVm vmService)) async {
final List<E> result = <E>[];
final Set<int> stalePorts = new Set<int>();
for (PortForwarder pf in _forwardedVmServicePorts) {
try {
final DartVm service = await _getDartVm(pf.port);
result.add(await vmFunction(service));
} on HttpException {
await pf.stop();
stalePorts.add(pf.port);
}
} }
return new List<FlutterView>.unmodifiable(views); // Clean up the ports after finished with iterating.
_forwardedVmServicePorts
.removeWhere((PortForwarder pf) => stalePorts.contains(pf.port));
return result;
} }
Future<DartVm> _getDartVm(int port) async { Future<DartVm> _getDartVm(int port) async {
...@@ -172,6 +200,16 @@ class FuchsiaRemoteConnection { ...@@ -172,6 +200,16 @@ class FuchsiaRemoteConnection {
_sshCommandRunner.interface, _sshCommandRunner.interface,
_sshCommandRunner.sshConfigPath); _sshCommandRunner.sshConfigPath);
}))); })));
// Filters out stale ports after connecting. Ignores results.
await _invokeForAllVms<Map<String, dynamic>>(
(DartVm vmService) async {
final Map<String, dynamic> res =
await vmService.invokeRpc('getVersion');
_log.fine('DartVM version check result: $res');
return res;
},
);
} }
/// Gets the open Dart VM service ports on a remote Fuchsia device. /// Gets the open Dart VM service ports on a remote Fuchsia device.
...@@ -291,13 +329,14 @@ class _SshPortForwarder implements PortForwarder { ...@@ -291,13 +329,14 @@ class _SshPortForwarder implements PortForwarder {
]); ]);
_log.fine("_SshPortForwarder running '${command.join(' ')}'"); _log.fine("_SshPortForwarder running '${command.join(' ')}'");
final Process process = await _processManager.start(command); final Process process = await _processManager.start(command);
final _SshPortForwarder result = new _SshPortForwarder._(address,
remotePort, localSocket, process, interface, sshConfigPath, isIpV6);
process.exitCode.then((int c) { process.exitCode.then((int c) {
_log.fine("'${command.join(' ')}' exited with exit code $c"); _log.fine("'${command.join(' ')}' exited with exit code $c");
}); });
_log.fine( _log.fine('Set up forwarding from ${localSocket.port} '
'Set up forwarding from ${localSocket.port} to $address port $remotePort'); 'to $address port $remotePort');
return new _SshPortForwarder._(address, remotePort, localSocket, process, return result;
interface, sshConfigPath, isIpV6);
} }
/// Kills the SSH forwarding command, then to ensure no ports are forwarded, /// Kills the SSH forwarding command, then to ensure no ports are forwarded,
...@@ -328,8 +367,8 @@ class _SshPortForwarder implements PortForwarder { ...@@ -328,8 +367,8 @@ class _SshPortForwarder implements PortForwarder {
'Shutting down SSH forwarding with command: ${command.join(' ')}'); 'Shutting down SSH forwarding with command: ${command.join(' ')}');
final ProcessResult result = await _processManager.run(command); final ProcessResult result = await _processManager.run(command);
if (result.exitCode != 0) { if (result.exitCode != 0) {
_log.warning( _log.warning('Command failed:\nstdout: ${result.stdout}'
'Command failed:\nstdout: ${result.stdout}\nstderr: ${result.stderr}'); '\nstderr: ${result.stderr}');
} }
_localSocket.close(); _localSocket.close();
} }
......
...@@ -30,9 +30,9 @@ void main() { ...@@ -30,9 +30,9 @@ void main() {
const String address = 'fe80::8eae:4cff:fef4:9247'; const String address = 'fe80::8eae:4cff:fef4:9247';
const String interface = 'eno1'; const String interface = 'eno1';
// Adds some extra junk to make sure the strings will be cleaned up. // Adds some extra junk to make sure the strings will be cleaned up.
when(mockRunner.run(typed(any))) when(mockRunner.run(typed(any))).thenAnswer((_) =>
.thenAnswer((_) => new Future<List<String>>.value( new Future<List<String>>.value(
<String>['123\n\n\n', '456 ', '789'])); <String>['123\n\n\n', '456 ', '789']));
when(mockRunner.address).thenReturn(address); when(mockRunner.address).thenReturn(address);
when(mockRunner.interface).thenReturn(interface); when(mockRunner.interface).thenReturn(interface);
int port = 0; int port = 0;
...@@ -100,8 +100,10 @@ void main() { ...@@ -100,8 +100,10 @@ void main() {
mockPeerConnections.add(mp); mockPeerConnections.add(mp);
uriConnections.add(uri); uriConnections.add(uri);
when(mp.sendRequest(typed<String>(any), typed<String>(any))) when(mp.sendRequest(typed<String>(any), typed<String>(any)))
// The local ports match the desired indices for now, so get the
// canned response from the URI port.
.thenAnswer((_) => new Future<Map<String, dynamic>>( .thenAnswer((_) => new Future<Map<String, dynamic>>(
() => flutterViewCannedResponses[flutterViewIndex++])); () => flutterViewCannedResponses[uri.port]));
return mp; return mp;
}); });
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment