// Copyright 2014 The Flutter Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. import 'dart:async'; import 'dart:io'; import 'package:process/process.dart'; import 'common/logging.dart'; import 'common/network.dart'; import 'dart/dart_vm.dart'; import 'runners/ssh_command_runner.dart'; final String _ipv4Loopback = InternetAddress.loopbackIPv4.address; final String _ipv6Loopback = InternetAddress.loopbackIPv6.address; const ProcessManager _processManager = LocalProcessManager(); const Duration _kIsolateFindTimeout = Duration(minutes: 1); const Duration _kDartVmConnectionTimeout = Duration(seconds: 9); const Duration _kVmPollInterval = Duration(milliseconds: 1500); final Logger _log = Logger('FuchsiaRemoteConnection'); /// A function for forwarding ports on the local machine to a remote device. /// /// Takes a remote `address`, the target device's port, and an optional /// `interface` and `configFile`. The config file is used primarily for the /// default SSH port forwarding configuration. typedef PortForwardingFunction = Future<PortForwarder> Function( String address, int remotePort, [ String? interface, String? configFile, ]); /// The function for forwarding the local machine's ports to a remote Fuchsia /// device. /// /// Can be overwritten in the event that a different method is required. /// Defaults to using SSH port forwarding. PortForwardingFunction fuchsiaPortForwardingFunction = _SshPortForwarder.start; /// Sets [fuchsiaPortForwardingFunction] back to the default SSH port forwarding /// implementation. void restoreFuchsiaPortForwardingFunction() { fuchsiaPortForwardingFunction = _SshPortForwarder.start; } /// A general error raised when something fails within a /// [FuchsiaRemoteConnection]. class FuchsiaRemoteConnectionError extends Error { /// Basic constructor outlining the reason for the failure in `message`. FuchsiaRemoteConnectionError(this.message); /// The reason for the failure. final String message; @override String toString() { return '$FuchsiaRemoteConnectionError: $message'; } } /// An enum specifying a Dart VM's state. enum DartVmEventType { /// The Dart VM has started. started, /// The Dart VM has stopped. /// /// This can mean either the host machine cannot be connect to, the VM service /// has shut down cleanly, or the VM service has crashed. stopped, } /// An event regarding the Dart VM. /// /// Specifies the type of the event (whether the VM has started or has stopped), /// and contains the service port of the VM as well as a URL to connect to it. class DartVmEvent { DartVmEvent._({required this.eventType, required this.servicePort, required this.uri}); /// The URL used to connect to the Dart VM. final Uri uri; /// The type of event regarding this instance of the Dart VM. final DartVmEventType eventType; /// The port on the host machine that the Dart VM service is/was running on. final int servicePort; } /// Manages a remote connection to a Fuchsia Device. /// /// Provides affordances to observe and connect to Flutter views, isolates, and /// perform actions on the Fuchsia device's various VM services. /// /// This class can be connected to several instances of the Fuchsia device's /// Dart VM at any given time. class FuchsiaRemoteConnection { FuchsiaRemoteConnection._(this._useIpV6, this._sshCommandRunner) : _pollDartVms = false; bool _pollDartVms; final List<PortForwarder> _forwardedVmServicePorts = <PortForwarder>[]; final SshCommandRunner _sshCommandRunner; final bool _useIpV6; /// A mapping of Dart VM ports (as seen on the target machine), to /// [PortForwarder] instances mapping from the local machine to the target /// machine. final Map<int, PortForwarder> _dartVmPortMap = <int, PortForwarder>{}; /// Tracks stale ports so as not to reconnect while polling. final Set<int> _stalePorts = <int>{}; /// A broadcast stream that emits events relating to Dart VM's as they update. Stream<DartVmEvent> get onDartVmEvent => _onDartVmEvent; late Stream<DartVmEvent> _onDartVmEvent; final StreamController<DartVmEvent> _dartVmEventController = StreamController<DartVmEvent>(); /// VM service cache to avoid repeating handshakes across function /// calls. Keys a URI to a DartVm connection instance. final Map<Uri, DartVm?> _dartVmCache = <Uri, DartVm?>{}; /// Same as [FuchsiaRemoteConnection.connect] albeit with a provided /// [SshCommandRunner] instance. static Future<FuchsiaRemoteConnection> connectWithSshCommandRunner(SshCommandRunner commandRunner) async { final FuchsiaRemoteConnection connection = FuchsiaRemoteConnection._( isIpV6Address(commandRunner.address), commandRunner); await connection._forwardOpenPortsToDeviceServicePorts(); Stream<DartVmEvent> dartVmStream() { Future<void> listen() async { while (connection._pollDartVms) { await connection._pollVms(); await Future<void>.delayed(_kVmPollInterval); } connection._dartVmEventController.close(); } connection._dartVmEventController.onListen = listen; return connection._dartVmEventController.stream.asBroadcastStream(); } connection._onDartVmEvent = dartVmStream(); return connection; } /// Opens a connection to a Fuchsia device. /// /// Accepts an `address` to a Fuchsia device, and optionally a `sshConfigPath` /// in order to open the associated ssh_config for port forwarding. /// /// Will throw an [ArgumentError] if `address` is malformed. /// /// Once this function is called, the instance of [FuchsiaRemoteConnection] /// returned will keep all associated DartVM connections opened over the /// lifetime of the object. /// /// At its current state Dart VM connections will not be added or removed over /// the lifetime of this object. /// /// Throws an [ArgumentError] if the supplied `address` is not valid IPv6 or /// IPv4. /// /// If `address` is IPv6 link local (usually starts with `fe80::`), then /// `interface` will probably need to be set in order to connect successfully /// (that being the outgoing interface of your machine, not the interface on /// the target machine). /// /// Attempts to set `address` via the environment variable /// `FUCHSIA_DEVICE_URL` in the event that the argument is not passed. /// If `address` is not supplied, `interface` is also ignored, as the format /// is expected to contain the interface as well (in the event that it is /// link-local), like the following: /// /// ``` /// fe80::1%eth0 /// ``` /// /// In the event that `FUCHSIA_SSH_CONFIG` is set in the environment, that /// will be used when `sshConfigPath` isn't supplied. static Future<FuchsiaRemoteConnection> connect([ String? address, String interface = '', String? sshConfigPath, ]) async { address ??= Platform.environment['FUCHSIA_DEVICE_URL']; sshConfigPath ??= Platform.environment['FUCHSIA_SSH_CONFIG']; if (address == null) { throw FuchsiaRemoteConnectionError( r'No address supplied, and $FUCHSIA_DEVICE_URL not found.'); } const String interfaceDelimiter = '%'; if (address.contains(interfaceDelimiter)) { final List<String> addressAndInterface = address.split(interfaceDelimiter); address = addressAndInterface[0]; interface = addressAndInterface[1]; } return FuchsiaRemoteConnection.connectWithSshCommandRunner( SshCommandRunner( address: address, interface: interface, sshConfigPath: sshConfigPath, ), ); } /// Closes all open connections. /// /// Any objects that this class returns (including any child objects from /// those objects) will subsequently have its connection closed as well, so /// behavior for them will be undefined. Future<void> stop() async { for (final PortForwarder pf in _forwardedVmServicePorts) { // Closes VM service first to ensure that the connection is closed cleanly // on the target before shutting down the forwarding itself. final Uri uri = _getDartVmUri(pf); final DartVm? vmService = _dartVmCache[uri]; _dartVmCache[uri] = null; await vmService?.stop(); await pf.stop(); } for (final PortForwarder pf in _dartVmPortMap.values) { final Uri uri = _getDartVmUri(pf); final DartVm? vmService = _dartVmCache[uri]; _dartVmCache[uri] = null; await vmService?.stop(); await pf.stop(); } _dartVmCache.clear(); _forwardedVmServicePorts.clear(); _dartVmPortMap.clear(); _pollDartVms = false; } /// Helper method for [getMainIsolatesByPattern]. /// /// Called when either there are no Isolates that exist that match /// `pattern`, or there are not yet any active Dart VM's on the system /// (possible when the Isolate we're attempting to connect to is in the only /// instance of the Dart VM and its service port has not yet opened). Future<List<IsolateRef>> _waitForMainIsolatesByPattern([ Pattern? pattern, Duration timeout = _kIsolateFindTimeout, Duration vmConnectionTimeout = _kDartVmConnectionTimeout, ]) async { final Completer<List<IsolateRef>> completer = Completer<List<IsolateRef>>(); _onDartVmEvent.listen( (DartVmEvent event) async { if (event.eventType == DartVmEventType.started) { _log.fine('New VM found on port: ${event.servicePort}. Searching ' 'for Isolate: $pattern'); final DartVm? vmService = await _getDartVm(event.uri); // If the VM service is null, set the result to the empty list. final List<IsolateRef> result = await vmService?.getMainIsolatesByPattern(pattern!) ?? <IsolateRef>[]; if (result.isNotEmpty) { if (!completer.isCompleted) { completer.complete(result); } else { _log.warning('Found more than one Dart VM containing Isolates ' 'that match the pattern "$pattern".'); } } } }, onDone: () { if (!completer.isCompleted) { _log.warning('Terminating isolate search for "$pattern"' ' before timeout reached.'); } }, ); return completer.future.timeout(timeout); } /// Returns all Isolates running `main()` as matched by the [Pattern]. /// /// If there are no live Dart VM's or the Isolate cannot be found, waits until /// either `timeout` is reached, or a Dart VM starts up with a name that /// matches `pattern`. Future<List<IsolateRef>> getMainIsolatesByPattern( Pattern pattern, { Duration timeout = _kIsolateFindTimeout, Duration vmConnectionTimeout = _kDartVmConnectionTimeout, }) async { // If for some reason there are no Dart VM's that are alive, wait for one to // start with the Isolate in question. if (_dartVmPortMap.isEmpty) { _log.fine('No live Dart VMs found. Awaiting new VM startup'); return _waitForMainIsolatesByPattern( pattern, timeout, vmConnectionTimeout); } // Accumulate a list of eventual IsolateRef lists so that they can be loaded // simultaneously via Future.wait. final List<Future<List<IsolateRef>>> isolates = <Future<List<IsolateRef>>>[]; for (final PortForwarder fp in _dartVmPortMap.values) { final DartVm? vmService = await _getDartVm(_getDartVmUri(fp), timeout: vmConnectionTimeout); if (vmService == null) { continue; } isolates.add(vmService.getMainIsolatesByPattern(pattern)); } final List<IsolateRef> result = await Future.wait<List<IsolateRef>>(isolates) .timeout(timeout) .then<List<IsolateRef>>((List<List<IsolateRef>> listOfLists) { final List<List<IsolateRef>> mutableListOfLists = List<List<IsolateRef>>.from(listOfLists) ..retainWhere((List<IsolateRef> list) => list.isNotEmpty); // Folds the list of lists into one flat list. return mutableListOfLists.fold<List<IsolateRef>>( <IsolateRef>[], (List<IsolateRef> accumulator, List<IsolateRef> element) { accumulator.addAll(element); return accumulator; }, ); }); // If no VM instance anywhere has this, it's possible it hasn't spun up // anywhere. // // For the time being one Flutter Isolate runs at a time in each VM, so for // now this will wait until the timer runs out or a new Dart VM starts that // contains the Isolate in question. // // TODO(awdavies): Set this up to handle multiple Isolates per Dart VM. if (result.isEmpty) { _log.fine('No instance of the Isolate found. Awaiting new VM startup'); return _waitForMainIsolatesByPattern( pattern, timeout, vmConnectionTimeout); } return result; } /// Returns a list of [FlutterView] objects. /// /// This is run across all connected Dart VM connections that this class is /// managing. Future<List<FlutterView>> getFlutterViews() async { if (_dartVmPortMap.isEmpty) { return <FlutterView>[]; } final List<List<FlutterView>> flutterViewLists = await _invokeForAllVms<List<FlutterView>>((DartVm vmService) async { return vmService.getAllFlutterViews(); }); final List<FlutterView> results = flutterViewLists.fold<List<FlutterView>>( <FlutterView>[], (List<FlutterView> acc, List<FlutterView> element) { acc.addAll(element); return acc; }); return List<FlutterView>.unmodifiable(results); } // Calls all Dart VM's, returning a list of results. // // A side effect of this function is that internally tracked port forwarding // will be updated in the event that ports are found to be broken/stale: they // will be shut down and removed from tracking. Future<List<E>> _invokeForAllVms<E>( Future<E> Function(DartVm vmService) vmFunction, [ bool queueEvents = true, ]) async { final List<E> result = <E>[]; // Helper function loop. Future<void> shutDownPortForwarder(PortForwarder pf) async { await pf.stop(); _stalePorts.add(pf.remotePort); if (queueEvents) { _dartVmEventController.add(DartVmEvent._( eventType: DartVmEventType.stopped, servicePort: pf.remotePort, uri: _getDartVmUri(pf), )); } } for (final PortForwarder pf in _dartVmPortMap.values) { final DartVm? service = await _getDartVm(_getDartVmUri(pf)); if (service == null) { await shutDownPortForwarder(pf); } else { result.add(await vmFunction(service)); } } _stalePorts.forEach(_dartVmPortMap.remove); return result; } Uri _getDartVmUri(PortForwarder pf) { String? addr; if (pf.openPortAddress == null) { addr = _useIpV6 ? '[$_ipv6Loopback]' : _ipv4Loopback; } else { addr = isIpV6Address(pf.openPortAddress!) ? '[${pf.openPortAddress}]' : pf.openPortAddress; } final Uri uri = Uri.http('$addr:${pf.port}', '/'); return uri; } /// Attempts to create a connection to a Dart VM. /// /// Returns null if either there is an [HttpException] or a /// [TimeoutException], else a [DartVm] instance. Future<DartVm?> _getDartVm( Uri uri, { Duration timeout = _kDartVmConnectionTimeout, }) async { if (!_dartVmCache.containsKey(uri)) { // When raising an HttpException this means that there is no instance of // the Dart VM to communicate with. The TimeoutException is raised when // the Dart VM instance is shut down in the middle of communicating. try { final DartVm dartVm = await DartVm.connect(uri, timeout: timeout); _dartVmCache[uri] = dartVm; } on HttpException { _log.warning('HTTP Exception encountered connecting to new VM'); return null; } on TimeoutException { _log.warning('TimeoutException encountered connecting to new VM'); return null; } } return _dartVmCache[uri]; } /// Checks for changes in the list of Dart VM instances. /// /// If there are new instances of the Dart VM, then connections will be /// attempted (after clearing out stale connections). Future<void> _pollVms() async { await _checkPorts(); final List<int> servicePorts = await getDeviceServicePorts(); for (final int servicePort in servicePorts) { if (!_stalePorts.contains(servicePort) && !_dartVmPortMap.containsKey(servicePort)) { _dartVmPortMap[servicePort] = await fuchsiaPortForwardingFunction( _sshCommandRunner.address, servicePort, _sshCommandRunner.interface, _sshCommandRunner.sshConfigPath); _dartVmEventController.add(DartVmEvent._( eventType: DartVmEventType.started, servicePort: servicePort, uri: _getDartVmUri(_dartVmPortMap[servicePort]!), )); } } } /// Runs a dummy heartbeat command on all Dart VM instances. /// /// Removes any failing ports from the cache. Future<void> _checkPorts([ bool queueEvents = true ]) async { // Filters out stale ports after connecting. Ignores results. await _invokeForAllVms<void>( (DartVm vmService) async { await vmService.ping(); }, queueEvents, ); } /// Forwards a series of open ports to the remote device. /// /// When this function is run, all existing forwarded ports and connections /// are reset by way of [stop]. Future<void> _forwardOpenPortsToDeviceServicePorts() async { await stop(); final List<int> servicePorts = await getDeviceServicePorts(); final List<PortForwarder?> forwardedVmServicePorts = await Future.wait<PortForwarder?>( servicePorts.map<Future<PortForwarder?>>((int deviceServicePort) { return fuchsiaPortForwardingFunction( _sshCommandRunner.address, deviceServicePort, _sshCommandRunner.interface, _sshCommandRunner.sshConfigPath); })); for (final PortForwarder? pf in forwardedVmServicePorts) { // TODO(awdavies): Handle duplicates. _dartVmPortMap[pf!.remotePort] = pf; } // Don't queue events, since this is the initial forwarding. await _checkPorts(false); _pollDartVms = true; } /// Gets the open Dart VM service ports on a remote Fuchsia device. /// /// The method attempts to get service ports through an SSH connection. Upon /// successfully getting the VM service ports, returns them as a list of /// integers. If an empty list is returned, then no Dart VM instances could be /// found. An exception is thrown in the event of an actual error when /// attempting to acquire the ports. Future<List<int>> getDeviceServicePorts() async { final List<String> portPaths = await _sshCommandRunner .run('/bin/find /hub -name vmservice-port'); final List<int> ports = <int>[]; for (final String path in portPaths) { if (path == '') { continue; } final List<String> lsOutput = await _sshCommandRunner.run('/bin/ls $path'); for (final String line in lsOutput) { if (line == '') { continue; } final int? port = int.tryParse(line); if (port != null) { ports.add(port); } } } return ports; } } /// Defines an interface for port forwarding. /// /// When a port forwarder is initialized, it is intended to save a port through /// which a connection is persisted along the lifetime of this object. /// /// To shut down a port forwarder you must call the [stop] function. abstract class PortForwarder { /// Determines the port which is being forwarded. int get port; /// The address on which the open port is accessible. Defaults to null to /// indicate local loopback. String? get openPortAddress => null; /// The destination port on the other end of the port forwarding tunnel. int get remotePort; /// Shuts down and cleans up port forwarding. Future<void> stop(); } /// Instances of this class represent a running SSH tunnel. /// /// The SSH tunnel is from the host to a VM service running on a Fuchsia device. class _SshPortForwarder implements PortForwarder { _SshPortForwarder._( this._remoteAddress, this._remotePort, this._localSocket, this._interface, this._sshConfigPath, this._ipV6, ); final String _remoteAddress; final int _remotePort; final ServerSocket _localSocket; final String? _sshConfigPath; final String? _interface; final bool _ipV6; @override int get port => _localSocket.port; @override String get openPortAddress => _ipV6 ? _ipv6Loopback : _ipv4Loopback; @override int get remotePort => _remotePort; /// Starts SSH forwarding through a subprocess, and returns an instance of /// [_SshPortForwarder]. static Future<_SshPortForwarder> start( String address, int remotePort, [ String? interface, String? sshConfigPath, ]) async { final bool isIpV6 = isIpV6Address(address); final ServerSocket? localSocket = await _createLocalSocket(); if (localSocket == null || localSocket.port == 0) { _log.warning('_SshPortForwarder failed to find a local port for ' '$address:$remotePort'); throw StateError('Unable to create a socket or no available ports.'); } // TODO(awdavies): The square-bracket enclosure for using the IPv6 loopback // didn't appear to work, but when assigning to the IPv4 loopback device, // netstat shows that the local port is actually being used on the IPv6 // loopback (::1). Therefore, while the IPv4 loopback can be used for // forwarding to the destination IPv6 interface, when connecting to the // websocket, the IPV6 loopback should be used. final String formattedForwardingUrl = '${localSocket.port}:$_ipv4Loopback:$remotePort'; final String targetAddress = isIpV6 && interface!.isNotEmpty ? '$address%$interface' : address; const String dummyRemoteCommand = 'true'; final List<String> command = <String>[ 'ssh', if (isIpV6) '-6', if (sshConfigPath != null) ...<String>['-F', sshConfigPath], '-nNT', '-f', '-L', formattedForwardingUrl, targetAddress, dummyRemoteCommand, ]; _log.fine("_SshPortForwarder running '${command.join(' ')}'"); // Must await for the port forwarding function to completer here, as // forwarding must be completed before surfacing VM events (as the user may // want to connect immediately after an event is surfaced). final ProcessResult processResult = await _processManager.run(command); _log.fine("'${command.join(' ')}' exited with exit code " '${processResult.exitCode}'); if (processResult.exitCode != 0) { throw StateError('Unable to start port forwarding'); } final _SshPortForwarder result = _SshPortForwarder._( address, remotePort, localSocket, interface, sshConfigPath, isIpV6); _log.fine('Set up forwarding from ${localSocket.port} ' 'to $address port $remotePort'); return result; } /// Kills the SSH forwarding command, then to ensure no ports are forwarded, /// runs the SSH 'cancel' command to shut down port forwarding completely. @override Future<void> stop() async { // Cancel the forwarding request. See [start] for commentary about why this // uses the IPv4 loopback. final String formattedForwardingUrl = '${_localSocket.port}:$_ipv4Loopback:$_remotePort'; final String targetAddress = _ipV6 && _interface!.isNotEmpty ? '$_remoteAddress%$_interface' : _remoteAddress; final String? sshConfigPath = _sshConfigPath; final List<String> command = <String>[ 'ssh', if (sshConfigPath != null) ...<String>['-F', sshConfigPath], '-O', 'cancel', '-L', formattedForwardingUrl, targetAddress, ]; _log.fine( 'Shutting down SSH forwarding with command: ${command.join(' ')}'); final ProcessResult result = await _processManager.run(command); if (result.exitCode != 0) { _log.warning('Command failed:\nstdout: ${result.stdout}' '\nstderr: ${result.stderr}'); } _localSocket.close(); } /// Attempts to find an available port. /// /// If successful returns a valid [ServerSocket] (which must be disconnected /// later). static Future<ServerSocket?> _createLocalSocket() async { try { return await ServerSocket.bind(_ipv4Loopback, 0); } catch (e) { // We should not be catching all errors arbitrarily here, this might hide real errors. // TODO(ianh): Determine which exceptions to catch here. _log.warning('_createLocalSocket failed: $e'); return null; } } }