Unverified Commit 10658266 authored by Lau Ching Jun's avatar Lau Ching Jun Committed by GitHub

ProxiedDevice, connection to a remotely connected device via flutter daemon. (#95738)

Also allow daemon commands to pass binary streams
parent 94d9b02d
......@@ -237,7 +237,7 @@ known, it can be explicitly provided to attach via the command-line, e.g.
final Daemon daemon = boolArg('machine')
? Daemon(
DaemonConnection(
daemonStreams: StdioDaemonStreams(globals.stdio),
daemonStreams: DaemonStreams.fromStdio(globals.stdio, logger: globals.logger),
logger: globals.logger,
),
notifyingLogger: (globals.logger is NotifyingLogger)
......
......@@ -11,6 +11,7 @@ import 'package:meta/meta.dart';
import 'package:uuid/uuid.dart';
import '../android/android_workflow.dart';
import '../application_package.dart';
import '../base/common.dart';
import '../base/file_system.dart';
import '../base/io.dart';
......@@ -18,6 +19,7 @@ import '../base/logger.dart';
import '../base/terminal.dart';
import '../base/utils.dart';
import '../build_info.dart';
import '../convert.dart';
import '../daemon.dart';
import '../device.dart';
import '../device_port_forwarder.dart';
......@@ -85,7 +87,7 @@ class DaemonCommand extends FlutterCommand {
globals.printStatus('Starting device daemon...');
final Daemon daemon = Daemon(
DaemonConnection(
daemonStreams: StdioDaemonStreams(globals.stdio),
daemonStreams: DaemonStreams.fromStdio(globals.stdio, logger: globals.logger),
logger: globals.logger,
),
notifyingLogger: asLogger<NotifyingLogger>(globals.logger),
......@@ -128,7 +130,7 @@ class _DaemonServer {
});
final Daemon daemon = Daemon(
DaemonConnection(
daemonStreams: TcpDaemonStreams(socket, logger: logger),
daemonStreams: DaemonStreams.fromSocket(socket, logger: logger),
logger: logger,
),
notifyingLogger: notifyingLogger,
......@@ -145,6 +147,7 @@ class _DaemonServer {
}
typedef CommandHandler = Future<dynamic> Function(Map<String, dynamic> args);
typedef CommandHandlerWithBinary = Future<dynamic> Function(Map<String, dynamic> args, Stream<List<int>> binary);
class Daemon {
Daemon(
......@@ -158,6 +161,7 @@ class Daemon {
_registerDomain(deviceDomain = DeviceDomain(this));
_registerDomain(emulatorDomain = EmulatorDomain(this));
_registerDomain(devToolsDomain = DevToolsDomain(this));
_registerDomain(proxyDomain = ProxyDomain(this));
// Start listening.
_commandSubscription = connection.incomingCommands.listen(
......@@ -178,7 +182,8 @@ class Daemon {
DeviceDomain deviceDomain;
EmulatorDomain emulatorDomain;
DevToolsDomain devToolsDomain;
StreamSubscription<Map<String, dynamic>> _commandSubscription;
ProxyDomain proxyDomain;
StreamSubscription<DaemonMessage> _commandSubscription;
final NotifyingLogger notifyingLogger;
final bool logToStdout;
......@@ -192,11 +197,11 @@ class Daemon {
Future<int> get onExit => _onExitCompleter.future;
void _handleRequest(Map<String, dynamic> request) {
void _handleRequest(DaemonMessage request) {
// {id, method, params}
// [id] is an opaque type to us.
final dynamic id = request['id'];
final dynamic id = request.data['id'];
if (id == null) {
globals.stdio.stderrWrite('no id for request: $request\n');
......@@ -204,7 +209,7 @@ class Daemon {
}
try {
final String method = request['method'] as String;
final String method = request.data['method'] as String;
assert(method != null);
if (!method.contains('.')) {
throw 'method not understood: $method';
......@@ -216,7 +221,7 @@ class Daemon {
throw 'no domain for method: $method';
}
_domainMap[prefix].handleCommand(name, id, castStringKeyedMap(request['params']) ?? const <String, dynamic>{});
_domainMap[prefix].handleCommand(name, id, castStringKeyedMap(request.data['params']) ?? const <String, dynamic>{}, request.binary);
} on Exception catch (error, trace) {
connection.sendErrorResponse(id, _toJsonable(error), trace);
}
......@@ -246,18 +251,29 @@ abstract class Domain {
final Daemon daemon;
final String name;
final Map<String, CommandHandler> _handlers = <String, CommandHandler>{};
final Map<String, CommandHandlerWithBinary> _handlersWithBinary = <String, CommandHandlerWithBinary>{};
void registerHandler(String name, CommandHandler handler) {
assert(!_handlers.containsKey(name));
assert(!_handlersWithBinary.containsKey(name));
_handlers[name] = handler;
}
void registerHandlerWithBinary(String name, CommandHandlerWithBinary handler) {
assert(!_handlers.containsKey(name));
assert(!_handlersWithBinary.containsKey(name));
_handlersWithBinary[name] = handler;
}
@override
String toString() => name;
void handleCommand(String command, dynamic id, Map<String, dynamic> args) {
void handleCommand(String command, dynamic id, Map<String, dynamic> args, Stream<List<int>> binary) {
Future<dynamic>.sync(() {
if (_handlers.containsKey(command)) {
return _handlers[command](args);
} else if (_handlersWithBinary.containsKey(command)) {
return _handlersWithBinary[command](args, binary);
}
throw 'command not understood: $name.$command';
}).then<dynamic>((dynamic result) {
......@@ -267,8 +283,8 @@ abstract class Domain {
});
}
void sendEvent(String name, [ dynamic args ]) {
daemon.connection.sendEvent(name, _toJsonable(args));
void sendEvent(String name, [ dynamic args, List<int> binary ]) {
daemon.connection.sendEvent(name, _toJsonable(args), binary);
}
String _getStringArg(Map<String, dynamic> args, String name, { bool required = false }) {
......@@ -794,16 +810,29 @@ typedef _DeviceEventHandler = void Function(Device device);
class DeviceDomain extends Domain {
DeviceDomain(Daemon daemon) : super(daemon, 'device') {
registerHandler('getDevices', getDevices);
registerHandler('discoverDevices', discoverDevices);
registerHandler('enable', enable);
registerHandler('disable', disable);
registerHandler('forward', forward);
registerHandler('unforward', unforward);
registerHandler('supportsRuntimeMode', supportsRuntimeMode);
registerHandler('uploadApplicationPackage', uploadApplicationPackage);
registerHandler('logReader.start', startLogReader);
registerHandler('logReader.stop', stopLogReader);
registerHandler('startApp', startApp);
registerHandler('stopApp', stopApp);
registerHandler('takeScreenshot', takeScreenshot);
// Use the device manager discovery so that client provided device types
// are usable via the daemon protocol.
globals.deviceManager.deviceDiscoverers.forEach(addDeviceDiscoverer);
}
/// An incrementing number used to generate unique ids.
int _id = 0;
final Map<String, ApplicationPackage> _applicationPackages = <String, ApplicationPackage>{};
final Map<String, DeviceLogReader> _logReaders = <String, DeviceLogReader>{};
void addDeviceDiscoverer(DeviceDiscovery discoverer) {
if (!discoverer.supportsPlatform) {
return;
......@@ -843,6 +872,15 @@ class DeviceDomain extends Domain {
];
}
/// Return a list of the current devices, discarding existing cache of devices.
Future<List<Map<String, dynamic>>> discoverDevices([ Map<String, dynamic> args ]) async {
return <Map<String, dynamic>>[
for (final PollingDeviceDiscovery discoverer in _discoverers)
for (final Device device in await discoverer.discoverDevices())
await _deviceToMap(device),
];
}
/// Enable device events.
Future<void> enable(Map<String, dynamic> args) async {
for (final PollingDeviceDiscovery discoverer in _discoverers) {
......@@ -887,6 +925,118 @@ class DeviceDomain extends Domain {
return device.portForwarder.unforward(ForwardedPort(hostPort, devicePort));
}
/// Returns whether a device supports runtime mode.
Future<bool> supportsRuntimeMode(Map<String, dynamic> args) async {
final String deviceId = _getStringArg(args, 'deviceId', required: true);
final Device device = await daemon.deviceDomain._getDevice(deviceId);
if (device == null) {
throw "device '$deviceId' not found";
}
final String buildMode = _getStringArg(args, 'buildMode', required: true);
return await device.supportsRuntimeMode(getBuildModeForName(buildMode));
}
/// Creates an application package from a file in the temp directory.
Future<String> uploadApplicationPackage(Map<String, dynamic> args) async {
final TargetPlatform targetPlatform = getTargetPlatformForName(_getStringArg(args, 'targetPlatform', required: true));
final File applicationBinary = daemon.proxyDomain.tempDirectory.childFile(_getStringArg(args, 'applicationBinary', required: true));
final ApplicationPackage applicationPackage = await ApplicationPackageFactory.instance.getPackageForPlatform(
targetPlatform,
applicationBinary: applicationBinary,
);
final String id = 'application_package_${_id++}';
_applicationPackages[id] = applicationPackage;
return id;
}
/// Starts the log reader on the device.
Future<String> startLogReader(Map<String, dynamic> args) async {
final String deviceId = _getStringArg(args, 'deviceId', required: true);
final Device device = await daemon.deviceDomain._getDevice(deviceId);
if (device == null) {
throw "device '$deviceId' not found";
}
final String applicationPackageId = _getStringArg(args, 'applicationPackageId');
final ApplicationPackage applicationPackage = applicationPackageId != null ? _applicationPackages[applicationPackageId] : null;
final String id = '${deviceId}_${_id++}';
final DeviceLogReader logReader = await device.getLogReader(app: applicationPackage);
logReader.logLines.listen((String log) => sendEvent('device.logReader.logLines.$id', log));
_logReaders[id] = logReader;
return id;
}
/// Stops a log reader that was previously started.
Future<void> stopLogReader(Map<String, dynamic> args) async {
final String id = _getStringArg(args, 'id', required: true);
_logReaders.remove(id)?.dispose();
}
/// Starts an app on a device.
Future<Map<String, dynamic>> startApp(Map<String, dynamic> args) async {
final String deviceId = _getStringArg(args, 'deviceId', required: true);
final Device device = await daemon.deviceDomain._getDevice(deviceId);
if (device == null) {
throw "device '$deviceId' not found";
}
final String applicationPackageId = _getStringArg(args, 'applicationPackageId', required: true);
final ApplicationPackage applicationPackage = _applicationPackages[applicationPackageId];
final LaunchResult result = await device.startApp(
applicationPackage,
debuggingOptions: DebuggingOptions.fromJson(
castStringKeyedMap(args['debuggingOptions']),
// We are using prebuilts, build info does not matter here.
BuildInfo.debug,
),
mainPath: _getStringArg(args, 'mainPath'),
route: _getStringArg(args, 'route'),
platformArgs: castStringKeyedMap(args['platformArgs']),
prebuiltApplication: _getBoolArg(args, 'prebuiltApplication'),
ipv6: _getBoolArg(args, 'ipv6'),
userIdentifier: _getStringArg(args, 'userIdentifier'),
);
return <String, dynamic>{
'started': result.started,
'observatoryUri': result.observatoryUri?.toString(),
};
}
/// Stops an app.
Future<bool> stopApp(Map<String, dynamic> args) async {
final String deviceId = _getStringArg(args, 'deviceId', required: true);
final Device device = await daemon.deviceDomain._getDevice(deviceId);
if (device == null) {
throw "device '$deviceId' not found";
}
final String applicationPackageId = _getStringArg(args, 'applicationPackageId', required: true);
final ApplicationPackage applicationPackage = _applicationPackages[applicationPackageId];
return device.stopApp(
applicationPackage,
userIdentifier: _getStringArg(args, 'userIdentifier'),
);
}
/// Takes a screenshot.
Future<String> takeScreenshot(Map<String, dynamic> args) async {
final String deviceId = _getStringArg(args, 'deviceId', required: true);
final Device device = await daemon.deviceDomain._getDevice(deviceId);
if (device == null) {
throw "device '$deviceId' not found";
}
final String tempFileName = 'screenshot_${_id++}';
final File tempFile = daemon.proxyDomain.tempDirectory.childFile(tempFileName);
await device.takeScreenshot(tempFile);
if (await tempFile.exists()) {
final String imageBase64 = base64.encode(await tempFile.readAsBytes());
return imageBase64;
} else {
return null;
}
}
@override
Future<void> dispose() {
for (final PollingDeviceDiscovery discoverer in _discoverers) {
......@@ -939,6 +1089,16 @@ Future<Map<String, dynamic>> _deviceToMap(Device device) async {
'platformType': device.platformType?.toString(),
'ephemeral': device.ephemeral,
'emulatorId': await device.emulatorId,
'sdk': await device.sdkNameAndVersion,
'capabilities': <String, Object>{
'hotReload': device.supportsHotReload,
'hotRestart': device.supportsHotRestart,
'screenshot': device.supportsScreenshot,
'fastStart': device.supportsFastStart,
'flutterExit': device.supportsFlutterExit,
'hardwareRendering': await device.supportsHardwareRendering,
'startPaused': device.supportsStartPaused,
}
};
}
......@@ -1157,6 +1317,75 @@ class EmulatorDomain extends Domain {
}
}
class ProxyDomain extends Domain {
ProxyDomain(Daemon daemon) : super(daemon, 'proxy') {
registerHandlerWithBinary('writeTempFile', writeTempFile);
registerHandler('connect', connect);
registerHandler('disconnect', disconnect);
registerHandlerWithBinary('write', write);
}
final Map<String, Socket> _forwardedConnections = <String, Socket>{};
int _id = 0;
/// Writes to a file in a local temporary directory.
Future<void> writeTempFile(Map<String, dynamic> args, Stream<List<int>> binary) async {
final String path = _getStringArg(args, 'path', required: true);
final File file = tempDirectory.childFile(path);
await file.parent.create(recursive: true);
await file.openWrite().addStream(binary);
}
/// Opens a connection to a local port, and returns the connection id.
Future<String> connect(Map<String, dynamic> args) async {
final int targetPort = _getIntArg(args, 'port', required: true);
final String id = 'portForwarder_${targetPort}_${_id++}';
final Socket socket = await Socket.connect('127.0.0.1', targetPort);
_forwardedConnections[id] = socket;
socket.listen((List<int> data) {
sendEvent('proxy.data.$id', null, data);
});
unawaited(socket.done.then((dynamic _) {
sendEvent('proxy.disconnected.$id');
}));
return id;
}
/// Disconnects from a previously established connection.
Future<bool> disconnect(Map<String, dynamic> args) async {
final String id = _getStringArg(args, 'id', required: true);
if (_forwardedConnections.containsKey(id)) {
await _forwardedConnections.remove(id)?.close();
return true;
}
return false;
}
/// Writes to a previously established connection.
Future<bool> write(Map<String, dynamic> args, Stream<List<int>> binary) async {
final String id = _getStringArg(args, 'id', required: true);
if (_forwardedConnections.containsKey(id)) {
final StreamSubscription<List<int>> subscription = binary.listen(_forwardedConnections[id].add);
await subscription.asFuture<void>();
await subscription.cancel();
return true;
}
return false;
}
@override
Future<void> dispose() async {
for (final Socket connection in _forwardedConnections.values) {
await connection.close();
}
await _tempDirectory?.delete(recursive: true);
}
Directory _tempDirectory;
Directory get tempDirectory => _tempDirectory ??= globals.fs.systemTempDirectory.createTempSync('flutter_tool_daemon.');
}
/// A [Logger] which sends log messages to a listening daemon client.
///
/// This class can either:
......@@ -1232,11 +1461,11 @@ class AppRunLogger extends DelegatingLogger {
}
@override
void sendEvent(String name, [Map<String, dynamic> args]) {
void sendEvent(String name, [Map<String, dynamic> args, List<int> binary]) {
if (domain == null) {
printStatus('event sent after app closed: $name');
} else {
domain.sendEvent(name, args);
domain.sendEvent(name, args, binary);
}
}
......
......@@ -559,7 +559,7 @@ class RunCommand extends RunCommandBase {
}
final Daemon daemon = Daemon(
DaemonConnection(
daemonStreams: StdioDaemonStreams(globals.stdio),
daemonStreams: DaemonStreams.fromStdio(globals.stdio, logger: globals.logger),
logger: globals.logger,
),
notifyingLogger: (globals.logger is NotifyingLogger)
......
......@@ -3,6 +3,9 @@
// found in the LICENSE file.
import 'dart:async';
import 'dart:typed_data';
import 'package:meta/meta.dart';
import 'base/common.dart';
import 'base/io.dart';
......@@ -10,101 +13,211 @@ import 'base/logger.dart';
import 'base/utils.dart';
import 'convert.dart';
/// Parse binary streams in the JSON RPC format understood by the daemon, and
/// convert it into a stream of JSON RPC messages.
Stream<Map<String, Object?>> _convertInputStream(Stream<List<int>> inputStream) {
return utf8.decoder.bind(inputStream)
.transform<String>(const LineSplitter())
.where((String line) => line.startsWith('[{') && line.endsWith('}]'))
.map<Map<String, Object?>?>((String line) {
line = line.substring(1, line.length - 1);
return castStringKeyedMap(json.decode(line));
})
.where((Map<String, Object?>? entry) => entry != null)
.cast<Map<String, Object?>>();
/// A single message passed through the [DaemonConnection].
class DaemonMessage {
DaemonMessage(this.data, [this.binary]);
/// Content of the JSON message in the message.
final Map<String, Object?> data;
/// Stream of the binary content of the message.
///
/// Must be listened to if binary data is present.
final Stream<List<int>>? binary;
}
/// A stream that a [DaemonConnection] uses to communicate with each other.
abstract class DaemonStreams {
/// Stream that contains input to the [DaemonConnection].
Stream<Map<String, Object?>> get inputStream;
/// Data of an event passed through the [DaemonConnection].
class DaemonEventData {
DaemonEventData(this.eventName, this.data, [this.binary]);
/// Outputs a message through the connection.
void send(Map<String, Object?> message);
/// The name of the event.
final String eventName;
/// Cleans up any resources used.
Future<void> dispose() async { }
/// The data of the event.
final Object? data;
/// Stream of the binary content of the event.
///
/// Must be listened to if binary data is present.
final Stream<List<int>>? binary;
}
/// A [DaemonStream] that uses stdin and stdout as the underlying streams.
class StdioDaemonStreams extends DaemonStreams {
StdioDaemonStreams(Stdio stdio) :
_stdio = stdio,
inputStream = _convertInputStream(stdio.stdin);
const String _binaryLengthKey = '_binaryLength';
final Stdio _stdio;
enum _InputStreamParseState {
json,
binary,
}
@override
final Stream<Map<String, Object?>> inputStream;
/// Converts a binary stream to a stream of [DaemonMessage].
///
/// The daemon JSON-RPC protocol is defined as follows: every single line of
/// text that starts with `[{` and ends with `}]` will be parsed as a JSON
/// message. The array should contain only one single object which contains the
/// message data.
///
/// If the JSON object contains the key [_binaryLengthKey] with an integer
/// value (will be refered to as N), the following N bytes after the newline
/// character will contain the binary part of the message.
@visibleForTesting
class DaemonInputStreamConverter {
DaemonInputStreamConverter(this.inputStream) {
// Lazily listen to the input stream.
_controller.onListen = () {
final StreamSubscription<List<int>> subscription = inputStream.listen((List<int> chunk) {
_processChunk(chunk);
}, onError: (Object error, StackTrace stackTrace) {
_controller.addError(error, stackTrace);
}, onDone: () {
unawaited(_controller.close());
});
@override
void send(Map<String, Object?> message) {
_stdio.stdoutWrite(
'[${json.encode(message)}]\n',
fallback: (String message, Object? error, StackTrace stack) {
throwToolExit('Failed to write daemon command response to stdout: $error');
},
);
_controller.onCancel = subscription.cancel;
// We should not handle onPause or onResume. When the stream is paused, we
// still need to read from the input stream.
};
}
}
/// A [DaemonStream] that uses [Socket] as the underlying stream.
class TcpDaemonStreams extends DaemonStreams {
/// Creates a [DaemonStreams] with an existing [Socket].
TcpDaemonStreams(
Socket socket, {
required Logger logger,
}): _logger = logger {
_socket = Future<Socket>.value(_initializeSocket(socket));
final Stream<List<int>> inputStream;
final StreamController<DaemonMessage> _controller = StreamController<DaemonMessage>();
Stream<DaemonMessage> get convertedStream => _controller.stream;
// Internal states
/// The current parse state, whether we are expecting JSON or binary data.
_InputStreamParseState state = _InputStreamParseState.json;
/// The binary stream that is being transferred.
late StreamController<List<int>> currentBinaryStream;
/// Remaining length in bytes that have to be sent to the binary stream.
int remainingBinaryLength = 0;
/// Buffer to hold the current line of input data.
final BytesBuilder bytesBuilder = BytesBuilder(copy: false);
// Processes a single chunk received in the input stream.
void _processChunk(List<int> chunk) {
const int LF = 10; // The '\n' character
int start = 0;
while (start < chunk.length) {
if (state == _InputStreamParseState.json) {
// Search for newline character.
final int indexOfNewLine = chunk.indexOf(LF, start);
if (indexOfNewLine < 0) {
bytesBuilder.add(chunk.sublist(start));
start = chunk.length;
} else {
bytesBuilder.add(chunk.sublist(start, indexOfNewLine + 1));
start = indexOfNewLine + 1;
// Process chunk here
final Uint8List combinedChunk = bytesBuilder.takeBytes();
String jsonString = utf8.decode(combinedChunk).trim();
if (jsonString.startsWith('[{') && jsonString.endsWith('}]')) {
jsonString = jsonString.substring(1, jsonString.length - 1);
final Map<String, Object?>? value = castStringKeyedMap(json.decode(jsonString));
if (value != null) {
// Check if we need to consume another binary blob.
if (value[_binaryLengthKey] != null) {
remainingBinaryLength = value[_binaryLengthKey]! as int;
currentBinaryStream = StreamController<List<int>>();
state = _InputStreamParseState.binary;
_controller.add(DaemonMessage(value, currentBinaryStream.stream));
} else {
_controller.add(DaemonMessage(value));
}
}
}
}
} else if (state == _InputStreamParseState.binary) {
final int bytesSent = _addBinaryChunk(chunk, start, remainingBinaryLength);
start += bytesSent;
remainingBinaryLength -= bytesSent;
if (remainingBinaryLength <= 0) {
assert(remainingBinaryLength == 0);
unawaited(currentBinaryStream.close());
state = _InputStreamParseState.json;
}
}
}
}
/// Connects to a remote host and creates a [DaemonStreams] from the socket.
TcpDaemonStreams.connect(
String host,
int port, {
required Logger logger,
}) : _logger = logger {
_socket = Socket.connect(host, port).then(_initializeSocket);
int _addBinaryChunk(List<int> chunk, int start, int maximumSizeToRead) {
if (start == 0 && chunk.length <= remainingBinaryLength) {
currentBinaryStream.add(chunk);
return chunk.length;
} else {
final int chunkRemainingLength = chunk.length - start;
final int sizeToRead = chunkRemainingLength < remainingBinaryLength ? chunkRemainingLength : remainingBinaryLength;
currentBinaryStream.add(chunk.sublist(start, start + sizeToRead));
return sizeToRead;
}
}
}
late final Future<Socket> _socket;
final StreamController<Map<String, Object?>> _commands = StreamController<Map<String, Object?>>();
/// A stream that a [DaemonConnection] uses to communicate with each other.
class DaemonStreams {
DaemonStreams(
Stream<List<int>> rawInputStream,
StreamSink<List<int>> outputSink, {
required Logger logger,
}) :
_outputSink = outputSink,
inputStream = DaemonInputStreamConverter(rawInputStream).convertedStream,
_logger = logger;
final StreamSink<List<int>> _outputSink;
final Logger _logger;
@override
Stream<Map<String, Object?>> get inputStream => _commands.stream;
@override
void send(Map<String, Object?> message) {
_socket.then((Socket socket) {
try {
socket.write('[${json.encode(message)}]\n');
} on SocketException catch (error) {
_logger.printError('Failed to write daemon command response to socket: $error');
// Failed to send, close the connection
socket.close();
/// Stream that contains input to the [DaemonConnection].
final Stream<DaemonMessage> inputStream;
/// Outputs a message through the connection.
void send(Map<String, Object?> message, [ List<int>? binary ]) {
try {
if (binary != null) {
message[_binaryLengthKey] = binary.length;
}
});
_outputSink.add(utf8.encode('[${json.encode(message)}]\n'));
if (binary != null) {
_outputSink.add(binary);
}
} on IOException catch (error) {
_logger.printError('Failed to write daemon command response: $error');
// Failed to send, close the connection
_outputSink.close();
}
}
Socket _initializeSocket(Socket socket) {
_commands.addStream(_convertInputStream(socket));
return socket;
/// Cleans up any resources used.
Future<void> dispose() async {
unawaited(_outputSink.close());
}
@override
Future<void> dispose() async {
await (await _socket).close();
/// Creates a [DaemonStreams] that uses stdin and stdout as the underlying streams.
static DaemonStreams fromStdio(Stdio stdio, { required Logger logger }) {
return DaemonStreams(stdio.stdin, stdio.stdout, logger: logger);
}
/// Creates a [DaemonStreams] that uses [Socket] as the underlying streams.
static DaemonStreams fromSocket(Socket socket, { required Logger logger }) {
return DaemonStreams(socket, socket, logger: logger);
}
/// Connects to a server and creates a [DaemonStreams] from the connection as the underlying streams.
static DaemonStreams connect(String host, int port, { required Logger logger }) {
final Future<Socket> socketFuture = Socket.connect(host, port);
final StreamController<List<int>> inputStreamController = StreamController<List<int>>();
final StreamController<List<int>> outputStreamController = StreamController<List<int>>();
socketFuture.then((Socket socket) {
inputStreamController.addStream(socket);
socket.addStream(outputStreamController.stream);
});
return DaemonStreams(inputStreamController.stream, outputStreamController.sink, logger: logger);
}
}
......@@ -116,7 +229,7 @@ class DaemonConnection {
}): _logger = logger,
_daemonStreams = daemonStreams {
_commandSubscription = daemonStreams.inputStream.listen(
_handleData,
_handleMessage,
onError: (Object error, StackTrace stackTrace) {
// We have to listen for on error otherwise the error on the socket
// will end up in the Zone error handler.
......@@ -130,28 +243,27 @@ class DaemonConnection {
final Logger _logger;
late final StreamSubscription<Map<String, Object?>> _commandSubscription;
late final StreamSubscription<DaemonMessage> _commandSubscription;
int _outgoingRequestId = 0;
final Map<String, Completer<Object?>> _outgoingRequestCompleters = <String, Completer<Object?>>{};
final StreamController<Map<String, Object?>> _events = StreamController<Map<String, Object?>>.broadcast();
final StreamController<Map<String, Object?>> _incomingCommands = StreamController<Map<String, Object?>>();
final StreamController<DaemonEventData> _events = StreamController<DaemonEventData>.broadcast();
final StreamController<DaemonMessage> _incomingCommands = StreamController<DaemonMessage>();
/// A stream that contains all the incoming requests.
Stream<Map<String, Object?>> get incomingCommands => _incomingCommands.stream;
Stream<DaemonMessage> get incomingCommands => _incomingCommands.stream;
/// Listens to the event with the event name [eventToListen].
Stream<Object?> listenToEvent(String eventToListen) {
Stream<DaemonEventData> listenToEvent(String eventToListen) {
return _events.stream
.where((Map<String, Object?> event) => event['event'] == eventToListen)
.map<Object?>((Map<String, Object?> event) => event['params']);
.where((DaemonEventData event) => event.eventName == eventToListen);
}
/// Sends a request to the other end of the connection.
///
/// Returns a [Future] that resolves with the content.
Future<Object?> sendRequest(String method, [Object? params]) async {
Future<Object?> sendRequest(String method, [Object? params, List<int>? binary]) async {
final String id = '${++_outgoingRequestId}';
final Completer<Object?> completer = Completer<Object?>();
_outgoingRequestCompleters[id] = completer;
......@@ -161,7 +273,7 @@ class DaemonConnection {
if (params != null) 'params': params,
};
_logger.printTrace('-> Sending to daemon, id = $id, method = $method');
_daemonStreams.send(data);
_daemonStreams.send(data, binary);
return completer.future;
}
......@@ -183,11 +295,11 @@ class DaemonConnection {
}
/// Sends an event to the client.
void sendEvent(String name, [ Object? params ]) {
void sendEvent(String name, [ Object? params, List<int>? binary ]) {
_daemonStreams.send(<String, Object?>{
'event': name,
if (params != null) 'params': params,
});
}, binary);
}
/// Handles the input from the stream.
......@@ -203,7 +315,8 @@ class DaemonConnection {
///
/// Event:
/// {"event": <String>. "params": <optional, Object?>}
void _handleData(Map<String, Object?> data) {
void _handleMessage(DaemonMessage message) {
final Map<String, Object?> data = message.data;
if (data['id'] != null) {
if (data['method'] == null) {
// This is a response to previously sent request.
......@@ -220,12 +333,21 @@ class DaemonConnection {
_outgoingRequestCompleters.remove(id)?.complete(result);
}
} else {
_incomingCommands.add(data);
_incomingCommands.add(message);
}
} else if (data['event'] != null) {
// This is an event
_logger.printTrace('<- Event received: ${data['event']}');
_events.add(data);
final Object? eventName = data['event'];
if (eventName is String) {
_events.add(DaemonEventData(
eventName,
data['params'],
message.binary,
));
} else {
throwToolExit('event name received is not string!');
}
} else {
_logger.printError('Unknown data received from daemon');
}
......
......@@ -37,6 +37,18 @@ class Category {
@override
String toString() => value;
static Category? fromString(String category) {
switch (category) {
case 'web':
return web;
case 'desktop':
return desktop;
case 'mobile':
return mobile;
}
return null;
}
}
/// The platform sub-folder that a device type supports.
......@@ -56,6 +68,28 @@ class PlatformType {
@override
String toString() => value;
static PlatformType? fromString(String platformType) {
switch (platformType) {
case 'web':
return web;
case 'android':
return android;
case 'ios':
return ios;
case 'linux':
return linux;
case 'macos':
return macos;
case 'windows':
return windows;
case 'fuchsia':
return fuchsia;
case 'custom':
return custom;
}
return null;
}
}
/// A discovery mechanism for flutter-supported development devices.
......@@ -809,6 +843,47 @@ class DebuggingOptions {
nullAssertions = false,
nativeNullAssertions = false;
DebuggingOptions._({
required this.buildInfo,
required this.debuggingEnabled,
required this.startPaused,
required this.dartFlags,
required this.dartEntrypointArgs,
required this.disableServiceAuthCodes,
required this.enableDds,
required this.enableSoftwareRendering,
required this.skiaDeterministicRendering,
required this.traceSkia,
required this.traceAllowlist,
required this.traceSkiaAllowlist,
required this.traceSystrace,
required this.endlessTraceBuffer,
required this.dumpSkpOnShaderCompilation,
required this.cacheSkSL,
required this.purgePersistentCache,
required this.useTestFonts,
required this.verboseSystemLogs,
required this.hostVmServicePort,
required this.deviceVmServicePort,
required this.disablePortPublication,
required this.ddsPort,
required this.devToolsServerAddress,
required this.port,
required this.hostname,
required this.webEnableExposeUrl,
required this.webUseSseForDebugProxy,
required this.webUseSseForDebugBackend,
required this.webUseSseForInjectedClient,
required this.webRunHeadless,
required this.webBrowserDebugPort,
required this.webEnableExpressionEvaluation,
required this.webLaunchUrl,
required this.vmserviceOutFile,
required this.fastStart,
required this.nullAssertions,
required this.nativeNullAssertions,
});
final bool debuggingEnabled;
final BuildInfo buildInfo;
......@@ -870,6 +945,88 @@ class DebuggingOptions {
final bool nativeNullAssertions;
bool get hasObservatoryPort => hostVmServicePort != null;
Map<String, Object?> toJson() => <String, Object?>{
'debuggingEnabled': debuggingEnabled,
'startPaused': startPaused,
'dartFlags': dartFlags,
'dartEntrypointArgs': dartEntrypointArgs,
'disableServiceAuthCodes': disableServiceAuthCodes,
'enableDds': enableDds,
'enableSoftwareRendering': enableSoftwareRendering,
'skiaDeterministicRendering': skiaDeterministicRendering,
'traceSkia': traceSkia,
'traceAllowlist': traceAllowlist,
'traceSkiaAllowlist': traceSkiaAllowlist,
'traceSystrace': traceSystrace,
'endlessTraceBuffer': endlessTraceBuffer,
'dumpSkpOnShaderCompilation': dumpSkpOnShaderCompilation,
'cacheSkSL': cacheSkSL,
'purgePersistentCache': purgePersistentCache,
'useTestFonts': useTestFonts,
'verboseSystemLogs': verboseSystemLogs,
'hostVmServicePort': hostVmServicePort,
'deviceVmServicePort': deviceVmServicePort,
'disablePortPublication': disablePortPublication,
'ddsPort': ddsPort,
'devToolsServerAddress': devToolsServerAddress.toString(),
'port': port,
'hostname': hostname,
'webEnableExposeUrl': webEnableExposeUrl,
'webUseSseForDebugProxy': webUseSseForDebugProxy,
'webUseSseForDebugBackend': webUseSseForDebugBackend,
'webUseSseForInjectedClient': webUseSseForInjectedClient,
'webRunHeadless': webRunHeadless,
'webBrowserDebugPort': webBrowserDebugPort,
'webEnableExpressionEvaluation': webEnableExpressionEvaluation,
'webLaunchUrl': webLaunchUrl,
'vmserviceOutFile': vmserviceOutFile,
'fastStart': fastStart,
'nullAssertions': nullAssertions,
'nativeNullAssertions': nativeNullAssertions,
};
static DebuggingOptions fromJson(Map<String, Object?> json, BuildInfo buildInfo) =>
DebuggingOptions._(
buildInfo: buildInfo,
debuggingEnabled: (json['debuggingEnabled'] as bool?)!,
startPaused: (json['startPaused'] as bool?)!,
dartFlags: (json['dartFlags'] as String?)!,
dartEntrypointArgs: ((json['dartEntrypointArgs'] as List<String>?)?.cast<String>())!,
disableServiceAuthCodes: (json['disableServiceAuthCodes'] as bool?)!,
enableDds: (json['enableDds'] as bool?)!,
enableSoftwareRendering: (json['enableSoftwareRendering'] as bool?)!,
skiaDeterministicRendering: (json['skiaDeterministicRendering'] as bool?)!,
traceSkia: (json['traceSkia'] as bool?)!,
traceAllowlist: json['traceAllowlist'] as String?,
traceSkiaAllowlist: json['traceSkiaAllowlist'] as String?,
traceSystrace: (json['traceSystrace'] as bool?)!,
endlessTraceBuffer: (json['endlessTraceBuffer'] as bool?)!,
dumpSkpOnShaderCompilation: (json['dumpSkpOnShaderCompilation'] as bool?)!,
cacheSkSL: (json['cacheSkSL'] as bool?)!,
purgePersistentCache: (json['purgePersistentCache'] as bool?)!,
useTestFonts: (json['useTestFonts'] as bool?)!,
verboseSystemLogs: (json['verboseSystemLogs'] as bool?)!,
hostVmServicePort: json['hostVmServicePort'] as int? ,
deviceVmServicePort: json['deviceVmServicePort'] as int?,
disablePortPublication: (json['disablePortPublication'] as bool?)!,
ddsPort: json['ddsPort'] as int?,
devToolsServerAddress: json['devToolsServerAddress'] != null ? Uri.parse(json['devToolsServerAddress']! as String) : null,
port: json['port'] as String?,
hostname: json['hostname'] as String?,
webEnableExposeUrl: json['webEnableExposeUrl'] as bool?,
webUseSseForDebugProxy: (json['webUseSseForDebugProxy'] as bool?)!,
webUseSseForDebugBackend: (json['webUseSseForDebugBackend'] as bool?)!,
webUseSseForInjectedClient: (json['webUseSseForInjectedClient'] as bool?)!,
webRunHeadless: (json['webRunHeadless'] as bool?)!,
webBrowserDebugPort: json['webBrowserDebugPort'] as int?,
webEnableExpressionEvaluation: (json['webEnableExpressionEvaluation'] as bool?)!,
webLaunchUrl: json['webLaunchUrl'] as String?,
vmserviceOutFile: json['vmserviceOutFile'] as String?,
fastStart: (json['fastStart'] as bool?)!,
nullAssertions: (json['nullAssertions'] as bool?)!,
nativeNullAssertions: (json['nativeNullAssertions'] as bool?)!,
);
}
class LaunchResult {
......
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'dart:typed_data';
import '../application_package.dart';
import '../base/common.dart';
import '../base/file_system.dart';
import '../base/io.dart';
import '../base/logger.dart';
import '../build_info.dart';
import '../convert.dart';
import '../daemon.dart';
import '../device.dart';
import '../device_port_forwarder.dart';
import '../project.dart';
bool _isNullable<T>() => null is T;
T _cast<T>(Object? object) {
if (!_isNullable<T>() && object == null) {
throw Exception('Expected $T, received null!');
} else {
return object as T;
}
}
/// A [DeviceDiscovery] that will connect to a flutter daemon and connects to
/// the devices remotely.
class ProxiedDevices extends DeviceDiscovery {
ProxiedDevices(this.connection, {
required Logger logger,
}) : _logger = logger;
/// [DaemonConnection] used to communicate with the daemon.
final DaemonConnection connection;
final Logger _logger;
@override
bool get supportsPlatform => true;
@override
bool get canListAnything => true;
List<Device>? _devices;
@override
Future<List<Device>> get devices async =>
_devices ?? await discoverDevices();
@override
Future<List<Device>> discoverDevices({Duration? timeout}) async {
final List<Map<String, Object?>> discoveredDevices = _cast<List<dynamic>>(await connection.sendRequest('device.discoverDevices')).cast<Map<String, Object?>>();
final List<ProxiedDevice> devices = <ProxiedDevice>[
for (final Map<String, Object?> device in discoveredDevices)
_deviceFromDaemonResult(device),
];
_devices = devices;
return devices;
}
@override
List<String> get wellKnownIds => const <String>[];
ProxiedDevice _deviceFromDaemonResult(Map<String, Object?> device) {
final Map<String, Object?> capabilities = _cast<Map<String, Object?>>(device['capabilities']);
return ProxiedDevice(
connection, _cast<String>(device['id']),
category: Category.fromString(_cast<String>(device['category'])),
platformType: PlatformType.fromString(_cast<String>(device['platformType'])),
targetPlatform: getTargetPlatformForName(_cast<String>(device['platform'])),
ephemeral: _cast<bool>(device['ephemeral']),
name: 'Proxied ${device['name']}',
isLocalEmulator: _cast<bool>(device['emulator']),
emulatorId: _cast<String?>(device['emulatorId']),
sdkNameAndVersion: _cast<String>(device['sdk']),
supportsHotReload: _cast<bool>(capabilities['hotReload']),
supportsHotRestart: _cast<bool>(capabilities['hotRestart']),
supportsFlutterExit: _cast<bool>(capabilities['flutterExit']),
supportsScreenshot: _cast<bool>(capabilities['screenshot']),
supportsFastStart: _cast<bool>(capabilities['fastStart']),
supportsHardwareRendering: _cast<bool>(capabilities['hardwareRendering']),
logger: _logger,
);
}
}
/// A [Device] that acts as a proxy to remotely connected device.
///
/// The communication happens via a flutter daemon.
class ProxiedDevice extends Device {
ProxiedDevice(this.connection, String id, {
required Category? category,
required PlatformType? platformType,
required TargetPlatform targetPlatform,
required bool ephemeral,
required this.name,
required bool isLocalEmulator,
required String? emulatorId,
required String sdkNameAndVersion,
required this.supportsHotReload,
required this.supportsHotRestart,
required this.supportsFlutterExit,
required this.supportsScreenshot,
required this.supportsFastStart,
required bool supportsHardwareRendering,
required Logger logger,
}): _isLocalEmulator = isLocalEmulator,
_emulatorId = emulatorId,
_sdkNameAndVersion = sdkNameAndVersion,
_supportsHardwareRendering = supportsHardwareRendering,
_targetPlatform = targetPlatform,
_logger = logger,
super(id,
category: category,
platformType: platformType,
ephemeral: ephemeral);
/// [DaemonConnection] used to communicate with the daemon.
final DaemonConnection connection;
final Logger _logger;
@override
final String name;
final bool _isLocalEmulator;
@override
Future<bool> get isLocalEmulator async => _isLocalEmulator;
final String? _emulatorId;
@override
Future<String?> get emulatorId async => _emulatorId;
@override
Future<bool> supportsRuntimeMode(BuildMode buildMode) async =>
_cast<bool>(await connection.sendRequest('device.supportsRuntimeMode', <String, Object>{
'deviceId': id,
'buildMode': buildMode.toString(),
}));
final bool _supportsHardwareRendering;
@override
Future<bool> get supportsHardwareRendering async => _supportsHardwareRendering;
// ProxiedDevice is intended to be used with prebuilt projects. No building
// is required, so we returns true for all projects.
@override
bool isSupportedForProject(FlutterProject flutterProject) => true;
@override
Future<bool> isAppInstalled(
covariant ApplicationPackage app, {
String? userIdentifier,
}) => throw UnimplementedError();
@override
Future<bool> isLatestBuildInstalled(covariant ApplicationPackage app) => throw UnimplementedError();
@override
Future<bool> installApp(
covariant ApplicationPackage app, {
String? userIdentifier,
}) => throw UnimplementedError();
@override
Future<bool> uninstallApp(
covariant ApplicationPackage app, {
String? userIdentifier,
}) => throw UnimplementedError();
@override
bool isSupported() => true;
final TargetPlatform _targetPlatform;
@override
Future<TargetPlatform> get targetPlatform async => _targetPlatform;
TargetPlatform get targetPlatformSync => _targetPlatform;
final String _sdkNameAndVersion;
@override
Future<String> get sdkNameAndVersion async => _sdkNameAndVersion;
@override
FutureOr<DeviceLogReader> getLogReader({
covariant PrebuiltApplicationPackage? app,
bool includePastLogs = false,
}) => _ProxiedLogReader(connection, this, app);
_ProxiedPortForwarder? _proxiedPortForwarder;
_ProxiedPortForwarder get proxiedPortForwarder => _proxiedPortForwarder ??= _ProxiedPortForwarder(connection, logger: _logger);
@override
DevicePortForwarder get portForwarder => throw UnimplementedError;
@override
void clearLogs() => throw UnimplementedError();
@override
Future<LaunchResult> startApp(
covariant PrebuiltApplicationPackage package, {
String? mainPath,
String? route,
required DebuggingOptions debuggingOptions,
Map<String, Object?> platformArgs = const <String, Object?>{},
bool prebuiltApplication = false,
bool ipv6 = false,
String? userIdentifier,
}) async {
final Map<String, Object?> result = _cast<Map<String, Object?>>(await connection.sendRequest('device.startApp', <String, Object?>{
'deviceId': id,
'applicationPackageId': await applicationPackageId(package),
'mainPath': mainPath,
'route': route,
'debuggingOptions': debuggingOptions.toJson(),
'platformArgs': platformArgs,
'prebuiltApplication': prebuiltApplication,
'ipv6': ipv6,
'userIdentifier': userIdentifier,
}));
final bool started = _cast<bool>(result['started']);
final String? observatoryUriStr = _cast<String?>(result['observatoryUri']);
final Uri? observatoryUri = observatoryUriStr == null ? null : Uri.parse(observatoryUriStr);
if (started) {
if (observatoryUri != null) {
final int hostPort = await proxiedPortForwarder.forward(observatoryUri.port);
return LaunchResult.succeeded(observatoryUri: observatoryUri.replace(port: hostPort));
} else {
return LaunchResult.succeeded();
}
} else {
return LaunchResult.failed();
}
}
@override
final bool supportsHotReload;
@override
final bool supportsHotRestart;
@override
final bool supportsFlutterExit;
@override
final bool supportsScreenshot;
@override
final bool supportsFastStart;
@override
Future<bool> stopApp(
covariant PrebuiltApplicationPackage app, {
String? userIdentifier,
}) async {
return _cast<bool>(await connection.sendRequest('device.stopApp', <String, Object?>{
'deviceId': id,
'applicationPackageId': await applicationPackageId(app),
'userIdentifier': userIdentifier,
}));
}
@override
Future<MemoryInfo> queryMemoryInfo() => throw UnimplementedError();
@override
Future<void> takeScreenshot(File outputFile) async {
final String imageBase64 = _cast<String>(await connection.sendRequest('device.takeScreenshot', <String, Object?>{
'deviceId': id,
}));
await outputFile.writeAsBytes(base64.decode(imageBase64));
}
@override
Future<void> dispose() async {}
final Map<String, Future<String>> _applicationPackageMap = <String, Future<String>>{};
Future<String> applicationPackageId(PrebuiltApplicationPackage package) async {
final File binary = package.applicationPackage as File;
final String path = binary.absolute.path;
if (_applicationPackageMap.containsKey(path)) {
return _applicationPackageMap[path]!;
}
final String fileName = binary.basename;
final Completer<String> idCompleter = Completer<String>();
_applicationPackageMap[path] = idCompleter.future;
await connection.sendRequest('proxy.writeTempFile', <String, Object>{
'path': fileName,
}, await binary.readAsBytes());
final String id = _cast<String>(await connection.sendRequest('device.uploadApplicationPackage', <String, Object>{
'targetPlatform': getNameForTargetPlatform(_targetPlatform),
'applicationBinary': fileName,
}));
idCompleter.complete(id);
return id;
}
}
/// A [DeviceLogReader] for a proxied device.
class _ProxiedLogReader extends DeviceLogReader {
_ProxiedLogReader(this.connection, this.device, this.applicationPackage);
final DaemonConnection connection;
final ProxiedDevice device;
final PrebuiltApplicationPackage? applicationPackage;
@override
String get name => device.name;
final StreamController<String> _logLinesStreamController = StreamController<String>();
Stream<String>? _logLines;
String? _id;
@override
Stream<String> get logLines => _logLines ??= _start();
Stream<String> _start() {
final PrebuiltApplicationPackage? package = applicationPackage;
final Future<String?> applicationPackageId = package != null ? device.applicationPackageId(package) : Future<String?>.value();
final Future<String> idFuture = applicationPackageId.then((String? applicationPackageId) async =>
_cast<String>(await connection.sendRequest('device.logReader.start', <String, Object>{
'deviceId': device.id,
if (applicationPackageId != null)
'applicationPackageId': applicationPackageId,
})));
idFuture.then((String id) {
_id = id;
final Stream<String> stream = connection.listenToEvent('device.logReader.logLines.$_id').map((DaemonEventData event) => event.data! as String);
_logLinesStreamController.addStream(stream);
});
return _logLinesStreamController.stream;
}
@override
void dispose() {
if (_id != null) {
connection.sendRequest('device.logReader.stop', <String, Object?>{
'id': _id,
});
}
}
}
/// A [DevicePortForwarder] for a proxied device.
class _ProxiedPortForwarder extends DevicePortForwarder {
_ProxiedPortForwarder(this.connection, {
required Logger logger,
}) : _logger = logger;
DaemonConnection connection;
final Logger _logger;
@override
final List<ForwardedPort> forwardedPorts = <ForwardedPort>[];
final List<Socket> _connectedSockets = <Socket>[];
final Map<int, ServerSocket> _serverSockets = <int, ServerSocket>{};
@override
Future<int> forward(int devicePort, { int? hostPort }) async {
final ServerSocket serverSocket = await ServerSocket.bind(InternetAddress.loopbackIPv4, hostPort ?? 0);
_serverSockets[serverSocket.port] = serverSocket;
forwardedPorts.add(ForwardedPort(serverSocket.port, devicePort));
serverSocket.listen((Socket socket) async {
final String id = _cast<String>(await connection.sendRequest('proxy.connect', <String, Object>{
'port': devicePort,
}));
final Stream<List<int>> dataStream = connection.listenToEvent('proxy.data.$id').asyncExpand((DaemonEventData event) => event.binary);
dataStream.listen(socket.add);
final Future<DaemonEventData> disconnectFuture = connection.listenToEvent('proxy.disconnected.$id').first;
unawaited(disconnectFuture.then((_) {
socket.close();
}));
socket.listen((Uint8List data) {
connection.sendRequest('proxy.write', <String, Object>{
'id': id,
}, data);
});
_connectedSockets.add(socket);
unawaited(socket.done.then((dynamic value) {
connection.sendRequest('proxy.disconnect', <String, Object>{
'id': id,
});
_connectedSockets.remove(socket);
}).onError((Object? error, StackTrace stackTrace) {
connection.sendRequest('proxy.disconnect', <String, Object>{
'id': id,
});
_connectedSockets.remove(socket);
}));
}, onError: (Object error, StackTrace stackTrace) {
_logger.printWarning('Server socket error: $error');
_logger.printTrace('Server socket error: $error, stack trace: $stackTrace');
});
return serverSocket.port;
}
@override
Future<void> unforward(ForwardedPort forwardedPort) async {
if (!forwardedPorts.remove(forwardedPort)) {
// Not in list. Nothing to remove.
return;
}
forwardedPort.dispose();
final ServerSocket? serverSocket = _serverSockets.remove(forwardedPort.hostPort);
await serverSocket?.close();
}
@override
Future<void> dispose() async {
for (final ForwardedPort forwardedPort in forwardedPorts) {
forwardedPort.dispose();
}
for (final ServerSocket serverSocket in _serverSockets.values) {
await serverSocket.close();
}
for (final Socket socket in _connectedSockets) {
await socket.close();
}
}
}
......@@ -13,8 +13,10 @@
import 'dart:async';
import 'package:fake_async/fake_async.dart';
import 'package:file/src/interface/file.dart';
import 'package:flutter_tools/src/android/android_device.dart';
import 'package:flutter_tools/src/android/android_workflow.dart';
import 'package:flutter_tools/src/application_package.dart';
import 'package:flutter_tools/src/base/common.dart';
import 'package:flutter_tools/src/base/logger.dart';
import 'package:flutter_tools/src/base/utils.dart';
......@@ -27,6 +29,7 @@ import 'package:flutter_tools/src/fuchsia/fuchsia_workflow.dart';
import 'package:flutter_tools/src/globals.dart' as globals;
import 'package:flutter_tools/src/ios/ios_workflow.dart';
import 'package:flutter_tools/src/resident_runner.dart';
import 'package:flutter_tools/src/vmservice.dart';
import 'package:test/fake.dart';
import '../../src/common.dart';
......@@ -49,18 +52,18 @@ Future<T> _runFakeAsync<T>(Future<T> Function(FakeAsync time) f) async {
});
}
class FakeDaemonStreams extends DaemonStreams {
final StreamController<Map<String, dynamic>> inputs = StreamController<Map<String, dynamic>>();
final StreamController<Map<String, dynamic>> outputs = StreamController<Map<String, dynamic>>();
class FakeDaemonStreams implements DaemonStreams {
final StreamController<DaemonMessage> inputs = StreamController<DaemonMessage>();
final StreamController<DaemonMessage> outputs = StreamController<DaemonMessage>();
@override
Stream<Map<String, dynamic>> get inputStream {
Stream<DaemonMessage> get inputStream {
return inputs.stream;
}
@override
void send(Map<String, dynamic> message) {
outputs.add(message);
void send(Map<String, dynamic> message, [ List<int> binary ]) {
outputs.add(DaemonMessage(message, binary != null ? Stream<List<int>>.value(binary) : null));
}
@override
......@@ -102,11 +105,11 @@ void main() {
daemonConnection,
notifyingLogger: notifyingLogger,
);
daemonStreams.inputs.add(<String, dynamic>{'id': 0, 'method': 'daemon.version'});
final Map<String, dynamic> response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response['id'], 0);
expect(response['result'], isNotEmpty);
expect(response['result'], isA<String>());
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'daemon.version'}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response.data['id'], 0);
expect(response.data['result'], isNotEmpty);
expect(response.data['result'], isA<String>());
});
testUsingContext('daemon.getSupportedPlatforms command should succeed', () async {
......@@ -117,12 +120,16 @@ void main() {
// Use the flutter_gallery project which has a known set of supported platforms.
final String projectPath = globals.fs.path.join(getFlutterRoot(), 'dev', 'integration_tests', 'flutter_gallery');
daemonStreams.inputs.add(<String, dynamic>{'id': 0, 'method': 'daemon.getSupportedPlatforms', 'params': <String, Object>{'projectRoot': projectPath}});
final Map<String, dynamic> response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response['id'], 0);
expect(response['result'], isNotEmpty);
expect((response['result'] as Map<String, dynamic>)['platforms'], <String>{'macos'});
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{
'id': 0,
'method': 'daemon.getSupportedPlatforms',
'params': <String, Object>{'projectRoot': projectPath},
}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response.data['id'], 0);
expect(response.data['result'], isNotEmpty);
expect((response.data['result'] as Map<String, dynamic>)['platforms'], <String>{'macos'});
}, overrides: <Type, Generator>{
// Disable Android/iOS and enable macOS to make sure result is consistent and defaults are tested off.
FeatureFlags: () => TestFeatureFlags(isAndroidEnabled: false, isIOSEnabled: false, isMacOSEnabled: true),
......@@ -134,12 +141,12 @@ void main() {
notifyingLogger: notifyingLogger,
);
globals.printError('daemon.logMessage test');
final Map<String, dynamic> response = await daemonStreams.outputs.stream.firstWhere((Map<String, dynamic> map) {
return map['event'] == 'daemon.logMessage' && (map['params'] as Map<String, dynamic>)['level'] == 'error';
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere((DaemonMessage message) {
return message.data['event'] == 'daemon.logMessage' && (message.data['params'] as Map<String, dynamic>)['level'] == 'error';
});
expect(response['id'], isNull);
expect(response['event'], 'daemon.logMessage');
final Map<String, String> logMessage = castStringKeyedMap(response['params']).cast<String, String>();
expect(response.data['id'], isNull);
expect(response.data['event'], 'daemon.logMessage');
final Map<String, String> logMessage = castStringKeyedMap(response.data['params']).cast<String, String>();
expect(logMessage['level'], 'error');
expect(logMessage['message'], 'daemon.logMessage test');
}, overrides: <Type, Generator>{
......@@ -152,12 +159,12 @@ void main() {
notifyingLogger: notifyingLogger,
);
globals.printWarning('daemon.logMessage test');
final Map<String, dynamic> response = await daemonStreams.outputs.stream.firstWhere((Map<String, dynamic> map) {
return map['event'] == 'daemon.logMessage' && (map['params'] as Map<String, dynamic>)['level'] == 'warning';
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere((DaemonMessage message) {
return message.data['event'] == 'daemon.logMessage' && (message.data['params'] as Map<String, dynamic>)['level'] == 'warning';
});
expect(response['id'], isNull);
expect(response['event'], 'daemon.logMessage');
final Map<String, String> logMessage = castStringKeyedMap(response['params']).cast<String, String>();
expect(response.data['id'], isNull);
expect(response.data['event'], 'daemon.logMessage');
final Map<String, String> logMessage = castStringKeyedMap(response.data['params']).cast<String, String>();
expect(logMessage['level'], 'warning');
expect(logMessage['message'], 'daemon.logMessage test');
}, overrides: <Type, Generator>{
......@@ -201,7 +208,7 @@ void main() {
daemonConnection,
notifyingLogger: notifyingLogger,
);
daemonStreams.inputs.add(<String, dynamic>{'id': 0, 'method': 'daemon.shutdown'});
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'daemon.shutdown'}));
return daemon.onExit.then<void>((int code) async {
await daemonStreams.inputs.close();
expect(code, 0);
......@@ -214,10 +221,10 @@ void main() {
notifyingLogger: notifyingLogger,
);
daemonStreams.inputs.add(<String, dynamic>{'id': 0, 'method': 'app.restart'});
final Map<String, dynamic> response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response['id'], 0);
expect(response['error'], contains('appId is required'));
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'app.restart'}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response.data['id'], 0);
expect(response.data['error'], contains('appId is required'));
});
testUsingContext('ext.flutter.debugPaint via service extension without an appId should report an error', () async {
......@@ -226,16 +233,16 @@ void main() {
notifyingLogger: notifyingLogger,
);
daemonStreams.inputs.add(<String, dynamic>{
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{
'id': 0,
'method': 'app.callServiceExtension',
'params': <String, String>{
'methodName': 'ext.flutter.debugPaint',
},
});
final Map<String, dynamic> response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response['id'], 0);
expect(response['error'], contains('appId is required'));
}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response.data['id'], 0);
expect(response.data['error'], contains('appId is required'));
});
testUsingContext('app.stop without appId should report an error', () async {
......@@ -244,10 +251,10 @@ void main() {
notifyingLogger: notifyingLogger,
);
daemonStreams.inputs.add(<String, dynamic>{'id': 0, 'method': 'app.stop'});
final Map<String, dynamic> response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response['id'], 0);
expect(response['error'], contains('appId is required'));
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'app.stop'}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response.data['id'], 0);
expect(response.data['error'], contains('appId is required'));
});
testUsingContext('device.getDevices should respond with list', () async {
......@@ -255,10 +262,10 @@ void main() {
daemonConnection,
notifyingLogger: notifyingLogger,
);
daemonStreams.inputs.add(<String, dynamic>{'id': 0, 'method': 'device.getDevices'});
final Map<String, dynamic> response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response['id'], 0);
expect(response['result'], isList);
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'device.getDevices'}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response.data['id'], 0);
expect(response.data['result'], isList);
});
testUsingContext('device.getDevices reports available devices', () async {
......@@ -269,10 +276,10 @@ void main() {
final FakePollingDeviceDiscovery discoverer = FakePollingDeviceDiscovery();
daemon.deviceDomain.addDeviceDiscoverer(discoverer);
discoverer.addDevice(FakeAndroidDevice());
daemonStreams.inputs.add(<String, dynamic>{'id': 0, 'method': 'device.getDevices'});
final Map<String, dynamic> response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response['id'], 0);
final dynamic result = response['result'];
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'device.getDevices'}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response.data['id'], 0);
final dynamic result = response.data['result'];
expect(result, isList);
expect(result, isNotEmpty);
});
......@@ -287,11 +294,11 @@ void main() {
daemon.deviceDomain.addDeviceDiscoverer(discoverer);
discoverer.addDevice(FakeAndroidDevice());
return daemonStreams.outputs.stream.skipWhile(_isConnectedEvent).first.then<void>((Map<String, dynamic> response) async {
expect(response['event'], 'device.added');
expect(response['params'], isMap);
return daemonStreams.outputs.stream.skipWhile(_isConnectedEvent).first.then<void>((DaemonMessage response) async {
expect(response.data['event'], 'device.added');
expect(response.data['params'], isMap);
final Map<String, dynamic> params = castStringKeyedMap(response['params']);
final Map<String, dynamic> params = castStringKeyedMap(response.data['params']);
expect(params['platform'], isNotEmpty); // the fake device has a platform of 'android-arm'
});
}, overrides: <Type, Generator>{
......@@ -300,16 +307,185 @@ void main() {
FuchsiaWorkflow: () => FakeFuchsiaWorkflow(),
});
testUsingContext('device.discoverDevices should respond with list', () async {
daemon = Daemon(
daemonConnection,
notifyingLogger: notifyingLogger,
);
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'device.discoverDevices'}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response.data['id'], 0);
expect(response.data['result'], isList);
});
testUsingContext('device.discoverDevices reports available devices', () async {
daemon = Daemon(
daemonConnection,
notifyingLogger: notifyingLogger,
);
final FakePollingDeviceDiscovery discoverer = FakePollingDeviceDiscovery();
daemon.deviceDomain.addDeviceDiscoverer(discoverer);
discoverer.addDevice(FakeAndroidDevice());
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'device.discoverDevices'}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response.data['id'], 0);
final dynamic result = response.data['result'];
expect(result, isList);
expect(result, isNotEmpty);
expect(discoverer.discoverDevicesCalled, true);
});
testUsingContext('device.supportsRuntimeMode returns correct value', () async {
daemon = Daemon(
daemonConnection,
notifyingLogger: notifyingLogger,
);
final FakePollingDeviceDiscovery discoverer = FakePollingDeviceDiscovery();
daemon.deviceDomain.addDeviceDiscoverer(discoverer);
final FakeAndroidDevice device = FakeAndroidDevice();
discoverer.addDevice(device);
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{
'id': 0,
'method': 'device.supportsRuntimeMode',
'params': <String, dynamic>{
'deviceId': 'device',
'buildMode': 'profile',
},
}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response.data['id'], 0);
final dynamic result = response.data['result'];
expect(result, true);
expect(device.supportsRuntimeModeCalledBuildMode, BuildMode.profile);
});
testUsingContext('device.logReader.start and .stop starts and stops log reader', () async {
daemon = Daemon(
daemonConnection,
notifyingLogger: notifyingLogger,
);
final FakePollingDeviceDiscovery discoverer = FakePollingDeviceDiscovery();
daemon.deviceDomain.addDeviceDiscoverer(discoverer);
final FakeAndroidDevice device = FakeAndroidDevice();
discoverer.addDevice(device);
final FakeDeviceLogReader logReader = FakeDeviceLogReader();
device.logReader = logReader;
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{
'id': 0,
'method': 'device.logReader.start',
'params': <String, dynamic>{
'deviceId': 'device',
},
}));
final Stream<DaemonMessage> broadcastOutput = daemonStreams.outputs.stream.asBroadcastStream();
final DaemonMessage firstResponse = await broadcastOutput.firstWhere(_notEvent);
expect(firstResponse.data['id'], 0);
final String logReaderId = firstResponse.data['result'] as String;
expect(logReaderId, isNotNull);
// Try sending logs.
logReader.logLinesController.add('Sample log line');
final DaemonMessage logEvent = await broadcastOutput.firstWhere(
(DaemonMessage message) => message.data['event'] != null && message.data['event'] != 'device.added',
);
expect(logEvent.data['params'], 'Sample log line');
// Now try to stop the log reader.
expect(logReader.disposeCalled, false);
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{
'id': 1,
'method': 'device.logReader.stop',
'params': <String, dynamic>{
'id': logReaderId,
},
}));
final DaemonMessage stopResponse = await broadcastOutput.firstWhere(_notEvent);
expect(stopResponse.data['id'], 1);
expect(logReader.disposeCalled, true);
});
group('device.startApp and .stopApp', () {
FakeApplicationPackageFactory applicationPackageFactory;
setUp(() {
applicationPackageFactory = FakeApplicationPackageFactory();
});
testUsingContext('device.startApp and .stopApp starts and stops an app', () async {
daemon = Daemon(
daemonConnection,
notifyingLogger: notifyingLogger,
);
final FakePollingDeviceDiscovery discoverer = FakePollingDeviceDiscovery();
daemon.deviceDomain.addDeviceDiscoverer(discoverer);
final FakeAndroidDevice device = FakeAndroidDevice();
discoverer.addDevice(device);
final Stream<DaemonMessage> broadcastOutput = daemonStreams.outputs.stream.asBroadcastStream();
// First upload the application package.
final FakeApplicationPackage applicationPackage = FakeApplicationPackage();
applicationPackageFactory.applicationPackage = applicationPackage;
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{
'id': 0,
'method': 'device.uploadApplicationPackage',
'params': <String, dynamic>{
'targetPlatform': 'android',
'applicationBinary': 'test_file',
},
}));
final DaemonMessage applicationPackageIdResponse = await broadcastOutput.firstWhere(_notEvent);
expect(applicationPackageIdResponse.data['id'], 0);
expect(applicationPackageFactory.applicationBinaryRequested.basename, 'test_file');
expect(applicationPackageFactory.platformRequested, TargetPlatform.android);
final String applicationPackageId = applicationPackageIdResponse.data['result'] as String;
// Try starting the app.
final Uri observatoryUri = Uri.parse('http://127.0.0.1:12345/observatory');
device.launchResult = LaunchResult.succeeded(observatoryUri: observatoryUri);
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{
'id': 1,
'method': 'device.startApp',
'params': <String, dynamic>{
'deviceId': 'device',
'applicationPackageId': applicationPackageId,
'debuggingOptions': DebuggingOptions.enabled(BuildInfo.debug).toJson(),
},
}));
final DaemonMessage startAppResponse = await broadcastOutput.firstWhere(_notEvent);
expect(startAppResponse.data['id'], 1);
expect(device.startAppPackage, applicationPackage);
final Map<String, dynamic> startAppResult = startAppResponse.data['result'] as Map<String, dynamic>;
expect(startAppResult['started'], true);
expect(startAppResult['observatoryUri'], observatoryUri.toString());
// Try stopping the app.
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{
'id': 2,
'method': 'device.stopApp',
'params': <String, dynamic>{
'deviceId': 'device',
'applicationPackageId': applicationPackageId,
},
}));
final DaemonMessage stopAppResponse = await broadcastOutput.firstWhere(_notEvent);
expect(stopAppResponse.data['id'], 2);
expect(device.stopAppPackage, applicationPackage);
final bool stopAppResult = stopAppResponse.data['result'] as bool;
expect(stopAppResult, true);
}, overrides: <Type, Generator>{
ApplicationPackageFactory: () => applicationPackageFactory,
});
});
testUsingContext('emulator.launch without an emulatorId should report an error', () async {
daemon = Daemon(
daemonConnection,
notifyingLogger: notifyingLogger,
);
daemonStreams.inputs.add(<String, dynamic>{'id': 0, 'method': 'emulator.launch'});
final Map<String, dynamic> response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response['id'], 0);
expect(response['error'], contains('emulatorId is required'));
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'emulator.launch'}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response.data['id'], 0);
expect(response.data['error'], contains('emulatorId is required'));
});
testUsingContext('emulator.launch coldboot parameter must be boolean', () async {
......@@ -318,10 +494,10 @@ void main() {
notifyingLogger: notifyingLogger,
);
final Map<String, dynamic> params = <String, dynamic>{'emulatorId': 'device', 'coldBoot': 1};
daemonStreams.inputs.add(<String, dynamic>{'id': 0, 'method': 'emulator.launch', 'params': params});
final Map<String, dynamic> response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response['id'], 0);
expect(response['error'], contains('coldBoot is not a bool'));
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'emulator.launch', 'params': params}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response.data['id'], 0);
expect(response.data['error'], contains('coldBoot is not a bool'));
});
testUsingContext('emulator.getEmulators should respond with list', () async {
......@@ -329,10 +505,10 @@ void main() {
daemonConnection,
notifyingLogger: notifyingLogger,
);
daemonStreams.inputs.add(<String, dynamic>{'id': 0, 'method': 'emulator.getEmulators'});
final Map<String, dynamic> response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response['id'], 0);
expect(response['result'], isList);
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'emulator.getEmulators'}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere(_notEvent);
expect(response.data['id'], 0);
expect(response.data['result'], isList);
});
testUsingContext('daemon can send exposeUrl requests to the client', () async {
......@@ -346,10 +522,10 @@ void main() {
// Respond to any requests from the daemon to expose a URL.
unawaited(daemonStreams.outputs.stream
.firstWhere((Map<String, dynamic> request) => request['method'] == 'app.exposeUrl')
.then((Map<String, dynamic> request) {
expect((request['params'] as Map<String, dynamic>)['url'], equals(originalUrl));
daemonStreams.inputs.add(<String, dynamic>{'id': request['id'], 'result': <String, dynamic>{'url': mappedUrl}});
.firstWhere((DaemonMessage request) => request.data['method'] == 'app.exposeUrl')
.then((DaemonMessage request) {
expect((request.data['params'] as Map<String, dynamic>)['url'], equals(originalUrl));
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': request.data['id'], 'result': <String, dynamic>{'url': mappedUrl}}));
})
);
......@@ -363,9 +539,9 @@ void main() {
notifyingLogger: notifyingLogger,
);
daemonStreams.inputs.add(<String, dynamic>{'id': 0, 'method': 'devtools.serve'});
final Map<String, dynamic> response = await daemonStreams.outputs.stream.firstWhere((Map<String, dynamic> response) => response['id'] == 0);
final Map<String, dynamic> result = response['result'] as Map<String, dynamic>;
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'devtools.serve'}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere((DaemonMessage response) => response.data['id'] == 0);
final Map<String, dynamic> result = response.data['result'] as Map<String, dynamic>;
expect(result, isNotEmpty);
expect(result['host'], '127.0.0.1');
expect(result['port'], 1234);
......@@ -379,9 +555,9 @@ void main() {
notifyingLogger: notifyingLogger,
);
daemonStreams.inputs.add(<String, dynamic>{'id': 0, 'method': 'devtools.serve'});
final Map<String, dynamic> response = await daemonStreams.outputs.stream.firstWhere((Map<String, dynamic> response) => response['id'] == 0);
final Map<String, dynamic> result = response['result'] as Map<String, dynamic>;
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': 0, 'method': 'devtools.serve'}));
final DaemonMessage response = await daemonStreams.outputs.stream.firstWhere((DaemonMessage response) => response.data['id'] == 0);
final Map<String, dynamic> result = response.data['result'] as Map<String, dynamic>;
expect(result, isNotEmpty);
expect(result['host'], null);
expect(result['port'], null);
......@@ -507,9 +683,9 @@ void main() {
});
}
bool _notEvent(Map<String, dynamic> map) => map['event'] == null;
bool _notEvent(DaemonMessage message) => message.data['event'] == null;
bool _isConnectedEvent(Map<String, dynamic> map) => map['event'] == 'daemon.connected';
bool _isConnectedEvent(DaemonMessage message) => message.data['event'] == 'daemon.connected';
class FakeFuchsiaWorkflow extends Fake implements FuchsiaWorkflow {
FakeFuchsiaWorkflow({ this.canListDevices = true });
......@@ -559,6 +735,94 @@ class FakeAndroidDevice extends Fake implements AndroidDevice {
@override
final bool ephemeral = false;
@override
Future<String> get sdkNameAndVersion async => 'Android 12';
@override
bool get supportsHotReload => true;
@override
bool get supportsHotRestart => true;
@override
bool get supportsScreenshot => true;
@override
bool get supportsFastStart => true;
@override
bool get supportsFlutterExit => true;
@override
Future<bool> get supportsHardwareRendering async => true;
@override
bool get supportsStartPaused => true;
BuildMode supportsRuntimeModeCalledBuildMode;
@override
Future<bool> supportsRuntimeMode(BuildMode buildMode) async {
supportsRuntimeModeCalledBuildMode = buildMode;
return true;
}
DeviceLogReader logReader;
@override
FutureOr<DeviceLogReader> getLogReader({
covariant ApplicationPackage app,
bool includePastLogs = false,
}) => logReader;
ApplicationPackage startAppPackage;
LaunchResult launchResult;
@override
Future<LaunchResult> startApp(
ApplicationPackage package, {
String mainPath,
String route,
DebuggingOptions debuggingOptions,
Map<String, Object> platformArgs = const <String, Object>{},
bool prebuiltApplication = false,
bool ipv6 = false,
String userIdentifier,
}) async {
startAppPackage = package;
return launchResult;
}
ApplicationPackage stopAppPackage;
@override
Future<bool> stopApp(
ApplicationPackage app, {
String userIdentifier,
}) async {
stopAppPackage = app;
return true;
}
}
class FakeDeviceLogReader implements DeviceLogReader {
final StreamController<String> logLinesController = StreamController<String>();
bool disposeCalled = false;
@override
int appPid;
@override
FlutterVmService connectedVMService;
@override
void dispose() {
disposeCalled = true;
}
@override
Stream<String> get logLines => logLinesController.stream;
@override
String get name => 'device';
}
class FakeDevtoolsLauncher extends Fake implements DevtoolsLauncher {
......@@ -572,3 +836,18 @@ class FakeDevtoolsLauncher extends Fake implements DevtoolsLauncher {
@override
Future<void> close() async {}
}
class FakeApplicationPackageFactory implements ApplicationPackageFactory {
TargetPlatform platformRequested;
File applicationBinaryRequested;
ApplicationPackage applicationPackage;
@override
Future<ApplicationPackage> getPackageForPlatform(TargetPlatform platform, {BuildInfo buildInfo, File applicationBinary}) async {
platformRequested = platform;
applicationBinaryRequested = applicationBinary;
return applicationPackage;
}
}
class FakeApplicationPackage extends Fake implements ApplicationPackage {}
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// @dart = 2.8
import 'dart:async';
import 'package:file/memory.dart';
import 'package:file/src/interface/file.dart';
import 'package:flutter_tools/src/android/android_device.dart';
import 'package:flutter_tools/src/application_package.dart';
import 'package:flutter_tools/src/base/common.dart';
import 'package:flutter_tools/src/base/file_system.dart';
import 'package:flutter_tools/src/base/logger.dart';
import 'package:flutter_tools/src/build_info.dart';
import 'package:flutter_tools/src/commands/daemon.dart';
import 'package:flutter_tools/src/daemon.dart';
import 'package:flutter_tools/src/device.dart';
import 'package:flutter_tools/src/proxied_devices/devices.dart';
import 'package:flutter_tools/src/vmservice.dart';
import 'package:test/fake.dart';
import '../../src/common.dart';
import '../../src/context.dart';
import '../../src/fake_devices.dart';
void main() {
Daemon daemon;
NotifyingLogger notifyingLogger;
BufferLogger bufferLogger;
FakeAndroidDevice fakeDevice;
FakeApplicationPackageFactory applicationPackageFactory;
MemoryFileSystem memoryFileSystem;
FakeProcessManager fakeProcessManager;
group('ProxiedDevices', () {
DaemonConnection serverDaemonConnection;
DaemonConnection clientDaemonConnection;
setUp(() {
bufferLogger = BufferLogger.test();
notifyingLogger = NotifyingLogger(verbose: false, parent: bufferLogger);
final FakeDaemonStreams serverDaemonStreams = FakeDaemonStreams();
serverDaemonConnection = DaemonConnection(
daemonStreams: serverDaemonStreams,
logger: bufferLogger,
);
final FakeDaemonStreams clientDaemonStreams = FakeDaemonStreams();
clientDaemonConnection = DaemonConnection(
daemonStreams: clientDaemonStreams,
logger: bufferLogger,
);
serverDaemonStreams.inputs.addStream(clientDaemonStreams.outputs.stream);
clientDaemonStreams.inputs.addStream(serverDaemonStreams.outputs.stream);
applicationPackageFactory = FakeApplicationPackageFactory();
memoryFileSystem = MemoryFileSystem();
fakeProcessManager = FakeProcessManager.empty();
});
tearDown(() async {
if (daemon != null) {
return daemon.shutdown();
}
notifyingLogger.dispose();
await serverDaemonConnection.dispose();
await clientDaemonConnection.dispose();
});
testUsingContext('can list devices', () async {
daemon = Daemon(
serverDaemonConnection,
notifyingLogger: notifyingLogger,
);
fakeDevice = FakeAndroidDevice();
final FakePollingDeviceDiscovery discoverer = FakePollingDeviceDiscovery();
daemon.deviceDomain.addDeviceDiscoverer(discoverer);
discoverer.addDevice(fakeDevice);
final ProxiedDevices proxiedDevices = ProxiedDevices(clientDaemonConnection, logger: bufferLogger);
final List<Device> devices = await proxiedDevices.discoverDevices();
expect(devices, hasLength(1));
final Device device = devices[0];
expect(device.id, fakeDevice.id);
expect(device.name, 'Proxied ${fakeDevice.name}');
expect(await device.targetPlatform, await fakeDevice.targetPlatform);
expect(await device.isLocalEmulator, await fakeDevice.isLocalEmulator);
});
testUsingContext('calls supportsRuntimeMode', () async {
daemon = Daemon(
serverDaemonConnection,
notifyingLogger: notifyingLogger,
);
fakeDevice = FakeAndroidDevice();
final FakePollingDeviceDiscovery discoverer = FakePollingDeviceDiscovery();
daemon.deviceDomain.addDeviceDiscoverer(discoverer);
discoverer.addDevice(fakeDevice);
final ProxiedDevices proxiedDevices = ProxiedDevices(clientDaemonConnection, logger: bufferLogger);
final List<Device> devices = await proxiedDevices.devices;
expect(devices, hasLength(1));
final Device device = devices[0];
final bool supportsRuntimeMode = await device.supportsRuntimeMode(BuildMode.release);
expect(fakeDevice.supportsRuntimeModeCalledBuildMode, BuildMode.release);
expect(supportsRuntimeMode, true);
});
testUsingContext('redirects logs', () async {
daemon = Daemon(
serverDaemonConnection,
notifyingLogger: notifyingLogger,
);
fakeDevice = FakeAndroidDevice();
final FakePollingDeviceDiscovery discoverer = FakePollingDeviceDiscovery();
daemon.deviceDomain.addDeviceDiscoverer(discoverer);
discoverer.addDevice(fakeDevice);
final ProxiedDevices proxiedDevices = ProxiedDevices(clientDaemonConnection, logger: bufferLogger);
final FakeDeviceLogReader fakeLogReader = FakeDeviceLogReader();
fakeDevice.logReader = fakeLogReader;
final List<Device> devices = await proxiedDevices.devices;
expect(devices, hasLength(1));
final Device device = devices[0];
final DeviceLogReader logReader = await device.getLogReader();
fakeLogReader.logLinesController.add('Some log line');
final String receivedLogLine = await logReader.logLines.first;
expect(receivedLogLine, 'Some log line');
// Now try to stop the log reader
expect(fakeLogReader.disposeCalled, false);
logReader.dispose();
await pumpEventQueue();
expect(fakeLogReader.disposeCalled, true);
});
testUsingContext('starts and stops app', () async {
daemon = Daemon(
serverDaemonConnection,
notifyingLogger: notifyingLogger,
);
fakeDevice = FakeAndroidDevice();
final FakePollingDeviceDiscovery discoverer = FakePollingDeviceDiscovery();
daemon.deviceDomain.addDeviceDiscoverer(discoverer);
discoverer.addDevice(fakeDevice);
final ProxiedDevices proxiedDevices = ProxiedDevices(clientDaemonConnection, logger: bufferLogger);
final FakePrebuiltApplicationPackage prebuiltApplicationPackage = FakePrebuiltApplicationPackage();
final File dummyApplicationBinary = memoryFileSystem.file('/directory/dummy_file');
dummyApplicationBinary.parent.createSync();
dummyApplicationBinary.writeAsStringSync('dummy content');
prebuiltApplicationPackage.applicationPackage = dummyApplicationBinary;
final List<Device> devices = await proxiedDevices.devices;
expect(devices, hasLength(1));
final Device device = devices[0];
// Now try to start the app
final FakeApplicationPackage applicationPackage = FakeApplicationPackage();
applicationPackageFactory.applicationPackage = applicationPackage;
final Uri observatoryUri = Uri.parse('http://127.0.0.1:12345/observatory');
fakeDevice.launchResult = LaunchResult.succeeded(observatoryUri: observatoryUri);
final LaunchResult launchResult = await device.startApp(
prebuiltApplicationPackage,
debuggingOptions: DebuggingOptions.enabled(BuildInfo.debug),
);
expect(launchResult.started, true);
// The returned observatoryUri was a forwarded port, so we cannot compare them directly.
expect(launchResult.observatoryUri.path, observatoryUri.path);
expect(applicationPackageFactory.applicationBinaryRequested.readAsStringSync(), 'dummy content');
expect(applicationPackageFactory.platformRequested, TargetPlatform.android_arm);
expect(fakeDevice.startAppPackage, applicationPackage);
// Now try to stop the app
final bool stopAppResult = await device.stopApp(prebuiltApplicationPackage);
expect(fakeDevice.stopAppPackage, applicationPackage);
expect(stopAppResult, true);
}, overrides: <Type, Generator>{
ApplicationPackageFactory: () => applicationPackageFactory,
FileSystem: () => memoryFileSystem,
ProcessManager: () => fakeProcessManager,
});
testUsingContext('takes screenshot', () async {
daemon = Daemon(
serverDaemonConnection,
notifyingLogger: notifyingLogger,
);
fakeDevice = FakeAndroidDevice();
final FakePollingDeviceDiscovery discoverer = FakePollingDeviceDiscovery();
daemon.deviceDomain.addDeviceDiscoverer(discoverer);
discoverer.addDevice(fakeDevice);
final ProxiedDevices proxiedDevices = ProxiedDevices(clientDaemonConnection, logger: bufferLogger);
final List<Device> devices = await proxiedDevices.devices;
expect(devices, hasLength(1));
final Device device = devices[0];
final List<int> screenshot = <int>[1,2,3,4,5];
fakeDevice.screenshot = screenshot;
final File screenshotOutputFile = memoryFileSystem.file('screenshot_file');
await device.takeScreenshot(screenshotOutputFile);
expect(await screenshotOutputFile.readAsBytes(), screenshot);
}, overrides: <Type, Generator>{
FileSystem: () => memoryFileSystem,
ProcessManager: () => fakeProcessManager,
});
});
}
class FakeDaemonStreams implements DaemonStreams {
final StreamController<DaemonMessage> inputs = StreamController<DaemonMessage>();
final StreamController<DaemonMessage> outputs = StreamController<DaemonMessage>();
@override
Stream<DaemonMessage> get inputStream {
return inputs.stream;
}
@override
void send(Map<String, dynamic> message, [ List<int> binary ]) {
outputs.add(DaemonMessage(message, binary != null ? Stream<List<int>>.value(binary) : null));
}
@override
Future<void> dispose() async {
await inputs.close();
// In some tests, outputs have no listeners. We don't wait for outputs to close.
unawaited(outputs.close());
}
}
// Unfortunately Device, despite not being immutable, has an `operator ==`.
// Until we fix that, we have to also ignore related lints here.
// ignore: avoid_implementing_value_types
class FakeAndroidDevice extends Fake implements AndroidDevice {
@override
final String id = 'device';
@override
final String name = 'device';
@override
Future<String> get emulatorId async => 'device';
@override
Future<TargetPlatform> get targetPlatform async => TargetPlatform.android_arm;
@override
Future<bool> get isLocalEmulator async => false;
@override
final Category category = Category.mobile;
@override
final PlatformType platformType = PlatformType.android;
@override
final bool ephemeral = false;
@override
Future<String> get sdkNameAndVersion async => 'Android 12';
@override
bool get supportsHotReload => true;
@override
bool get supportsHotRestart => true;
@override
bool get supportsScreenshot => true;
@override
bool get supportsFastStart => true;
@override
bool get supportsFlutterExit => true;
@override
Future<bool> get supportsHardwareRendering async => true;
@override
bool get supportsStartPaused => true;
BuildMode supportsRuntimeModeCalledBuildMode;
@override
Future<bool> supportsRuntimeMode(BuildMode buildMode) async {
supportsRuntimeModeCalledBuildMode = buildMode;
return true;
}
DeviceLogReader logReader;
@override
FutureOr<DeviceLogReader> getLogReader({
covariant ApplicationPackage app,
bool includePastLogs = false,
}) => logReader;
ApplicationPackage startAppPackage;
LaunchResult launchResult;
@override
Future<LaunchResult> startApp(
ApplicationPackage package, {
String mainPath,
String route,
DebuggingOptions debuggingOptions,
Map<String, Object> platformArgs = const <String, Object>{},
bool prebuiltApplication = false,
bool ipv6 = false,
String userIdentifier,
}) async {
startAppPackage = package;
return launchResult;
}
ApplicationPackage stopAppPackage;
@override
Future<bool> stopApp(
ApplicationPackage app, {
String userIdentifier,
}) async {
stopAppPackage = app;
return true;
}
List<int> screenshot;
@override
Future<void> takeScreenshot(File outputFile) {
return outputFile.writeAsBytes(screenshot);
}
}
class FakeDeviceLogReader implements DeviceLogReader {
final StreamController<String> logLinesController = StreamController<String>();
bool disposeCalled = false;
@override
int appPid;
@override
FlutterVmService connectedVMService;
@override
void dispose() {
disposeCalled = true;
}
@override
Stream<String> get logLines => logLinesController.stream;
@override
String get name => 'device';
}
class FakeApplicationPackageFactory implements ApplicationPackageFactory {
TargetPlatform platformRequested;
File applicationBinaryRequested;
ApplicationPackage applicationPackage;
@override
Future<ApplicationPackage> getPackageForPlatform(TargetPlatform platform, {BuildInfo buildInfo, File applicationBinary}) async {
platformRequested = platform;
applicationBinaryRequested = applicationBinary;
return applicationPackage;
}
}
class FakeApplicationPackage extends Fake implements ApplicationPackage {}
class FakePrebuiltApplicationPackage extends Fake implements PrebuiltApplicationPackage {
@override
File applicationPackage;
}
......@@ -14,18 +14,18 @@ import 'package:test/fake.dart';
import '../src/common.dart';
class FakeDaemonStreams extends DaemonStreams {
final StreamController<Map<String, dynamic>> inputs = StreamController<Map<String, dynamic>>();
final StreamController<Map<String, dynamic>> outputs = StreamController<Map<String, dynamic>>();
class FakeDaemonStreams implements DaemonStreams {
final StreamController<DaemonMessage> inputs = StreamController<DaemonMessage>();
final StreamController<DaemonMessage> outputs = StreamController<DaemonMessage>();
@override
Stream<Map<String, dynamic>> get inputStream {
Stream<DaemonMessage> get inputStream {
return inputs.stream;
}
@override
void send(Map<String, dynamic> message) {
outputs.add(message);
void send(Map<String, dynamic> message, [ List<int>? binary ]) {
outputs.add(DaemonMessage(message, binary != null ? Stream<List<int>>.value(binary) : null));
}
@override
......@@ -56,163 +56,340 @@ void main() {
group('DaemonConnection receiving end', () {
testWithoutContext('redirects input to incoming commands', () async {
final Map<String, dynamic> commandToSend = <String, dynamic>{'id': 0, 'method': 'some_method'};
daemonStreams.inputs.add(commandToSend);
daemonStreams.inputs.add(DaemonMessage(commandToSend));
final Map<String, dynamic> commandReceived = await daemonConnection.incomingCommands.first;
final DaemonMessage commandReceived = await daemonConnection.incomingCommands.first;
await daemonStreams.dispose();
expect(commandReceived, commandToSend);
expect(commandReceived.data, commandToSend);
});
testWithoutContext('listenToEvent can receive the right events', () async {
final Future<List<dynamic>> events = daemonConnection.listenToEvent('event1').toList();
final Future<List<DaemonEventData>> events = daemonConnection.listenToEvent('event1').toList();
daemonStreams.inputs.add(<String, dynamic>{'event': 'event1', 'params': '1'});
daemonStreams.inputs.add(<String, dynamic>{'event': 'event2', 'params': '2'});
daemonStreams.inputs.add(<String, dynamic>{'event': 'event1', 'params': null});
daemonStreams.inputs.add(<String, dynamic>{'event': 'event1', 'params': 3});
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'event': 'event1', 'params': '1'}));
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'event': 'event2', 'params': '2'}));
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'event': 'event1', 'params': null}));
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'event': 'event1', 'params': 3}));
await pumpEventQueue();
await daemonConnection.dispose();
expect(await events, <dynamic>['1', null, 3]);
expect((await events).map((DaemonEventData event) => event.data).toList(), <dynamic>['1', null, 3]);
});
});
group('DaemonConnection sending end', () {
testWithoutContext('sending requests', () async {
unawaited(daemonConnection.sendRequest('some_method', 'param'));
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], isNotNull);
expect(data['method'], 'some_method');
expect(data['params'], 'param');
final DaemonMessage message = await daemonStreams.outputs.stream.first;
expect(message.data['id'], isNotNull);
expect(message.data['method'], 'some_method');
expect(message.data['params'], 'param');
});
testWithoutContext('sending requests without param', () async {
unawaited(daemonConnection.sendRequest('some_method'));
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], isNotNull);
expect(data['method'], 'some_method');
expect(data['params'], isNull);
final DaemonMessage message = await daemonStreams.outputs.stream.first;
expect(message.data['id'], isNotNull);
expect(message.data['method'], 'some_method');
expect(message.data['params'], isNull);
});
testWithoutContext('sending response', () async {
daemonConnection.sendResponse('1', 'some_data');
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], '1');
expect(data['method'], isNull);
expect(data['error'], isNull);
expect(data['result'], 'some_data');
final DaemonMessage message = await daemonStreams.outputs.stream.first;
expect(message.data['id'], '1');
expect(message.data['method'], isNull);
expect(message.data['error'], isNull);
expect(message.data['result'], 'some_data');
});
testWithoutContext('sending response without data', () async {
daemonConnection.sendResponse('1');
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], '1');
expect(data['method'], isNull);
expect(data['error'], isNull);
expect(data['result'], isNull);
final DaemonMessage message = await daemonStreams.outputs.stream.first;
expect(message.data['id'], '1');
expect(message.data['method'], isNull);
expect(message.data['error'], isNull);
expect(message.data['result'], isNull);
});
testWithoutContext('sending error response', () async {
daemonConnection.sendErrorResponse('1', 'error', StackTrace.fromString('stack trace'));
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], '1');
expect(data['method'], isNull);
expect(data['error'], 'error');
expect(data['trace'], 'stack trace');
final DaemonMessage message = await daemonStreams.outputs.stream.first;
expect(message.data['id'], '1');
expect(message.data['method'], isNull);
expect(message.data['error'], 'error');
expect(message.data['trace'], 'stack trace');
});
testWithoutContext('sending events', () async {
daemonConnection.sendEvent('some_event', '123');
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], isNull);
expect(data['event'], 'some_event');
expect(data['params'], '123');
final DaemonMessage message = await daemonStreams.outputs.stream.first;
expect(message.data['id'], isNull);
expect(message.data['event'], 'some_event');
expect(message.data['params'], '123');
});
testWithoutContext('sending events without params', () async {
daemonConnection.sendEvent('some_event');
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], isNull);
expect(data['event'], 'some_event');
expect(data['params'], isNull);
final DaemonMessage message = await daemonStreams.outputs.stream.first;
expect(message.data['id'], isNull);
expect(message.data['event'], 'some_event');
expect(message.data['params'], isNull);
});
});
group('DaemonConnection request and response', () {
testWithoutContext('receiving response from requests', () async {
final Future<dynamic> requestFuture = daemonConnection.sendRequest('some_method', 'param');
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
final DaemonMessage message = await daemonStreams.outputs.stream.first;
expect(data['id'], isNotNull);
expect(data['method'], 'some_method');
expect(data['params'], 'param');
expect(message.data['id'], isNotNull);
expect(message.data['method'], 'some_method');
expect(message.data['params'], 'param');
final String id = data['id'] as String;
daemonStreams.inputs.add(<String, dynamic>{'id': id, 'result': '123'});
final String id = message.data['id']! as String;
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': id, 'result': '123'}));
expect(await requestFuture, '123');
});
testWithoutContext('receiving response from requests without result', () async {
final Future<dynamic> requestFuture = daemonConnection.sendRequest('some_method', 'param');
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
final DaemonMessage message = await daemonStreams.outputs.stream.first;
expect(data['id'], isNotNull);
expect(data['method'], 'some_method');
expect(data['params'], 'param');
expect(message.data['id'], isNotNull);
expect(message.data['method'], 'some_method');
expect(message.data['params'], 'param');
final String id = data['id'] as String;
daemonStreams.inputs.add(<String, dynamic>{'id': id});
final String id = message.data['id']! as String;
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': id}));
expect(await requestFuture, null);
});
testWithoutContext('receiving error response from requests without result', () async {
final Future<dynamic> requestFuture = daemonConnection.sendRequest('some_method', 'param');
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
final DaemonMessage message = await daemonStreams.outputs.stream.first;
expect(data['id'], isNotNull);
expect(data['method'], 'some_method');
expect(data['params'], 'param');
expect(message.data['id'], isNotNull);
expect(message.data['method'], 'some_method');
expect(message.data['params'], 'param');
final String id = data['id'] as String;
daemonStreams.inputs.add(<String, dynamic>{'id': id, 'error': 'some_error', 'trace': 'stack trace'});
final String id = message.data['id']! as String;
daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': id, 'error': 'some_error', 'trace': 'stack trace'}));
expect(requestFuture, throwsA('some_error'));
});
});
group('TcpDaemonStreams', () {
group('DaemonInputStreamConverter', () {
Map<String, Object?> testCommand(int id, [int? binarySize]) => <String, Object?>{
'id': id,
'method': 'test',
if (binarySize != null)
'_binaryLength': binarySize,
};
List<int> testCommandBinary(int id, [int? binarySize]) => utf8.encode('[${json.encode(testCommand(id, binarySize))}]\n');
testWithoutContext('can parse a single message', () async {
final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
testCommandBinary(10),
]);
final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
final Stream<DaemonMessage> outputStream = converter.convertedStream;
final List<DaemonMessage> outputs = await outputStream.toList();
expect(outputs, hasLength(1));
expect(outputs[0].data, testCommand(10));
expect(outputs[0].binary, null);
});
testWithoutContext('can parse multiple messages', () async {
final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
testCommandBinary(10),
testCommandBinary(20),
]);
final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
final Stream<DaemonMessage> outputStream = converter.convertedStream;
final List<DaemonMessage> outputs = await outputStream.toList();
expect(outputs, hasLength(2));
expect(outputs[0].data, testCommand(10));
expect(outputs[0].binary, null);
expect(outputs[1].data, testCommand(20));
expect(outputs[1].binary, null);
});
testWithoutContext('can parse multiple messages while ignoring non json data in between', () async {
final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
testCommandBinary(10),
utf8.encode('This is not a json data...\n'),
testCommandBinary(20),
]);
final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
final Stream<DaemonMessage> outputStream = converter.convertedStream;
final List<DaemonMessage> outputs = await outputStream.toList();
expect(outputs, hasLength(2));
expect(outputs[0].data, testCommand(10));
expect(outputs[0].binary, null);
expect(outputs[1].data, testCommand(20));
expect(outputs[1].binary, null);
});
testWithoutContext('can parse multiple messages even when they are split in multiple packets', () async {
final List<int> binary1 = testCommandBinary(10);
final List<int> binary2 = testCommandBinary(20);
final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
binary1.sublist(0, 5),
binary1.sublist(5, 15),
binary1.sublist(15) + binary2.sublist(0, 13),
binary2.sublist(13),
]);
final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
final Stream<DaemonMessage> outputStream = converter.convertedStream;
final List<DaemonMessage> outputs = await outputStream.toList();
expect(outputs, hasLength(2));
expect(outputs[0].data, testCommand(10));
expect(outputs[0].binary, null);
expect(outputs[1].data, testCommand(20));
expect(outputs[1].binary, null);
});
testWithoutContext('can parse multiple messages even when they are combined in a single packet', () async {
final List<int> binary1 = testCommandBinary(10);
final List<int> binary2 = testCommandBinary(20);
final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
binary1 + binary2,
]);
final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
final Stream<DaemonMessage> outputStream = converter.convertedStream;
final List<DaemonMessage> outputs = await outputStream.toList();
expect(outputs, hasLength(2));
expect(outputs[0].data, testCommand(10));
expect(outputs[0].binary, null);
expect(outputs[1].data, testCommand(20));
expect(outputs[1].binary, null);
});
testWithoutContext('can parse a single message with binary stream', () async {
final List<int> binary = <int>[1,2,3,4,5];
final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
testCommandBinary(10, binary.length),
binary,
]);
final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
final Stream<DaemonMessage> outputStream = converter.convertedStream;
final List<_DaemonMessageAndBinary> allOutputs = await _readAllBinaries(outputStream);
expect(allOutputs, hasLength(1));
expect(allOutputs[0].message.data, testCommand(10, binary.length));
expect(allOutputs[0].binary, binary);
});
testWithoutContext('can parse a single message with binary stream when messages are combined in a single packet', () async {
final List<int> binary = <int>[1,2,3,4,5];
final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
testCommandBinary(10, binary.length) + binary,
]);
final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
final Stream<DaemonMessage> outputStream = converter.convertedStream;
final List<_DaemonMessageAndBinary> allOutputs = await _readAllBinaries(outputStream);
expect(allOutputs, hasLength(1));
expect(allOutputs[0].message.data, testCommand(10, binary.length));
expect(allOutputs[0].binary, binary);
});
testWithoutContext('can parse multiple messages with binary stream', () async {
final List<int> binary1 = <int>[1,2,3,4,5];
final List<int> binary2 = <int>[6,7,8,9,10,11,12];
final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
testCommandBinary(10, binary1.length),
binary1,
testCommandBinary(20, binary2.length),
binary2,
]);
final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
final Stream<DaemonMessage> outputStream = converter.convertedStream;
final List<_DaemonMessageAndBinary> allOutputs = await _readAllBinaries(outputStream);
expect(allOutputs, hasLength(2));
expect(allOutputs[0].message.data, testCommand(10, binary1.length));
expect(allOutputs[0].binary, binary1);
expect(allOutputs[1].message.data, testCommand(20, binary2.length));
expect(allOutputs[1].binary, binary2);
});
testWithoutContext('can parse multiple messages with binary stream when messages are split', () async {
final List<int> binary1 = <int>[1,2,3,4,5];
final List<int> message1 = testCommandBinary(10, binary1.length);
final List<int> binary2 = <int>[6,7,8,9,10,11,12];
final List<int> message2 = testCommandBinary(20, binary2.length);
final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
message1.sublist(0, 10),
message1.sublist(10) + binary1 + message2.sublist(0, 5),
message2.sublist(5) + binary2.sublist(0, 3),
binary2.sublist(3, 5),
binary2.sublist(5),
]);
final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
final Stream<DaemonMessage> outputStream = converter.convertedStream;
final List<_DaemonMessageAndBinary> allOutputs = await _readAllBinaries(outputStream);
expect(allOutputs, hasLength(2));
expect(allOutputs[0].message.data, testCommand(10, binary1.length));
expect(allOutputs[0].binary, binary1);
expect(allOutputs[1].message.data, testCommand(20, binary2.length));
expect(allOutputs[1].binary, binary2);
});
});
group('DaemonStreams', () {
final Map<String, Object?> testCommand = <String, Object?>{
'id': 100,
'method': 'test',
};
late FakeSocket socket;
late TcpDaemonStreams daemonStreams;
late StreamController<List<int>> inputStream;
late StreamController<List<int>> outputStream;
late DaemonStreams daemonStreams;
setUp(() {
socket = FakeSocket();
daemonStreams = TcpDaemonStreams(socket, logger: bufferLogger);
inputStream = StreamController<List<int>>();
outputStream = StreamController<List<int>>();
daemonStreams = DaemonStreams(inputStream.stream, outputStream.sink, logger: bufferLogger);
});
test('parses the message received on the socket', () async {
socket.controller.add(Uint8List.fromList(utf8.encode('[${jsonEncode(testCommand)}]\n')));
final Map<String, Object?> command = await daemonStreams.inputStream.first;
expect(command, testCommand);
testWithoutContext('parses the message received on the stream', () async {
inputStream.add(utf8.encode('[${jsonEncode(testCommand)}]\n'));
final DaemonMessage command = await daemonStreams.inputStream.first;
expect(command.data, testCommand);
expect(command.binary, null);
});
test('sends the encoded message through the socket', () async {
testWithoutContext('sends the encoded message through the sink', () async {
daemonStreams.send(testCommand);
await pumpEventQueue();
expect(socket.writtenObjects.length, 1);
expect(socket.writtenObjects[0].toString(), '[${jsonEncode(testCommand)}]\n');
final List<int> commands = await outputStream.stream.first;
expect(commands, utf8.encode('[${jsonEncode(testCommand)}]\n'));
});
test('dispose calls socket.close', () async {
testWithoutContext('dispose closes the sink', () async {
await daemonStreams.dispose();
expect(socket.closeCalled, isTrue);
expect(outputStream.isClosed, true);
});
});
}
class _DaemonMessageAndBinary {
_DaemonMessageAndBinary(this.message, this.binary);
final DaemonMessage message;
final List<int>? binary;
}
Future<List<_DaemonMessageAndBinary>> _readAllBinaries(Stream<DaemonMessage> inputStream) async {
final StreamIterator<DaemonMessage> iterator = StreamIterator<DaemonMessage>(inputStream);
final List<_DaemonMessageAndBinary> outputs = <_DaemonMessageAndBinary>[];
while (await iterator.moveNext()) {
List<int>? binary;
if (iterator.current.binary != null) {
binary = await iterator.current.binary!.reduce((List<int> a, List<int> b) => a + b);
}
outputs.add(_DaemonMessageAndBinary(iterator.current, binary));
}
return outputs;
}
class FakeSocket extends Fake implements Socket {
bool closeCalled = false;
final StreamController<Uint8List> controller = StreamController<Uint8List>();
......
......@@ -168,6 +168,14 @@ class FakePollingDeviceDiscovery extends PollingDeviceDiscovery {
devices.forEach(addDevice);
}
bool discoverDevicesCalled = false;
@override
Future<List<Device>> discoverDevices({Duration? timeout}) {
discoverDevicesCalled = true;
return super.discoverDevices(timeout: timeout);
}
@override
Stream<Device> get onAdded => _onAddedController.stream;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment