1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'dart:typed_data';
import 'package:meta/meta.dart';
import 'base/common.dart';
import 'base/io.dart';
import 'base/logger.dart';
import 'base/utils.dart';
import 'convert.dart';
/// One decoded message travelling over a [DaemonConnection].
///
/// Bundles the JSON payload together with the optional binary attachment that
/// accompanies it.
class DaemonMessage {
  DaemonMessage(this.data, [this.binary]);

  /// The binary attachment of this message, delivered as a stream of byte
  /// chunks; `null` when the message was constructed without one.
  ///
  /// When present, the stream must be listened to.
  final Stream<List<int>>? binary;

  /// The JSON object carried by this message.
  final Map<String, Object?> data;
}
/// An event received through a [DaemonConnection].
///
/// Carries the event's name, its decoded parameters and, optionally, the
/// binary attachment that came with it.
class DaemonEventData {
  DaemonEventData(this.eventName, this.data, [this.binary]);

  /// The binary attachment of this event, delivered as a stream of byte
  /// chunks; `null` when the event was constructed without one.
  ///
  /// When present, the stream must be listened to.
  final Stream<List<int>>? binary;

  /// The payload that accompanies the event; may be `null`.
  final Object? data;

  /// The name identifying which event this is.
  final String eventName;
}
/// JSON key whose value is the length, in bytes, of the binary payload that
/// accompanies a message.
///
/// Written by [DaemonStreams.send] and read by [DaemonInputStreamConverter].
const String _binaryLengthKey = '_binaryLength';
/// What kind of data [DaemonInputStreamConverter] expects next on its input.
enum _InputStreamParseState {
/// The converter is collecting bytes of a newline-terminated line of text.
json,
/// The converter is forwarding bytes to the current message's binary stream.
binary,
}
/// Converts a binary stream to a stream of [DaemonMessage].
///
/// The daemon JSON-RPC protocol is defined as follows: every single line of
/// text that starts with `[{` and ends with `}]` will be parsed as a JSON
/// message. The array should contain only one single object which contains the
/// message data. Lines that do not have this shape are discarded.
///
/// If the JSON object contains the key [_binaryLengthKey] with an integer
/// value (will be referred to as N), the following N bytes after the newline
/// character will contain the binary part of the message. A message whose N is
/// not positive is delivered with a binary stream that is already closed, and
/// the converter keeps reading JSON.
@visibleForTesting
class DaemonInputStreamConverter {
  DaemonInputStreamConverter(this.inputStream) {
    // Lazily listen to the input stream.
    _controller.onListen = () {
      final StreamSubscription<List<int>> subscription = inputStream.listen(
        _processChunk,
        onError: (Object error, StackTrace stackTrace) {
          _controller.addError(error, stackTrace);
        },
        onDone: () {
          // The input ended in the middle of a binary payload. No more bytes
          // can arrive, so close the payload stream; otherwise whoever is
          // listening to it would wait forever.
          if (state == _InputStreamParseState.binary) {
            state = _InputStreamParseState.json;
            unawaited(currentBinaryStream.close());
          }
          unawaited(_controller.close());
        },
      );
      _controller.onCancel = subscription.cancel;
      // We should not handle onPause or onResume. When the stream is paused, we
      // still need to read from the input stream.
    };
  }

  /// The raw byte stream to decode.
  final Stream<List<int>> inputStream;

  final StreamController<DaemonMessage> _controller = StreamController<DaemonMessage>();

  /// The decoded messages. [inputStream] is only listened to once this stream
  /// has a listener.
  Stream<DaemonMessage> get convertedStream => _controller.stream;

  // Internal states

  /// The current parse state, whether we are expecting JSON or binary data.
  _InputStreamParseState state = _InputStreamParseState.json;

  /// The binary stream that is being transferred.
  ///
  /// Only assigned once a message announcing a binary payload has been seen.
  late StreamController<List<int>> currentBinaryStream;

  /// Remaining length in bytes that have to be sent to the binary stream.
  int remainingBinaryLength = 0;

  /// Buffer to hold the current line of input data.
  final BytesBuilder bytesBuilder = BytesBuilder(copy: false);

  /// Processes a single chunk received in the input stream.
  ///
  /// A chunk may contain any mix of (partial) JSON lines and binary payload
  /// bytes, so it is consumed piece by piece according to [state].
  void _processChunk(List<int> chunk) {
    int start = 0;
    while (start < chunk.length) {
      if (state == _InputStreamParseState.json) {
        start += _processChunkInJsonMode(chunk, start);
      } else if (state == _InputStreamParseState.binary) {
        final int bytesSent = _addBinaryChunk(chunk, start, remainingBinaryLength);
        start += bytesSent;
        remainingBinaryLength -= bytesSent;
        if (remainingBinaryLength <= 0) {
          assert(remainingBinaryLength == 0);
          unawaited(currentBinaryStream.close());
          state = _InputStreamParseState.json;
        }
      }
    }
  }

  /// Processes a chunk in JSON mode, and returns the number of bytes processed.
  ///
  /// Bytes are buffered in [bytesBuilder] until a newline is found; the
  /// completed line is then decoded and, if it is a message, emitted.
  int _processChunkInJsonMode(List<int> chunk, int start) {
    const int lineFeed = 10; // The '\n' character.

    final int indexOfNewLine = chunk.indexOf(lineFeed, start);
    if (indexOfNewLine < 0) {
      // No end of line yet: keep everything for the next chunk.
      bytesBuilder.add(chunk.sublist(start));
      return chunk.length - start;
    }

    bytesBuilder.add(chunk.sublist(start, indexOfNewLine + 1));

    final Uint8List combinedChunk = bytesBuilder.takeBytes();
    String jsonString = utf8.decode(combinedChunk).trim();
    if (jsonString.startsWith('[{') && jsonString.endsWith('}]')) {
      // Strip the surrounding `[` and `]` to get the single JSON object.
      jsonString = jsonString.substring(1, jsonString.length - 1);
      final Map<String, Object?>? value = castStringKeyedMap(json.decode(jsonString));
      if (value != null) {
        _emitMessage(value);
      }
    }

    return indexOfNewLine + 1 - start;
  }

  /// Emits the message described by [value], switching to binary mode when the
  /// message announces a non-empty binary payload.
  void _emitMessage(Map<String, Object?> value) {
    final Object? binaryLength = value[_binaryLengthKey];
    if (binaryLength == null) {
      _controller.add(DaemonMessage(value));
      return;
    }

    final int length = binaryLength as int;
    final StreamController<List<int>> binaryStream = StreamController<List<int>>();
    currentBinaryStream = binaryStream;
    _controller.add(DaemonMessage(value, binaryStream.stream));

    if (length > 0) {
      remainingBinaryLength = length;
      state = _InputStreamParseState.binary;
    } else {
      // No payload bytes follow. Close the stream right away instead of
      // entering binary mode, which would only close it when the next input
      // chunk arrives (and cannot cope with a negative length).
      remainingBinaryLength = 0;
      unawaited(binaryStream.close());
    }
  }

  /// Forwards up to [maximumSizeToRead] bytes of [chunk], beginning at
  /// [start], to [currentBinaryStream] and returns how many bytes were
  /// forwarded.
  int _addBinaryChunk(List<int> chunk, int start, int maximumSizeToRead) {
    final int available = chunk.length - start;
    if (start == 0 && available <= maximumSizeToRead) {
      // The whole chunk belongs to the payload; forward it without copying.
      currentBinaryStream.add(chunk);
      return chunk.length;
    }
    final int sizeToRead = available < maximumSizeToRead ? available : maximumSizeToRead;
    currentBinaryStream.add(chunk.sublist(start, start + sizeToRead));
    return sizeToRead;
  }
}
/// The pair of streams that a [DaemonConnection] uses to talk to its peer.
///
/// Incoming bytes are decoded into [DaemonMessage]s by
/// [DaemonInputStreamConverter]; outgoing messages are encoded by [send].
class DaemonStreams {
  DaemonStreams(
    Stream<List<int>> rawInputStream,
    StreamSink<List<int>> outputSink, {
    required Logger logger,
  }) : _outputSink = outputSink,
       inputStream = DaemonInputStreamConverter(rawInputStream).convertedStream,
       _logger = logger;

  /// Creates a [DaemonStreams] that uses stdin and stdout as the underlying streams.
  DaemonStreams.fromStdio(Stdio stdio, {required Logger logger})
    : this(stdio.stdin, stdio.stdout, logger: logger);

  /// Creates a [DaemonStreams] that uses [Socket] as the underlying streams.
  DaemonStreams.fromSocket(Socket socket, {required Logger logger})
    : this(socket, socket, logger: logger);

  /// Connects to a server and creates a [DaemonStreams] from the connection as the underlying streams.
  ///
  /// The instance is returned immediately; data is piped to and from the
  /// socket once the connection has been established. If connecting fails, the
  /// error is logged and reported on [inputStream], and both streams are closed.
  factory DaemonStreams.connect(String host, int port, {required Logger logger}) {
    final StreamController<List<int>> inputStreamController = StreamController<List<int>>();
    final StreamController<List<int>> outputStreamController = StreamController<List<int>>();
    unawaited(
      Socket.connect(host, port).then<void>((Socket socket) {
        // addStream() does not close the controller when its source ends, so
        // close it explicitly; otherwise listeners never see the connection end.
        unawaited(
          inputStreamController.addStream(socket).whenComplete(inputStreamController.close),
        );
        // Once the output controller is closed (see [dispose]) nothing more
        // will be written, so close the socket, which is not reachable from
        // anywhere else.
        unawaited(
          socket.addStream(outputStreamController.stream).whenComplete(socket.close),
        );
      }).onError((Object error, StackTrace stackTrace) {
        logger.printError('Socket error: $error');
        logger.printTrace('$stackTrace');
        // Propagate the error to the streams, then end them: no data can ever
        // arrive or be delivered.
        inputStreamController.addError(error, stackTrace);
        unawaited(inputStreamController.close());
        unawaited(outputStreamController.close());
      }),
    );
    return DaemonStreams(inputStreamController.stream, outputStreamController.sink, logger: logger);
  }

  final StreamSink<List<int>> _outputSink;
  final Logger _logger;

  /// Stream that contains input to the [DaemonConnection].
  final Stream<DaemonMessage> inputStream;

  /// Outputs a message through the connection.
  ///
  /// [message] is written as a single line containing a one-element JSON
  /// array. When [binary] is given, its length is recorded under
  /// [_binaryLengthKey] in the encoded object and the bytes are written right
  /// after the line. [message] itself is never modified.
  ///
  /// A [StateError] or [IOException] raised while writing is logged and the
  /// output sink is closed; it is not rethrown.
  void send(Map<String, Object?> message, [List<int>? binary]) {
    try {
      // Build a copy rather than adding the length to the caller's map, which
      // may be reused or unmodifiable.
      final Map<String, Object?> payload = binary == null
          ? message
          : <String, Object?>{...message, _binaryLengthKey: binary.length};
      _outputSink.add(utf8.encode('[${json.encode(payload)}]\n'));
      if (binary != null) {
        _outputSink.add(binary);
      }
    } on StateError catch (error) {
      _closeAfterFailedWrite(error);
    } on IOException catch (error) {
      _closeAfterFailedWrite(error);
    }
  }

  /// Logs a failed write and closes the output sink, since the connection can
  /// no longer be used for sending.
  void _closeAfterFailedWrite(Object error) {
    _logger.printError('Failed to write daemon command response: $error');
    unawaited(_outputSink.close());
  }

  /// Cleans up any resources used.
  ///
  /// Starts closing the output sink without waiting for the close to finish.
  Future<void> dispose() async {
    unawaited(_outputSink.close());
  }
}
/// Connection between a flutter daemon and a client.
///
/// Decodes the messages arriving on a [DaemonStreams] into requests
/// ([incomingCommands]), responses (which complete the futures returned by
/// [sendRequest]) and events ([listenToEvent]), and offers the matching
/// `send*` methods for the opposite direction.
class DaemonConnection {
  DaemonConnection({
    required DaemonStreams daemonStreams,
    required Logger logger,
  }) : _logger = logger,
       _daemonStreams = daemonStreams {
    _commandSubscription = daemonStreams.inputStream.listen(
      _handleMessage,
      onError: (Object error, StackTrace stackTrace) {
        // We have to listen for on error otherwise the error on the socket
        // will end up in the Zone error handler.
        // Do nothing here and let the stream close handlers handle shutting
        // down the daemon.
      },
    );
  }

  final DaemonStreams _daemonStreams;

  final Logger _logger;

  late final StreamSubscription<DaemonMessage> _commandSubscription;

  /// Counter used to generate the ids of outgoing requests.
  int _outgoingRequestId = 0;

  /// Completers of the requests that are still waiting for a response, keyed
  /// by request id.
  final Map<String, Completer<Object?>> _outgoingRequestCompleters = <String, Completer<Object?>>{};

  final StreamController<DaemonEventData> _events = StreamController<DaemonEventData>.broadcast();
  final StreamController<DaemonMessage> _incomingCommands = StreamController<DaemonMessage>();

  /// A stream that contains all the incoming requests.
  Stream<DaemonMessage> get incomingCommands => _incomingCommands.stream;

  /// Listens to the event with the event name [eventToListen].
  Stream<DaemonEventData> listenToEvent(String eventToListen) {
    return _events.stream.where((DaemonEventData event) => event.eventName == eventToListen);
  }

  /// Sends a request to the other end of the connection.
  ///
  /// Returns a [Future] that resolves with the content of the response, or
  /// completes with an error if the other end answers with an error response.
  Future<Object?> sendRequest(String method, [Object? params, List<int>? binary]) async {
    final String id = '${++_outgoingRequestId}';

    final Completer<Object?> completer = Completer<Object?>();
    _outgoingRequestCompleters[id] = completer;
    final Map<String, Object?> data = <String, Object?>{
      'id': id,
      'method': method,
      if (params != null) 'params': params,
    };
    _logger.printTrace('-> Sending to daemon, id = $id, method = $method');
    try {
      _daemonStreams.send(data, binary);
    } on Object {
      // The request could not even be encoded, so no response will ever
      // arrive for it; do not keep its completer registered.
      _outgoingRequestCompleters.remove(id);
      rethrow;
    }
    return completer.future;
  }

  /// Sends a response to the other end of the connection.
  void sendResponse(Object id, [Object? result]) {
    _daemonStreams.send(<String, Object?>{
      'id': id,
      if (result != null) 'result': result,
    });
  }

  /// Sends an error response to the other end of the connection.
  void sendErrorResponse(Object id, Object? error, StackTrace trace) {
    _daemonStreams.send(<String, Object?>{
      'id': id,
      'error': error,
      'trace': '$trace',
    });
  }

  /// Sends an event to the client.
  void sendEvent(String name, [Object? params, List<int>? binary]) {
    _daemonStreams.send(<String, Object?>{
      'event': name,
      if (params != null) 'params': params,
    }, binary);
  }

  /// Handles the input from the stream.
  ///
  /// There are three kinds of data: Request, Response, Event.
  ///
  /// Request:
  /// {"id": <Object>, "method": <String>, "params": <optional, Object?>}
  ///
  /// Response:
  /// {"id": <Object>, "result": <optional, Object?>} for a successful response.
  /// {"id": <Object>, "error": <Object>, "trace": <String>} for an error response.
  ///
  /// Event:
  /// {"event": <String>, "params": <optional, Object?>}
  void _handleMessage(DaemonMessage message) {
    final Map<String, Object?> data = message.data;
    final Object? id = data['id'];
    if (id != null) {
      if (data['method'] == null) {
        // This is a response to previously sent request. An id that matches
        // no pending request (including one that is not a string) is ignored.
        final Object? error = data['error'];
        if (error != null) {
          // This is an error response.
          _logger.printTrace('<- Error response received from daemon, id = $id');
          // [sendErrorResponse] stores the stack under 'trace'; 'stackTrace'
          // is still accepted because it is the key this method used to read.
          final String stackTrace = (data['trace'] ?? data['stackTrace']) as String? ?? '';
          _outgoingRequestCompleters
              .remove(id)
              ?.completeError(error, StackTrace.fromString(stackTrace));
        } else {
          _logger.printTrace('<- Response received from daemon, id = $id');
          final Object? result = data['result'];
          _outgoingRequestCompleters.remove(id)?.complete(result);
        }
      } else {
        _incomingCommands.add(message);
      }
    } else if (data['event'] != null) {
      // This is an event
      _logger.printTrace('<- Event received: ${data['event']}');
      final Object? eventName = data['event'];
      if (eventName is String) {
        _events.add(DaemonEventData(
          eventName,
          data['params'],
          message.binary,
        ));
      } else {
        throwToolExit('event name received is not string!');
      }
    } else {
      _logger.printError('Unknown data received from daemon');
    }
  }

  /// Cleans up any resources used in the connection.
  ///
  /// Stops listening for input, disposes the underlying [DaemonStreams] and
  /// starts closing the event and command streams.
  Future<void> dispose() async {
    await _commandSubscription.cancel();
    await _daemonStreams.dispose();
    unawaited(_events.close());
    unawaited(_incomingCommands.close());
  }
}