Unverified Commit 2b46ea44 authored by Lau Ching Jun's avatar Lau Ching Jun Committed by GitHub

Add an option for flutter daemon to listen on a TCP port (#95418)

* Add an option for flutter daemon to listen on a TCP port

Added a new class DaemonConnection to reuse the connection handling
between daemon server and client, and handle connection with different
medium (stdio, socket).

Added a new option `listen-on-tcp-port` to the flutter daemon command,
when passed, the daemon will accept commands on a port instead of stdio.

* Review feedback and add test for TcpDaemonStreams

* Review feedbacks
parent dc684e89
......@@ -18,6 +18,7 @@ import '../base/io.dart';
import '../build_info.dart';
import '../commands/daemon.dart';
import '../compile.dart';
import '../daemon.dart';
import '../device.dart';
import '../device_port_forwarder.dart';
import '../fuchsia/fuchsia_device.dart';
......@@ -235,8 +236,10 @@ known, it can be explicitly provided to attach via the command-line, e.g.
final Daemon daemon = boolArg('machine')
? Daemon(
stdinCommandStream,
stdoutCommandResponse,
DaemonConnection(
daemonStreams: StdioDaemonStreams(globals.stdio),
logger: globals.logger,
),
notifyingLogger: (globals.logger is NotifyingLogger)
? globals.logger as NotifyingLogger
: NotifyingLogger(verbose: globals.logger.isVerbose, parent: globals.logger),
......
......@@ -18,7 +18,7 @@ import '../base/logger.dart';
import '../base/terminal.dart';
import '../base/utils.dart';
import '../build_info.dart';
import '../convert.dart';
import '../daemon.dart';
import '../device.dart';
import '../device_port_forwarder.dart';
import '../emulator.dart';
......@@ -41,7 +41,13 @@ const String protocolVersion = '0.6.1';
/// It can be shutdown with a `daemon.shutdown` command (or by killing the
/// process).
class DaemonCommand extends FlutterCommand {
DaemonCommand({ this.hidden = false });
DaemonCommand({ this.hidden = false }) {
argParser.addOption(
'listen-on-tcp-port',
help: 'If specified, the daemon will be listening for commands on the specified port instead of stdio.',
valueHelp: 'port',
);
}
@override
final String name = 'daemon';
......@@ -57,9 +63,31 @@ class DaemonCommand extends FlutterCommand {
@override
Future<FlutterCommandResult> runCommand() async {
if (argResults['listen-on-tcp-port'] != null) {
int port;
try {
port = int.parse(stringArg('listen-on-tcp-port'));
} on FormatException catch (error) {
throwToolExit('Invalid port for `--listen-on-tcp-port`: $error');
}
await _DaemonServer(
port: port,
logger: StdoutLogger(
terminal: globals.terminal,
stdio: globals.stdio,
outputPreferences: globals.outputPreferences,
),
notifyingLogger: asLogger<NotifyingLogger>(globals.logger),
).run();
return FlutterCommandResult.success();
}
globals.printStatus('Starting device daemon...');
final Daemon daemon = Daemon(
stdinCommandStream, stdoutCommandResponse,
DaemonConnection(
daemonStreams: StdioDaemonStreams(globals.stdio),
logger: globals.logger,
),
notifyingLogger: asLogger<NotifyingLogger>(globals.logger),
);
final int code = await daemon.onExit;
......@@ -70,14 +98,57 @@ class DaemonCommand extends FlutterCommand {
}
}
typedef DispatchCommand = void Function(Map<String, dynamic> command);
class _DaemonServer {
_DaemonServer({
this.port,
this.logger,
this.notifyingLogger,
});
final int port;
/// Stdout logger used to print general server-related errors.
final Logger logger;
// Logger that sends the message to the other end of daemon connection.
final NotifyingLogger notifyingLogger;
Future<void> run() async {
final ServerSocket serverSocket = await ServerSocket.bind(InternetAddress.loopbackIPv4, port);
logger.printStatus('Daemon server listening on ${serverSocket.port}');
final StreamSubscription<Socket> subscription = serverSocket.listen(
(Socket socket) async {
// We have to listen to socket.done. Otherwise when the connection is
// reset, we will receive an uncatchable exception.
// https://github.com/dart-lang/sdk/issues/25518
final Future<void> socketDone = socket.done.catchError((dynamic error, StackTrace stackTrace) {
logger.printError('Socket error: $error');
logger.printTrace('$stackTrace');
});
final Daemon daemon = Daemon(
DaemonConnection(
daemonStreams: TcpDaemonStreams(socket, logger: logger),
logger: logger,
),
notifyingLogger: notifyingLogger,
);
await daemon.onExit;
await socketDone;
},
);
// Wait indefinitely until the server closes.
await subscription.asFuture<void>();
await subscription.cancel();
}
}
typedef CommandHandler = Future<dynamic> Function(Map<String, dynamic> args);
class Daemon {
Daemon(
Stream<Map<String, dynamic>> commandStream,
this.sendCommand, {
this.connection, {
this.notifyingLogger,
this.logToStdout = false,
}) {
......@@ -89,9 +160,10 @@ class Daemon {
_registerDomain(devToolsDomain = DevToolsDomain(this));
// Start listening.
_commandSubscription = commandStream.listen(
_commandSubscription = connection.incomingCommands.listen(
_handleRequest,
onDone: () {
shutdown();
if (!_onExitCompleter.isCompleted) {
_onExitCompleter.complete(0);
}
......@@ -99,16 +171,15 @@ class Daemon {
);
}
final DaemonConnection connection;
DaemonDomain daemonDomain;
AppDomain appDomain;
DeviceDomain deviceDomain;
EmulatorDomain emulatorDomain;
DevToolsDomain devToolsDomain;
StreamSubscription<Map<String, dynamic>> _commandSubscription;
int _outgoingRequestId = 1;
final Map<String, Completer<dynamic>> _outgoingRequestCompleters = <String, Completer<dynamic>>{};
final DispatchCommand sendCommand;
final NotifyingLogger notifyingLogger;
final bool logToStdout;
......@@ -134,7 +205,7 @@ class Daemon {
try {
final String method = request['method'] as String;
if (method != null) {
assert(method != null);
if (!method.contains('.')) {
throw 'method not understood: $method';
}
......@@ -146,50 +217,15 @@ class Daemon {
}
_domainMap[prefix].handleCommand(name, id, castStringKeyedMap(request['params']) ?? const <String, dynamic>{});
} else {
// If there was no 'method' field then it's a response to a daemon-to-editor request.
final Completer<dynamic> completer = _outgoingRequestCompleters[id.toString()];
if (completer == null) {
throw 'unexpected response with id: $id';
}
_outgoingRequestCompleters.remove(id.toString());
if (request['error'] != null) {
completer.completeError(request['error']);
} else {
completer.complete(request['result']);
}
}
} on Exception catch (error, trace) {
_send(<String, dynamic>{
'id': id,
'error': _toJsonable(error),
'trace': '$trace',
});
connection.sendErrorResponse(id, _toJsonable(error), trace);
}
}
Future<dynamic> sendRequest(String method, [ dynamic args ]) {
final Map<String, dynamic> map = <String, dynamic>{'method': method};
if (args != null) {
map['params'] = _toJsonable(args);
}
final int id = _outgoingRequestId++;
final Completer<dynamic> completer = Completer<dynamic>();
map['id'] = id.toString();
_outgoingRequestCompleters[id.toString()] = completer;
_send(map);
return completer.future;
}
void _send(Map<String, dynamic> map) => sendCommand(map);
Future<void> shutdown({ dynamic error }) async {
await devToolsDomain?.dispose();
await _commandSubscription?.cancel();
await connection.dispose();
for (final Domain domain in _domainMap.values) {
await domain.dispose();
}
......@@ -225,30 +261,16 @@ abstract class Domain {
}
throw 'command not understood: $name.$command';
}).then<dynamic>((dynamic result) {
if (result == null) {
_send(<String, dynamic>{'id': id});
} else {
_send(<String, dynamic>{'id': id, 'result': _toJsonable(result)});
}
}).catchError((dynamic error, dynamic trace) {
_send(<String, dynamic>{
'id': id,
'error': _toJsonable(error),
'trace': '$trace',
});
daemon.connection.sendResponse(id, _toJsonable(result));
}).catchError((Object error, StackTrace stackTrace) {
daemon.connection.sendErrorResponse(id, _toJsonable(error), stackTrace);
});
}
void sendEvent(String name, [ dynamic args ]) {
final Map<String, dynamic> map = <String, dynamic>{'event': name};
if (args != null) {
map['params'] = _toJsonable(args);
}
_send(map);
daemon.connection.sendEvent(name, _toJsonable(args));
}
void _send(Map<String, dynamic> map) => daemon._send(map);
String _getStringArg(Map<String, dynamic> args, String name, { bool required = false }) {
if (required && !args.containsKey(name)) {
throw '$name is required';
......@@ -346,7 +368,7 @@ class DaemonDomain extends Domain {
/// --web-allow-expose-url switch. The client may return the same URL back if
/// tunnelling is not required for a given URL.
Future<String> exposeUrl(String url) async {
final dynamic res = await daemon.sendRequest('app.exposeUrl', <String, String>{'url': url});
final dynamic res = await daemon.connection.sendRequest('app.exposeUrl', <String, String>{'url': url});
if (res is Map<String, dynamic> && res['url'] is String) {
return res['url'] as String;
} else {
......@@ -907,35 +929,6 @@ class DevToolsDomain extends Domain {
}
}
Stream<Map<String, dynamic>> get stdinCommandStream => globals.stdio.stdin
.transform<String>(utf8.decoder)
.transform<String>(const LineSplitter())
.where((String line) => line.startsWith('[{') && line.endsWith('}]'))
.map<Map<String, dynamic>>((String line) {
line = line.substring(1, line.length - 1);
return castStringKeyedMap(json.decode(line));
});
void stdoutCommandResponse(Map<String, dynamic> command) {
globals.stdio.stdoutWrite(
'[${jsonEncodeObject(command)}]\n',
fallback: (String message, dynamic error, StackTrace stack) {
throwToolExit('Failed to write daemon command response to stdout: $error');
},
);
}
String jsonEncodeObject(dynamic object) {
return json.encode(object, toEncodable: _toEncodable);
}
dynamic _toEncodable(dynamic object) {
if (object is OperationResult) {
return _operationResultToMap(object);
}
return object;
}
Future<Map<String, dynamic>> _deviceToMap(Device device) async {
return <String, dynamic>{
'id': device.id,
......@@ -970,7 +963,7 @@ dynamic _toJsonable(dynamic obj) {
return obj;
}
if (obj is OperationResult) {
return obj;
return _operationResultToMap(obj);
}
if (obj is ToolExit) {
return obj.message;
......
......@@ -14,6 +14,7 @@ import '../base/common.dart';
import '../base/file_system.dart';
import '../base/utils.dart';
import '../build_info.dart';
import '../daemon.dart';
import '../device.dart';
import '../features.dart';
import '../globals.dart' as globals;
......@@ -556,8 +557,10 @@ class RunCommand extends RunCommandBase {
throwToolExit('"--machine" does not support "-d all".');
}
final Daemon daemon = Daemon(
stdinCommandStream,
stdoutCommandResponse,
DaemonConnection(
daemonStreams: StdioDaemonStreams(globals.stdio),
logger: globals.logger,
),
notifyingLogger: (globals.logger is NotifyingLogger)
? globals.logger as NotifyingLogger
: NotifyingLogger(verbose: globals.logger.isVerbose, parent: globals.logger),
......
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'base/common.dart';
import 'base/io.dart';
import 'base/logger.dart';
import 'base/utils.dart';
import 'convert.dart';
/// Parse binary streams in the JSON RPC format understood by the daemon, and
/// convert it into a stream of JSON RPC messages.
Stream<Map<String, Object?>> _convertInputStream(Stream<List<int>> inputStream) {
return utf8.decoder.bind(inputStream)
.transform<String>(const LineSplitter())
.where((String line) => line.startsWith('[{') && line.endsWith('}]'))
.map<Map<String, Object?>?>((String line) {
line = line.substring(1, line.length - 1);
return castStringKeyedMap(json.decode(line));
})
.where((Map<String, Object?>? entry) => entry != null)
.cast<Map<String, Object?>>();
}
/// A stream that a [DaemonConnection] uses to communicate with each other.
abstract class DaemonStreams {
/// Stream that contains input to the [DaemonConnection].
Stream<Map<String, Object?>> get inputStream;
/// Outputs a message through the connection.
void send(Map<String, Object?> message);
/// Cleans up any resources used.
Future<void> dispose() async { }
}
/// A [DaemonStream] that uses stdin and stdout as the underlying streams.
class StdioDaemonStreams extends DaemonStreams {
StdioDaemonStreams(Stdio stdio) :
_stdio = stdio,
inputStream = _convertInputStream(stdio.stdin);
final Stdio _stdio;
@override
final Stream<Map<String, Object?>> inputStream;
@override
void send(Map<String, Object?> message) {
_stdio.stdoutWrite(
'[${json.encode(message)}]\n',
fallback: (String message, Object? error, StackTrace stack) {
throwToolExit('Failed to write daemon command response to stdout: $error');
},
);
}
}
/// A [DaemonStream] that uses [Socket] as the underlying stream.
class TcpDaemonStreams extends DaemonStreams {
/// Creates a [DaemonStreams] with an existing [Socket].
TcpDaemonStreams(
Socket socket, {
required Logger logger,
}): _logger = logger {
_socket = Future<Socket>.value(_initializeSocket(socket));
}
/// Connects to a remote host and creates a [DaemonStreams] from the socket.
TcpDaemonStreams.connect(
String host,
int port, {
required Logger logger,
}) : _logger = logger {
_socket = Socket.connect(host, port).then(_initializeSocket);
}
late final Future<Socket> _socket;
final StreamController<Map<String, Object?>> _commands = StreamController<Map<String, Object?>>();
final Logger _logger;
@override
Stream<Map<String, Object?>> get inputStream => _commands.stream;
@override
void send(Map<String, Object?> message) {
_socket.then((Socket socket) {
try {
socket.write('[${json.encode(message)}]\n');
} on SocketException catch (error) {
_logger.printError('Failed to write daemon command response to socket: $error');
// Failed to send, close the connection
socket.close();
}
});
}
Socket _initializeSocket(Socket socket) {
_commands.addStream(_convertInputStream(socket));
return socket;
}
@override
Future<void> dispose() async {
await (await _socket).close();
}
}
/// Connection between a flutter daemon and a client.
class DaemonConnection {
DaemonConnection({
required DaemonStreams daemonStreams,
required Logger logger,
}): _logger = logger,
_daemonStreams = daemonStreams {
_commandSubscription = daemonStreams.inputStream.listen(
_handleData,
onError: (Object error, StackTrace stackTrace) {
// We have to listen for on error otherwise the error on the socket
// will end up in the Zone error handler.
// Do nothing here and let the stream close handlers handle shutting
// down the daemon.
}
);
}
final DaemonStreams _daemonStreams;
final Logger _logger;
late final StreamSubscription<Map<String, Object?>> _commandSubscription;
int _outgoingRequestId = 0;
final Map<String, Completer<Object?>> _outgoingRequestCompleters = <String, Completer<Object?>>{};
final StreamController<Map<String, Object?>> _events = StreamController<Map<String, Object?>>.broadcast();
final StreamController<Map<String, Object?>> _incomingCommands = StreamController<Map<String, Object?>>();
/// A stream that contains all the incoming requests.
Stream<Map<String, Object?>> get incomingCommands => _incomingCommands.stream;
/// Listens to the event with the event name [eventToListen].
Stream<Object?> listenToEvent(String eventToListen) {
return _events.stream
.where((Map<String, Object?> event) => event['event'] == eventToListen)
.map<Object?>((Map<String, Object?> event) => event['params']);
}
/// Sends a request to the other end of the connection.
///
/// Returns a [Future] that resolves with the content.
Future<Object?> sendRequest(String method, [Object? params]) async {
final String id = '${++_outgoingRequestId}';
final Completer<Object?> completer = Completer<Object?>();
_outgoingRequestCompleters[id] = completer;
final Map<String, Object?> data = <String, Object?>{
'id': id,
'method': method,
if (params != null) 'params': params,
};
_logger.printTrace('-> Sending to daemon, id = $id, method = $method');
_daemonStreams.send(data);
return completer.future;
}
/// Sends a response to the other end of the connection.
void sendResponse(Object id, [Object? result]) {
_daemonStreams.send(<String, Object?>{
'id': id,
if (result != null) 'result': result,
});
}
/// Sends an error response to the other end of the connection.
void sendErrorResponse(Object id, Object error, StackTrace trace) {
_daemonStreams.send(<String, Object?>{
'id': id,
'error': error,
'trace': '$trace',
});
}
/// Sends an event to the client.
void sendEvent(String name, [ Object? params ]) {
_daemonStreams.send(<String, Object?>{
'event': name,
if (params != null) 'params': params,
});
}
/// Handles the input from the stream.
///
/// There are three kinds of data: Request, Response, Event.
///
/// Request:
/// {"id": <Object>. "method": <String>, "params": <optional, Object?>}
///
/// Response:
/// {"id": <Object>. "result": <optional, Object?>} for a successful response.
/// {"id": <Object>. "error": <Object>, "stackTrace": <String>} for an error response.
///
/// Event:
/// {"event": <String>. "params": <optional, Object?>}
void _handleData(Map<String, Object?> data) {
if (data['id'] != null) {
if (data['method'] == null) {
// This is a response to previously sent request.
final String id = data['id']! as String;
if (data['error'] != null) {
// This is an error response.
_logger.printTrace('<- Error response received from daemon, id = $id');
final Object error = data['error']!;
final String stackTrace = data['stackTrace'] as String? ?? '';
_outgoingRequestCompleters.remove(id)?.completeError(error, StackTrace.fromString(stackTrace));
} else {
_logger.printTrace('<- Response received from daemon, id = $id');
final Object? result = data['result'];
_outgoingRequestCompleters.remove(id)?.complete(result);
}
} else {
_incomingCommands.add(data);
}
} else if (data['event'] != null) {
// This is an event
_logger.printTrace('<- Event received: ${data['event']}');
_events.add(data);
} else {
_logger.printError('Unknown data received from daemon');
}
}
/// Cleans up any resources used in the connection.
Future<void> dispose() async {
await _commandSubscription.cancel();
await _daemonStreams.dispose();
unawaited(_events.close());
unawaited(_incomingCommands.close());
}
}
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'dart:typed_data';
import 'package:flutter_tools/src/base/common.dart';
import 'package:flutter_tools/src/base/io.dart';
import 'package:flutter_tools/src/base/logger.dart';
import 'package:flutter_tools/src/convert.dart';
import 'package:flutter_tools/src/daemon.dart';
import 'package:test/fake.dart';
import '../src/common.dart';
class FakeDaemonStreams extends DaemonStreams {
final StreamController<Map<String, dynamic>> inputs = StreamController<Map<String, dynamic>>();
final StreamController<Map<String, dynamic>> outputs = StreamController<Map<String, dynamic>>();
@override
Stream<Map<String, dynamic>> get inputStream {
return inputs.stream;
}
@override
void send(Map<String, dynamic> message) {
outputs.add(message);
}
@override
Future<void> dispose() async {
await inputs.close();
// In some tests, outputs have no listeners. We don't wait for outputs to close.
unawaited(outputs.close());
}
}
void main() {
late BufferLogger bufferLogger;
late FakeDaemonStreams daemonStreams;
late DaemonConnection daemonConnection;
setUp(() {
bufferLogger = BufferLogger.test();
daemonStreams = FakeDaemonStreams();
daemonConnection = DaemonConnection(
daemonStreams: daemonStreams,
logger: bufferLogger,
);
});
tearDown(() async {
await daemonConnection.dispose();
});
group('DaemonConnection receiving end', () {
testWithoutContext('redirects input to incoming commands', () async {
final Map<String, dynamic> commandToSend = <String, dynamic>{'id': 0, 'method': 'some_method'};
daemonStreams.inputs.add(commandToSend);
final Map<String, dynamic> commandReceived = await daemonConnection.incomingCommands.first;
await daemonStreams.dispose();
expect(commandReceived, commandToSend);
});
testWithoutContext('listenToEvent can receive the right events', () async {
final Future<List<dynamic>> events = daemonConnection.listenToEvent('event1').toList();
daemonStreams.inputs.add(<String, dynamic>{'event': 'event1', 'params': '1'});
daemonStreams.inputs.add(<String, dynamic>{'event': 'event2', 'params': '2'});
daemonStreams.inputs.add(<String, dynamic>{'event': 'event1', 'params': null});
daemonStreams.inputs.add(<String, dynamic>{'event': 'event1', 'params': 3});
await pumpEventQueue();
await daemonConnection.dispose();
expect(await events, <dynamic>['1', null, 3]);
});
});
group('DaemonConnection sending end', () {
testWithoutContext('sending requests', () async {
unawaited(daemonConnection.sendRequest('some_method', 'param'));
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], isNotNull);
expect(data['method'], 'some_method');
expect(data['params'], 'param');
});
testWithoutContext('sending requests without param', () async {
unawaited(daemonConnection.sendRequest('some_method'));
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], isNotNull);
expect(data['method'], 'some_method');
expect(data['params'], isNull);
});
testWithoutContext('sending response', () async {
daemonConnection.sendResponse('1', 'some_data');
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], '1');
expect(data['method'], isNull);
expect(data['error'], isNull);
expect(data['result'], 'some_data');
});
testWithoutContext('sending response without data', () async {
daemonConnection.sendResponse('1');
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], '1');
expect(data['method'], isNull);
expect(data['error'], isNull);
expect(data['result'], isNull);
});
testWithoutContext('sending error response', () async {
daemonConnection.sendErrorResponse('1', 'error', StackTrace.fromString('stack trace'));
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], '1');
expect(data['method'], isNull);
expect(data['error'], 'error');
expect(data['trace'], 'stack trace');
});
testWithoutContext('sending events', () async {
daemonConnection.sendEvent('some_event', '123');
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], isNull);
expect(data['event'], 'some_event');
expect(data['params'], '123');
});
testWithoutContext('sending events without params', () async {
daemonConnection.sendEvent('some_event');
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], isNull);
expect(data['event'], 'some_event');
expect(data['params'], isNull);
});
});
group('DaemonConnection request and response', () {
testWithoutContext('receiving response from requests', () async {
final Future<dynamic> requestFuture = daemonConnection.sendRequest('some_method', 'param');
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], isNotNull);
expect(data['method'], 'some_method');
expect(data['params'], 'param');
final String id = data['id'] as String;
daemonStreams.inputs.add(<String, dynamic>{'id': id, 'result': '123'});
expect(await requestFuture, '123');
});
testWithoutContext('receiving response from requests without result', () async {
final Future<dynamic> requestFuture = daemonConnection.sendRequest('some_method', 'param');
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], isNotNull);
expect(data['method'], 'some_method');
expect(data['params'], 'param');
final String id = data['id'] as String;
daemonStreams.inputs.add(<String, dynamic>{'id': id});
expect(await requestFuture, null);
});
testWithoutContext('receiving error response from requests without result', () async {
final Future<dynamic> requestFuture = daemonConnection.sendRequest('some_method', 'param');
final Map<String, dynamic> data = await daemonStreams.outputs.stream.first;
expect(data['id'], isNotNull);
expect(data['method'], 'some_method');
expect(data['params'], 'param');
final String id = data['id'] as String;
daemonStreams.inputs.add(<String, dynamic>{'id': id, 'error': 'some_error', 'trace': 'stack trace'});
expect(requestFuture, throwsA('some_error'));
});
});
group('TcpDaemonStreams', () {
final Map<String, Object?> testCommand = <String, Object?>{
'id': 100,
'method': 'test',
};
late FakeSocket socket;
late TcpDaemonStreams daemonStreams;
setUp(() {
socket = FakeSocket();
daemonStreams = TcpDaemonStreams(socket, logger: bufferLogger);
});
test('parses the message received on the socket', () async {
socket.controller.add(Uint8List.fromList(utf8.encode('[${jsonEncode(testCommand)}]\n')));
final Map<String, Object?> command = await daemonStreams.inputStream.first;
expect(command, testCommand);
});
test('sends the encoded message through the socket', () async {
daemonStreams.send(testCommand);
await pumpEventQueue();
expect(socket.writtenObjects.length, 1);
expect(socket.writtenObjects[0].toString(), '[${jsonEncode(testCommand)}]\n');
});
test('dispose calls socket.close', () async {
await daemonStreams.dispose();
expect(socket.closeCalled, isTrue);
});
});
}
class FakeSocket extends Fake implements Socket {
bool closeCalled = false;
final StreamController<Uint8List> controller = StreamController<Uint8List>();
final List<Object?> writtenObjects = <Object?>[];
@override
StreamSubscription<Uint8List> listen(
void Function(Uint8List event)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) {
return controller.stream.listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
@override
void write(Object? object) {
writtenObjects.add(object);
}
@override
Future<void> close() async {
closeCalled = true;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment