Unverified Commit 5d590afa authored by Andrew Davies's avatar Andrew Davies Committed by GitHub

[frdp] Adds DartVM events/driver support. (#17170)

This change adds Dart VM event support (listening for when a VM starts/stops by using a periodic heartbeat).

This also adds support to connect to a specific `IsolateRef` through the flutter driver, so that when an application spawns, it can immediately be driven (as shown in included example code).
parent 03173082
......@@ -62,6 +62,9 @@ const List<TimelineStream> _defaultStreams = const <TimelineStream>[TimelineStre
/// Default timeout for short-running RPCs.
const Duration _kShortTimeout = const Duration(seconds: 5);
/// Default timeout for awaiting an Isolate to become runnable.
const Duration _kIsolateLoadRunnableTimeout = const Duration(minutes: 1);
/// Default timeout for long-running RPCs.
final Duration _kLongTimeout = _kShortTimeout * 6;
......@@ -146,34 +149,46 @@ class FlutterDriver {
/// [logCommunicationToFile] determines whether the command communication
/// between the test and the app should be logged to `flutter_driver_commands.log`.
///
/// [isolateNumber] (optional) determines the specific isolate to connect to.
/// If this is left as `null`, will connect to the first isolate found
/// running on [dartVmServiceUrl].
///
/// [isolateReadyTimeout] determines how long after we connect to the VM
/// service we will wait for the first isolate to become runnable.
static Future<FlutterDriver> connect({
String dartVmServiceUrl,
bool printCommunication: false,
bool logCommunicationToFile: true,
Duration isolateReadyTimeout: const Duration(minutes: 1),
int isolateNumber,
Duration isolateReadyTimeout: _kIsolateLoadRunnableTimeout,
}) async {
dartVmServiceUrl ??= Platform.environment['VM_SERVICE_URL'];
if (dartVmServiceUrl == null) {
throw new DriverError(
'Could not determine URL to connect to application.\n'
'Either the VM_SERVICE_URL environment variable should be set, or an explicit\n'
'URL should be provided to the FlutterDriver.connect() method.'
);
'Could not determine URL to connect to application.\n'
'Either the VM_SERVICE_URL environment variable should be set, or an explicit\n'
'URL should be provided to the FlutterDriver.connect() method.');
}
// Connect to Dart VM services
_log.info('Connecting to Flutter application at $dartVmServiceUrl');
final VMServiceClientConnection connection = await vmServiceConnectFunction(dartVmServiceUrl);
final VMServiceClientConnection connection =
await vmServiceConnectFunction(dartVmServiceUrl);
final VMServiceClient client = connection.client;
final VM vm = await client.getVM();
_log.trace('Looking for the isolate');
VMIsolate isolate = await vm.isolates.first.loadRunnable()
final VMIsolateRef isolateRef = isolateNumber ==
null ? vm.isolates.first :
vm.isolates.firstWhere(
(VMIsolateRef isolate) => isolate.number == isolateNumber);
_log.trace('Isolate found with number: ${isolateRef.number}');
VMIsolate isolate = await isolateRef
.loadRunnable()
.timeout(isolateReadyTimeout, onTimeout: () {
throw new TimeoutException('Timeout while waiting for the isolate to become runnable');
});
throw new TimeoutException(
'Timeout while waiting for the isolate to become runnable');
});
// TODO(yjbanov): vm_service_client does not support "None" pause event yet.
// It is currently reported as null, but we cannot rely on it because
......@@ -189,7 +204,7 @@ class FlutterDriver {
isolate.pauseEvent is! VMPauseInterruptedEvent &&
isolate.pauseEvent is! VMResumeEvent) {
await new Future<Null>.delayed(_kShortTimeout ~/ 10);
isolate = await vm.isolates.first.loadRunnable();
isolate = await isolateRef.loadRunnable();
}
final FlutterDriver driver = new FlutterDriver.connectedTo(
......@@ -319,7 +334,7 @@ class FlutterDriver {
/// JSON-RPC client useful for sending raw JSON requests.
final rpc.Peer _peer;
/// The main isolate hosting the Flutter application
final VMIsolateRef _appIsolate;
final VMIsolate _appIsolate;
/// Whether to print communication between host and app to `stdout`.
final bool _printCommunication;
/// Whether to log communication between host and app to `flutter_driver_commands.log`.
......
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'dart:core';
import 'package:flutter_driver/flutter_driver.dart';
import 'package:fuchsia_remote_debug_protocol/fuchsia_remote_debug_protocol.dart';
import 'package:fuchsia_remote_debug_protocol/logging.dart';
/// Runs through a simple usage of the fuchsia_remote_debug_protocol library:
/// connects to a remote machine at the address in argument 1 (interface
/// optional for argument 2) to drive an application named 'todo_list' by
/// scrolling up and down on the main scaffold.
///
/// Make sure to set up your application (you can change the name from
/// 'todo_list') follows the setup for testing with the flutter driver:
/// https://flutter.io/testing/#adding-the-flutter_driver-dependency
///
/// Example usage:
///
/// $ dart examples/driver_todo_list_scroll.dart \
/// fe80::8eae:4cff:fef4:9247 eno1
Future<Null> main(List<String> args) async {
// Log only at info level within the library. If issues arise, this can be
// changed to [LoggingLevel.all] or [LoggingLevel.fine] to see more
// information.
Logger.globalLevel = LoggingLevel.info;
if (args.isEmpty) {
print('Expects an IP address and/or network interface');
return;
}
final String address = args[0];
final String interface = args.length > 1 ? args[1] : '';
// Example ssh config path for the Fuchsia device after having made a local
// build.
const String sshConfigPath =
'../../../fuchsia/out/x64rel/ssh-keys/ssh_config';
final FuchsiaRemoteConnection connection =
await FuchsiaRemoteConnection.connect(address, interface, sshConfigPath);
const Pattern isolatePattern = 'todo_list';
print('Finding $isolatePattern');
final List<IsolateRef> refs =
await connection.getMainIsolatesByPattern(isolatePattern);
final IsolateRef ref = refs.first;
print('Driving ${ref.name}');
final FlutterDriver driver = await FlutterDriver.connect(
dartVmServiceUrl: ref.dartVm.uri.toString(),
isolateNumber: ref.number,
printCommunication: true,
logCommunicationToFile: false);
for (int i = 0; i < 5; ++i) {
// Scrolls down 300px.
await driver.scroll(find.byType('Scaffold'), 0.0, -300.0,
const Duration(milliseconds: 300));
await new Future<Null>.delayed(const Duration(milliseconds: 500));
// Scrolls up 300px.
await driver.scroll(find.byType('Scaffold'), 300.0, 300.0,
const Duration(milliseconds: 300));
}
await driver.close();
await connection.stop();
}
......@@ -93,10 +93,13 @@ class RpcFormatError extends Error {
/// Either wraps existing RPC calls to the Dart VM service, or runs raw RPC
/// function calls via [invokeRpc].
class DartVm {
DartVm._(this._peer);
DartVm._(this._peer, this.uri);
final json_rpc.Peer _peer;
/// The URI through which this DartVM instance is connected.
final Uri uri;
/// Attempts to connect to the given [Uri].
///
/// Throws an error if unable to connect.
......@@ -108,7 +111,24 @@ class DartVm {
if (peer == null) {
return null;
}
return new DartVm._(peer);
return new DartVm._(peer, uri);
}
/// Returns a [List] of [IsolateRef] objects whose name matches `pattern`.
///
/// Also checks to make sure it was launched from the `main()` function.
Future<List<IsolateRef>> getMainIsolatesByPattern(Pattern pattern) async {
final Map<String, dynamic> jsonVmRef =
await invokeRpc('getVM', timeout: _kRpcTimeout);
final List<Map<String, dynamic>> jsonIsolates = jsonVmRef['isolates'];
final List<IsolateRef> result = <IsolateRef>[];
for (Map<String, dynamic> jsonIsolate in jsonIsolates) {
final String name = jsonIsolate['name'];
if (name.contains(pattern) && name.contains(new RegExp(r':main\(\)'))) {
result.add(new IsolateRef._fromJson(jsonIsolate, this));
}
}
return result;
}
/// Invokes a raw JSON RPC command with the VM service.
......@@ -119,7 +139,7 @@ class DartVm {
Future<Map<String, dynamic>> invokeRpc(
String function, {
Map<String, dynamic> params,
Duration timeout,
Duration timeout = _kRpcTimeout,
}) async {
final Future<Map<String, dynamic>> future = _peer.sendRequest(
function,
......@@ -208,3 +228,44 @@ class FlutterView {
/// May be null if there is no associated isolate.
String get name => _name;
}
/// This is a wrapper class for the `@Isolate` RPC object.
///
/// See:
/// https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md#isolate
///
/// This class contains information about the Isolate like its name and ID, as
/// well as a reference to the parent DartVM on which it is running.
class IsolateRef {
IsolateRef._(this.name, this.number, this.dartVm);
factory IsolateRef._fromJson(Map<String, dynamic> json, DartVm dartVm) {
final String number = json['number'];
final String name = json['name'];
final String type = json['type'];
if (type == null) {
throw new RpcFormatError('Unable to find type within JSON "$json"');
}
if (type != '@Isolate') {
throw new RpcFormatError('Type "$type" does not match for IsolateRef');
}
if (number == null) {
throw new RpcFormatError(
'Unable to find number for isolate ref within JSON "$json"');
}
if (name == null) {
throw new RpcFormatError(
'Unable to find name for isolate ref within JSON "$json"');
}
return new IsolateRef._(name, int.parse(number), dartVm);
}
/// The full name of this Isolate (not guaranteed to be unique).
final String name;
/// The unique number ID of this isolate.
final int number;
/// The parent [DartVm] on which this Isolate lives.
final DartVm dartVm;
}
......@@ -19,6 +19,10 @@ final String _ipv6Loopback = InternetAddress.LOOPBACK_IP_V6.address;
const ProcessManager _processManager = const LocalProcessManager();
const Duration _kIsolateFindTimeout = const Duration(minutes: 1);
const Duration _kVmPollInterval = const Duration(milliseconds: 1500);
final Logger _log = new Logger('FuchsiaRemoteConnection');
/// A function for forwarding ports on the local machine to a remote device.
......@@ -43,6 +47,35 @@ void restoreFuchsiaPortForwardingFunction() {
fuchsiaPortForwardingFunction = _SshPortForwarder.start;
}
/// An enum specifying a Dart VM's state.
enum DartVmEventType {
/// The Dart VM has started.
started,
/// The Dart VM has stopped.
///
/// This can mean either the host machine cannot be connect to, the VM service
/// has shut down cleanly, or the VM service has crashed.
stopped,
}
/// An event regarding the Dart VM.
///
/// Specifies the type of the event (whether the VM has started or has stopped),
/// and contains the service port of the VM as well as a URI to connect to it.
class DartVmEvent {
DartVmEvent._({this.eventType, this.servicePort, this.uri});
/// The URI used to connect to the Dart VM.
final Uri uri;
/// The type of event regarding this instance of the Dart VM.
final DartVmEventType eventType;
/// The port on the host machine that the Dart VM service is/was running on.
final int servicePort;
}
/// Manages a remote connection to a Fuchsia Device.
///
/// Provides affordances to observe and connect to Flutter views, isolates, and
......@@ -51,16 +84,32 @@ void restoreFuchsiaPortForwardingFunction() {
/// Note that this class can be connected to several instances of the Fuchsia
/// device's Dart VM at any given time.
class FuchsiaRemoteConnection {
FuchsiaRemoteConnection._(this._useIpV6Loopback, this._sshCommandRunner)
: _pollDartVms = false;
bool _pollDartVms;
final List<PortForwarder> _forwardedVmServicePorts = <PortForwarder>[];
final SshCommandRunner _sshCommandRunner;
final bool _useIpV6Loopback;
/// A mapping of Dart VM ports (as seen on the target machine), to
/// [PortForwarder] instances mapping from the local machine to the target
/// machine.
final Map<int, PortForwarder> _dartVmPortMap = <int, PortForwarder>{};
/// Tracks stale ports so as not to reconnect while polling.
final Set<int> _stalePorts = new Set<int>();
/// A broadcast stream that emits events relating to Dart VM's as they update.
Stream<DartVmEvent> get onDartVmEvent => _onDartVmEvent;
Stream<DartVmEvent> _onDartVmEvent;
final StreamController<DartVmEvent> _dartVmEventController =
new StreamController<DartVmEvent>();
/// VM service cache to avoid repeating handshakes across function
/// calls. Keys a forwarded port to a DartVm connection instance.
final Map<int, DartVm> _dartVmCache = <int, DartVm>{};
FuchsiaRemoteConnection._(this._useIpV6Loopback, this._sshCommandRunner);
/// Same as [FuchsiaRemoteConnection.connect] albeit with a provided
/// [SshCommandRunner] instance.
@visibleForTesting
......@@ -69,6 +118,21 @@ class FuchsiaRemoteConnection {
final FuchsiaRemoteConnection connection = new FuchsiaRemoteConnection._(
isIpV6Address(commandRunner.address), commandRunner);
await connection._forwardLocalPortsToDeviceServicePorts();
Stream<DartVmEvent> dartVmStream() {
Future<Null> listen() async {
while (connection._pollDartVms) {
await connection._pollVms();
await new Future<Null>.delayed(_kVmPollInterval);
}
connection._dartVmEventController.close();
}
connection._dartVmEventController.onListen = listen;
return connection._dartVmEventController.stream.asBroadcastStream();
}
connection._onDartVmEvent = dartVmStream();
return connection;
}
......@@ -121,8 +185,82 @@ class FuchsiaRemoteConnection {
await vmService?.stop();
await pf.stop();
}
for (PortForwarder pf in _dartVmPortMap.values) {
final DartVm vmService = _dartVmCache[pf.port];
_dartVmCache[pf.port] = null;
await vmService?.stop();
await pf.stop();
}
_dartVmCache.clear();
_forwardedVmServicePorts.clear();
_dartVmPortMap.clear();
_pollDartVms = false;
}
/// Returns all Isolates running `main()` as matched by the [Pattern].
///
/// In the current state this is not capable of listening for an
/// Isolate to start up. The Isolate must already be running.
Future<List<IsolateRef>> getMainIsolatesByPattern(
Pattern pattern, [
Duration timeout = _kIsolateFindTimeout,
]) async {
if (_dartVmPortMap.isEmpty) {
return null;
}
// Accumulate a list of eventual IsolateRef lists so that they can be loaded
// simultaneously via Future.wait.
final List<Future<List<IsolateRef>>> isolates =
<Future<List<IsolateRef>>>[];
for (PortForwarder fp in _dartVmPortMap.values) {
final DartVm vmService = await _getDartVm(fp.port);
isolates.add(vmService.getMainIsolatesByPattern(pattern));
}
final Completer<List<IsolateRef>> completer =
new Completer<List<IsolateRef>>();
final List<IsolateRef> result =
await Future.wait(isolates).then((List<List<IsolateRef>> listOfLists) {
final List<List<IsolateRef>> mutableListOfLists =
new List<List<IsolateRef>>.from(listOfLists)
..retainWhere((List<IsolateRef> list) => list.isNotEmpty);
// Folds the list of lists into one flat list.
return mutableListOfLists.fold<List<IsolateRef>>(
<IsolateRef>[],
(List<IsolateRef> accumulator, List<IsolateRef> element) {
accumulator.addAll(element);
return accumulator;
},
);
});
// If no VM instance anywhere has this, it's possible it hasn't spun up
// anywhere.
//
// For the time being one Flutter Isolate runs at a time in each VM, so for
// now this will wait until the timer runs out or a new Dart VM starts that
// contains the Isolate in question.
//
// TODO(awdavies): Set this up to handle multiple Isolates per Dart VM.
if (result.isEmpty) {
_log.fine('No instance of the Isolate found. Awaiting new VM startup');
_onDartVmEvent.listen(
(DartVmEvent event) async {
if (event.eventType == DartVmEventType.started) {
_log.fine('New VM found on port: ${event.servicePort}. Searching '
'for Isolate: $pattern');
final DartVm vmService = await _getDartVm(event.uri.port);
final List<IsolateRef> result =
await vmService.getMainIsolatesByPattern(pattern);
if (result.isNotEmpty) {
completer.complete(result);
}
}
},
);
} else {
completer.complete(result);
}
return completer.future.timeout(timeout);
}
/// Returns a list of [FlutterView] objects.
......@@ -130,7 +268,7 @@ class FuchsiaRemoteConnection {
/// This is run across all connected Dart VM connections that this class is
/// managing.
Future<List<FlutterView>> getFlutterViews() async {
if (_forwardedVmServicePorts.isEmpty) {
if (_dartVmPortMap.isEmpty) {
return <FlutterView>[];
}
final List<List<FlutterView>> flutterViewLists =
......@@ -151,40 +289,102 @@ class FuchsiaRemoteConnection {
// will be updated in the event that ports are found to be broken/stale: they
// will be shut down and removed from tracking.
Future<List<E>> _invokeForAllVms<E>(
Future<E> vmFunction(DartVm vmService)) async {
Future<E> vmFunction(DartVm vmService), [
bool queueEvents = true,
]) async {
final List<E> result = <E>[];
final Set<int> stalePorts = new Set<int>();
for (PortForwarder pf in _forwardedVmServicePorts) {
// Helper function loop.
Future<Null> shutDownPortForwarder(PortForwarder pf) async {
await pf.stop();
_stalePorts.add(pf.remotePort);
if (queueEvents) {
_dartVmEventController.add(new DartVmEvent._(
eventType: DartVmEventType.stopped,
servicePort: pf.remotePort,
uri: _getDartVmUri(pf.port),
));
}
}
for (PortForwarder pf in _dartVmPortMap.values) {
// When raising an HttpException this means that there is no instance of
// the Dart VM to communicate with. The TimeoutException is raised when
// the Dart VM instance is shut down in the middle of communicating.
try {
final DartVm service = await _getDartVm(pf.port);
result.add(await vmFunction(service));
} on HttpException {
await pf.stop();
stalePorts.add(pf.port);
await shutDownPortForwarder(pf);
} on TimeoutException {
await shutDownPortForwarder(pf);
}
}
// Clean up the ports after finished with iterating.
_forwardedVmServicePorts
.removeWhere((PortForwarder pf) => stalePorts.contains(pf.port));
_stalePorts.forEach(_dartVmPortMap.remove);
return result;
}
Uri _getDartVmUri(int port) {
// While the IPv4 loopback can be used for the initial port forwarding
// (see [PortForwarder.start]), the address is actually bound to the IPv6
// loopback device, so connecting to the IPv4 loopback would fail when the
// target address is IPv6 link-local.
final String addr = _useIpV6Loopback
? 'http://\[$_ipv6Loopback\]:$port'
: 'http://$_ipv4Loopback:$port';
final Uri uri = Uri.parse(addr);
return uri;
}
Future<DartVm> _getDartVm(int port) async {
if (!_dartVmCache.containsKey(port)) {
// While the IPv4 loopback can be used for the initial port forwarding
// (see [PortForwarder.start]), the address is actually bound to the IPv6
// loopback device, so connecting to the IPv4 loopback would fail when the
// target address is IPv6 link-local.
final String addr = _useIpV6Loopback
? 'http://\[$_ipv6Loopback\]:$port'
: 'http://$_ipv4Loopback:$port';
final Uri uri = Uri.parse(addr);
final DartVm dartVm = await DartVm.connect(uri);
final DartVm dartVm = await DartVm.connect(_getDartVmUri(port));
_dartVmCache[port] = dartVm;
}
return _dartVmCache[port];
}
/// Checks for changes in the list of Dart VM instances.
///
/// If there are new instances of the Dart VM, then connections will be
/// attempted (after clearing out stale connections).
Future<Null> _pollVms() async {
await _checkPorts();
final List<int> servicePorts = await getDeviceServicePorts();
for (int servicePort in servicePorts) {
if (!_stalePorts.contains(servicePort) &&
!_dartVmPortMap.containsKey(servicePort)) {
_dartVmPortMap[servicePort] = await fuchsiaPortForwardingFunction(
_sshCommandRunner.address,
servicePort,
_sshCommandRunner.interface,
_sshCommandRunner.sshConfigPath);
_dartVmEventController.add(new DartVmEvent._(
eventType: DartVmEventType.started,
servicePort: servicePort,
uri: _getDartVmUri(_dartVmPortMap[servicePort].port),
));
}
}
}
/// Runs a dummy heartbeat command on all Dart VM instances.
///
/// Removes any failing ports from the cache.
Future<Null> _checkPorts([bool queueEvents = true]) async {
// Filters out stale ports after connecting. Ignores results.
await _invokeForAllVms<Map<String, dynamic>>(
(DartVm vmService) async {
final Map<String, dynamic> res =
await vmService.invokeRpc('getVersion');
_log.fine('DartVM version check result: $res');
return res;
},
queueEvents,
);
}
/// Forwards a series of local device ports to the remote device.
///
/// When this function is run, all existing forwarded ports and connections
......@@ -192,24 +392,24 @@ class FuchsiaRemoteConnection {
Future<Null> _forwardLocalPortsToDeviceServicePorts() async {
await stop();
final List<int> servicePorts = await getDeviceServicePorts();
_forwardedVmServicePorts
.addAll(await Future.wait(servicePorts.map((int deviceServicePort) {
final List<PortForwarder> forwardedVmServicePorts =
await Future.wait(servicePorts.map((int deviceServicePort) {
return fuchsiaPortForwardingFunction(
_sshCommandRunner.address,
deviceServicePort,
_sshCommandRunner.interface,
_sshCommandRunner.sshConfigPath);
})));
}));
// Filters out stale ports after connecting. Ignores results.
await _invokeForAllVms<Map<String, dynamic>>(
(DartVm vmService) async {
final Map<String, dynamic> res =
await vmService.invokeRpc('getVersion');
_log.fine('DartVM version check result: $res');
return res;
},
);
for (PortForwarder pf in forwardedVmServicePorts) {
// TODO(awdavies): Handle duplicates.
_dartVmPortMap[pf.remotePort] = pf;
}
// Don't queue events, since this is the initial forwarding.
await _checkPorts(false);
_pollDartVms = true;
}
/// Gets the open Dart VM service ports on a remote Fuchsia device.
......@@ -271,7 +471,6 @@ class _SshPortForwarder implements PortForwarder {
this._remoteAddress,
this._remotePort,
this._localSocket,
this._process,
this._interface,
this._sshConfigPath,
this._ipV6,
......@@ -280,7 +479,6 @@ class _SshPortForwarder implements PortForwarder {
final String _remoteAddress;
final int _remotePort;
final ServerSocket _localSocket;
final Process _process;
final String _sshConfigPath;
final String _interface;
final bool _ipV6;
......@@ -328,12 +526,17 @@ class _SshPortForwarder implements PortForwarder {
dummyRemoteCommand,
]);
_log.fine("_SshPortForwarder running '${command.join(' ')}'");
final Process process = await _processManager.start(command);
final _SshPortForwarder result = new _SshPortForwarder._(address,
remotePort, localSocket, process, interface, sshConfigPath, isIpV6);
process.exitCode.then((int c) {
_log.fine("'${command.join(' ')}' exited with exit code $c");
});
// Must await for the port forwarding function to completer here, as
// forwarding must be completed before surfacing VM events (as the user may
// want to connect immediately after an event is surfaced).
final ProcessResult processResult = await _processManager.run(command);
_log.fine("'${command.join(' ')}' exited with exit code "
'${processResult.exitCode}');
if (processResult.exitCode != 0) {
return null;
}
final _SshPortForwarder result = new _SshPortForwarder._(
address, remotePort, localSocket, interface, sshConfigPath, isIpV6);
_log.fine('Set up forwarding from ${localSocket.port} '
'to $address port $remotePort');
return result;
......@@ -343,8 +546,6 @@ class _SshPortForwarder implements PortForwarder {
/// runs the SSH 'cancel' command to shut down port forwarding completely.
@override
Future<Null> stop() async {
// Kill the original SSH process if it is still around.
_process.kill();
// Cancel the forwarding request. See [start] for commentary about why this
// uses the IPv4 loopback.
final String formattedForwardingUrl =
......
......@@ -10,6 +10,8 @@ dependencies:
web_socket_channel: 1.0.7
flutter_test:
sdk: flutter
flutter_driver:
sdk: flutter
args: 1.4.2 # THIS LINE IS AUTOGENERATED - TO UPDATE USE "flutter update-packages --force-upgrade"
async: 2.0.6 # THIS LINE IS AUTOGENERATED - TO UPDATE USE "flutter update-packages --force-upgrade"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment