Commit d6f61b9e authored by Todd Volkert's avatar Todd Volkert Committed by GitHub

Add ability to mock VMService's WebSocket connection (#8145)

parent 77e53016
...@@ -60,7 +60,7 @@ class TraceCommand extends FlutterCommand { ...@@ -60,7 +60,7 @@ class TraceCommand extends FlutterCommand {
Tracing tracing; Tracing tracing;
try { try {
tracing = await Tracing.connect(observatoryUri); tracing = Tracing.connect(observatoryUri);
} catch (error) { } catch (error) {
throwToolExit('Error connecting to observatory: $error'); throwToolExit('Error connecting to observatory: $error');
} }
...@@ -102,8 +102,9 @@ class TraceCommand extends FlutterCommand { ...@@ -102,8 +102,9 @@ class TraceCommand extends FlutterCommand {
class Tracing { class Tracing {
Tracing(this.vmService); Tracing(this.vmService);
static Future<Tracing> connect(Uri uri) { static Tracing connect(Uri uri) {
return VMService.connect(uri).then((VMService observatory) => new Tracing(observatory)); VMService observatory = VMService.connect(uri);
return new Tracing(observatory);
} }
final VMService vmService; final VMService vmService;
......
...@@ -194,7 +194,7 @@ abstract class ResidentRunner { ...@@ -194,7 +194,7 @@ abstract class ResidentRunner {
if (!debuggingOptions.debuggingEnabled) { if (!debuggingOptions.debuggingEnabled) {
return new Future<Null>.error('Error the service protocol is not enabled.'); return new Future<Null>.error('Error the service protocol is not enabled.');
} }
vmService = await VMService.connect(uri); vmService = VMService.connect(uri);
printTrace('Connected to service protocol: $uri'); printTrace('Connected to service protocol: $uri');
await vmService.getVM(); await vmService.getVM();
......
...@@ -7,13 +7,23 @@ import 'dart:convert' show BASE64; ...@@ -7,13 +7,23 @@ import 'dart:convert' show BASE64;
import 'package:json_rpc_2/error_code.dart' as rpc_error_code; import 'package:json_rpc_2/error_code.dart' as rpc_error_code;
import 'package:json_rpc_2/json_rpc_2.dart' as rpc; import 'package:json_rpc_2/json_rpc_2.dart' as rpc;
import 'package:meta/meta.dart';
import 'package:stream_channel/stream_channel.dart'; import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/io.dart'; import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'base/common.dart';
import 'base/file_system.dart'; import 'base/file_system.dart';
import 'base/io.dart';
import 'globals.dart'; import 'globals.dart';
/// A function that opens a two-way communication channel to the specified [uri].
typedef StreamChannel<dynamic> OpenChannel(Uri uri);
OpenChannel _openChannel = _defaultOpenChannel;
StreamChannel<dynamic> _defaultOpenChannel(Uri uri) =>
new IOWebSocketChannel.connect(uri.toString());
/// The default VM service request timeout. /// The default VM service request timeout.
const Duration kDefaultRequestTimeout = const Duration(seconds: 10); const Duration kDefaultRequestTimeout = const Duration(seconds: 10);
...@@ -22,35 +32,41 @@ const Duration kLongRequestTimeout = const Duration(minutes: 1); ...@@ -22,35 +32,41 @@ const Duration kLongRequestTimeout = const Duration(minutes: 1);
/// A connection to the Dart VM Service. /// A connection to the Dart VM Service.
class VMService { class VMService {
VMService._(this.peer, this.httpAddress, this.wsAddress, this._requestTimeout) { VMService._(this._peer, this.httpAddress, this.wsAddress, this._requestTimeout) {
_vm = new VM._empty(this); _vm = new VM._empty(this);
_peer.listen().catchError((dynamic e, StackTrace stackTrace) {
_connectionError.completeError(e, stackTrace);
});
peer.registerMethod('streamNotify', (rpc.Parameters event) { _peer.registerMethod('streamNotify', (rpc.Parameters event) {
_handleStreamNotify(event.asMap); _handleStreamNotify(event.asMap);
}); });
} }
@visibleForTesting
static void setOpenChannelForTesting(OpenChannel openChannel) {
_openChannel = openChannel;
}
/// Connect to a Dart VM Service at [httpUri]. /// Connect to a Dart VM Service at [httpUri].
/// ///
/// Requests made via the returns [VMService] time out after [requestTimeout] /// Requests made via the returns [VMService] time out after [requestTimeout]
/// amount of time, which is [kDefaultRequestTimeout] by default. /// amount of time, which is [kDefaultRequestTimeout] by default.
static Future<VMService> connect(Uri httpUri, { Duration requestTimeout: kDefaultRequestTimeout }) async { static VMService connect(
Uri httpUri, {
Duration requestTimeout: kDefaultRequestTimeout,
}) {
Uri wsUri = httpUri.replace(scheme: 'ws', path: fs.path.join(httpUri.path, 'ws')); Uri wsUri = httpUri.replace(scheme: 'ws', path: fs.path.join(httpUri.path, 'ws'));
WebSocket ws; StreamChannel<dynamic> channel = _openChannel(wsUri);
try { rpc.Peer peer = new rpc.Peer.withoutJson(jsonDocument.bind(channel.cast()));
ws = await WebSocket.connect(wsUri.toString());
} catch (e) {
return new Future<VMService>.error('Failed to connect to $wsUri\n $e');
}
rpc.Peer peer = new rpc.Peer.withoutJson(jsonDocument.bind(new IOWebSocketChannel(ws).cast()));
peer.listen();
return new VMService._(peer, httpUri, wsUri, requestTimeout); return new VMService._(peer, httpUri, wsUri, requestTimeout);
} }
final Uri httpAddress; final Uri httpAddress;
final Uri wsAddress; final Uri wsAddress;
final rpc.Peer peer; final rpc.Peer _peer;
final Duration _requestTimeout; final Duration _requestTimeout;
final Completer<Map<String, dynamic>> _connectionError = new Completer<Map<String, dynamic>>();
VM _vm; VM _vm;
/// The singleton [VM] object. Owns [Isolate] and [FlutterView] objects. /// The singleton [VM] object. Owns [Isolate] and [FlutterView] objects.
...@@ -61,8 +77,9 @@ class VMService { ...@@ -61,8 +77,9 @@ class VMService {
Set<String> _listeningFor = new Set<String>(); Set<String> _listeningFor = new Set<String>();
bool get isClosed => peer.isClosed; /// Whether our connection to the VM service has been closed;
Future<Null> get done => peer.done; bool get isClosed => _peer.isClosed;
Future<Null> get done => _peer.done;
// Events // Events
Stream<ServiceEvent> get onDebugEvent => onEvent('Debug'); Stream<ServiceEvent> get onDebugEvent => onEvent('Debug');
...@@ -78,6 +95,16 @@ class VMService { ...@@ -78,6 +95,16 @@ class VMService {
return _getEventController(streamId).stream; return _getEventController(streamId).stream;
} }
Future<Map<String, dynamic>> _sendRequest(
String method,
Map<String, dynamic> params,
) {
return Future.any(<Future<Map<String, dynamic>>>[
_peer.sendRequest(method, params),
_connectionError.future,
]);
}
StreamController<ServiceEvent> _getEventController(String eventName) { StreamController<ServiceEvent> _getEventController(String eventName) {
StreamController<ServiceEvent> controller = _eventControllers[eventName]; StreamController<ServiceEvent> controller = _eventControllers[eventName];
if (controller == null) { if (controller == null) {
...@@ -114,8 +141,7 @@ class VMService { ...@@ -114,8 +141,7 @@ class VMService {
Future<Null> _streamListen(String streamId) async { Future<Null> _streamListen(String streamId) async {
if (!_listeningFor.contains(streamId)) { if (!_listeningFor.contains(streamId)) {
_listeningFor.add(streamId); _listeningFor.add(streamId);
await peer.sendRequest('streamListen', await _sendRequest('streamListen', <String, dynamic>{ 'streamId': streamId });
<String, dynamic>{ 'streamId': streamId });
} }
} }
...@@ -299,7 +325,7 @@ abstract class ServiceObject { ...@@ -299,7 +325,7 @@ abstract class ServiceObject {
_inProgressReload = null; _inProgressReload = null;
} }
return _inProgressReload; return await _inProgressReload;
} }
/// Update [this] using [map] as a source. [map] can be a service reference. /// Update [this] using [map] as a source. [map] can be a service reference.
...@@ -578,8 +604,8 @@ class VM extends ServiceObjectOwner { ...@@ -578,8 +604,8 @@ class VM extends ServiceObjectOwner {
assert(params != null); assert(params != null);
timeout ??= _vmService._requestTimeout; timeout ??= _vmService._requestTimeout;
try { try {
Map<String, dynamic> result = await _vmService.peer Map<String, dynamic> result = await _vmService
.sendRequest(method, params) ._sendRequest(method, params)
.timeout(timeout); .timeout(timeout);
return result; return result;
} on TimeoutException { } on TimeoutException {
...@@ -587,6 +613,9 @@ class VM extends ServiceObjectOwner { ...@@ -587,6 +613,9 @@ class VM extends ServiceObjectOwner {
if (timeoutFatal) if (timeoutFatal)
throw new TimeoutException('Request to Dart VM Service timed out: $method($params)'); throw new TimeoutException('Request to Dart VM Service timed out: $method($params)');
return null; return null;
} on WebSocketChannelException catch (error) {
throwToolExit('Error connecting to observatory: $error');
return null;
} }
} }
......
...@@ -15,7 +15,9 @@ dependencies: ...@@ -15,7 +15,9 @@ dependencies:
file: 2.0.1 file: 2.0.1
http: ^0.11.3 http: ^0.11.3
intl: '>=0.14.0 <0.15.0' intl: '>=0.14.0 <0.15.0'
json_rpc_2: ^2.0.0 # TODO(tvolkert): Change to ^2.0.0 after manually vetting 2.0.4 release,
# which contains https://github.com/dart-lang/json_rpc_2/pull/19
json_rpc_2: 2.0.3
json_schema: 1.0.6 json_schema: 1.0.6
linter: 0.1.30-alpha.1 linter: 0.1.30-alpha.1
meta: ^1.0.4 meta: ^1.0.4
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment