daemon.dart 13.1 KB
Newer Older
1 2 3 4 5
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
6 7 8
import 'dart:typed_data';

import 'package:meta/meta.dart';
9 10 11 12 13 14 15

import 'base/common.dart';
import 'base/io.dart';
import 'base/logger.dart';
import 'base/utils.dart';
import 'convert.dart';

16 17 18 19 20 21 22 23 24 25 26
/// A single message passed through the [DaemonConnection].
///
/// Every message carries a JSON object in [data]. It may additionally carry a
/// binary payload, which is exposed through the [binary] stream.
class DaemonMessage {
  /// Creates a message from its JSON [data] and an optional [binary] payload
  /// stream.
  DaemonMessage(this.data, [this.binary]);

  /// Content of the JSON part of the message.
  final Map<String, Object?> data;

  /// Stream of the binary content of the message.
  ///
  /// Must be listened to if binary data is present. Null when the message was
  /// created without a binary payload.
  final Stream<List<int>>? binary;
}

29 30 31
/// Data of an event passed through the [DaemonConnection].
class DaemonEventData {
  /// Creates the data for an event named [eventName], with its [data] payload
  /// and an optional [binary] payload stream.
  DaemonEventData(this.eventName, this.data, [this.binary]);

  /// The name of the event.
  final String eventName;

  /// The data of the event.
  final Object? data;

  /// Stream of the binary content of the event.
  ///
  /// Must be listened to if binary data is present. Null when the event was
  /// created without a binary payload.
  final Stream<List<int>>? binary;
}

45
/// Key in a message's JSON object whose integer value is the number of bytes
/// of binary payload that follow the JSON line.
const String _binaryLengthKey = '_binaryLength';
46

47 48 49 50
/// The kind of data [DaemonInputStreamConverter] expects next on its input.
enum _InputStreamParseState {
  /// A newline-terminated line of text that may contain a JSON message.
  json,

  /// Bytes belonging to the binary part of the most recently parsed message.
  binary,
}
51

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
/// Converts a binary stream to a stream of [DaemonMessage].
///
/// The daemon JSON-RPC protocol is defined as follows: every single line of
/// text that starts with `[{` and ends with `}]` will be parsed as a JSON
/// message. The array should contain only one single object which contains the
/// message data.
///
/// If the JSON object contains the key [_binaryLengthKey] with an integer
/// value (will be referred to as N), the following N bytes after the newline
/// character will contain the binary part of the message.
///
/// Lines without the `[{` / `}]` framing are discarded. A framed line that is
/// not valid UTF-8 or not valid JSON is reported as an error event on
/// [convertedStream] and parsing continues with the next line.
@visibleForTesting
class DaemonInputStreamConverter {
  /// Creates a converter reading from [inputStream].
  ///
  /// The input stream is only subscribed to once [convertedStream] gets a
  /// listener.
  DaemonInputStreamConverter(this.inputStream) {
    // Lazily listen to the input stream.
    _controller.onListen = () {
      final StreamSubscription<List<int>> subscription = inputStream.listen(
        _processChunk,
        onError: (Object error, StackTrace stackTrace) {
          _controller.addError(error, stackTrace);
        },
        onDone: () {
          if (state == _InputStreamParseState.binary) {
            // The input ended in the middle of a binary payload. Close the
            // payload stream so that its consumer does not wait forever.
            unawaited(currentBinaryStream.close());
            state = _InputStreamParseState.json;
          }
          unawaited(_controller.close());
        },
      );

      _controller.onCancel = subscription.cancel;
      // We should not handle onPause or onResume. When the stream is paused, we
      // still need to read from the input stream.
    };
  }

  /// The raw byte stream that is being converted.
  final Stream<List<int>> inputStream;

  final StreamController<DaemonMessage> _controller = StreamController<DaemonMessage>();

  /// The messages parsed out of [inputStream].
  Stream<DaemonMessage> get convertedStream => _controller.stream;

  // Internal states
  /// The current parse state, whether we are expecting JSON or binary data.
  _InputStreamParseState state = _InputStreamParseState.json;

  /// The binary stream that is being transferred.
  ///
  /// Only assigned once a message with a binary part has been parsed.
  late StreamController<List<int>> currentBinaryStream;

  /// Remaining length in bytes that have to be sent to the binary stream.
  int remainingBinaryLength = 0;

  /// Buffer to hold the current line of input data.
  final BytesBuilder bytesBuilder = BytesBuilder(copy: false);

  /// Processes a single chunk received in the input stream.
  ///
  /// A chunk may contain any mix of (partial) JSON lines and binary payload
  /// bytes, so it is consumed piece by piece according to the current [state].
  void _processChunk(List<int> chunk) {
    int start = 0;
    while (start < chunk.length) {
      if (state == _InputStreamParseState.json) {
        start += _processChunkInJsonMode(chunk, start);
      } else if (state == _InputStreamParseState.binary) {
        final int bytesSent = _addBinaryChunk(chunk, start, remainingBinaryLength);
        start += bytesSent;
        remainingBinaryLength -= bytesSent;

        if (remainingBinaryLength <= 0) {
          assert(remainingBinaryLength == 0);

          unawaited(currentBinaryStream.close());
          state = _InputStreamParseState.json;
        }
      }
    }
  }

  /// Processes a chunk in JSON mode, and returns the number of bytes processed.
  ///
  /// Bytes are buffered in [bytesBuilder] until a newline is seen; the
  /// completed line is then decoded and, if it is a message, emitted.
  int _processChunkInJsonMode(List<int> chunk, int start) {
    const int lineFeed = 10; // The '\n' character

    // Search for newline character.
    final int indexOfNewLine = chunk.indexOf(lineFeed, start);
    if (indexOfNewLine < 0) {
      // No complete line yet, keep everything for the next chunk.
      bytesBuilder.add(chunk.sublist(start));
      return chunk.length - start;
    }

    bytesBuilder.add(chunk.sublist(start, indexOfNewLine + 1));
    final int bytesConsumed = indexOfNewLine + 1 - start;

    final Map<String, Object?>? value = _decodeLine(bytesBuilder.takeBytes());
    if (value == null) {
      return bytesConsumed;
    }

    // Check if we need to consume another binary blob.
    final Object? binaryLength = value[_binaryLengthKey];
    if (binaryLength == null) {
      _controller.add(DaemonMessage(value));
      return bytesConsumed;
    }

    remainingBinaryLength = binaryLength as int;
    currentBinaryStream = StreamController<List<int>>();
    _controller.add(DaemonMessage(value, currentBinaryStream.stream));
    if (remainingBinaryLength > 0) {
      state = _InputStreamParseState.binary;
    } else {
      // An empty payload has no bytes to wait for. Close the stream right away
      // instead of leaving it open until some later chunk happens to arrive.
      remainingBinaryLength = 0;
      unawaited(currentBinaryStream.close());
    }

    return bytesConsumed;
  }

  /// Decodes one complete input line into the JSON object it contains.
  ///
  /// Returns null when the line is not framed as a message, or when decoding
  /// fails; in the latter case the [FormatException] is forwarded to
  /// [convertedStream] rather than thrown from the input stream's listener.
  Map<String, Object?>? _decodeLine(Uint8List line) {
    try {
      final String text = utf8.decode(line).trim();
      if (!text.startsWith('[{') || !text.endsWith('}]')) {
        return null;
      }
      // Strip the enclosing `[` and `]` to get the single JSON object.
      return castStringKeyedMap(json.decode(text.substring(1, text.length - 1)));
    } on FormatException catch (error, stackTrace) {
      _controller.addError(error, stackTrace);
      return null;
    }
  }

  /// Forwards up to [maximumSizeToRead] bytes of [chunk], beginning at
  /// [start], to [currentBinaryStream] and returns the number of bytes sent.
  int _addBinaryChunk(List<int> chunk, int start, int maximumSizeToRead) {
    final int chunkRemainingLength = chunk.length - start;
    if (start == 0 && chunkRemainingLength <= maximumSizeToRead) {
      // The whole chunk belongs to the payload, forward it without copying.
      currentBinaryStream.add(chunk);
      return chunkRemainingLength;
    }
    final int sizeToRead =
        chunkRemainingLength < maximumSizeToRead ? chunkRemainingLength : maximumSizeToRead;
    currentBinaryStream.add(chunk.sublist(start, start + sizeToRead));
    return sizeToRead;
  }
}
168

169 170 171 172 173 174 175 176 177 178 179
/// A pair of streams that a [DaemonConnection] uses to communicate with the
/// other end of the connection.
class DaemonStreams {
  /// Creates a [DaemonStreams] that reads messages from [rawInputStream] and
  /// writes messages to [outputSink].
  DaemonStreams(
    Stream<List<int>> rawInputStream,
    StreamSink<List<int>> outputSink, {
    required Logger logger,
  }) :
    _outputSink = outputSink,
    inputStream = DaemonInputStreamConverter(rawInputStream).convertedStream,
    _logger = logger;

  /// Creates a [DaemonStreams] that uses stdin and stdout as the underlying streams.
  DaemonStreams.fromStdio(Stdio stdio, { required Logger logger })
    : this(stdio.stdin, stdio.stdout, logger: logger);

  /// Creates a [DaemonStreams] that uses [Socket] as the underlying streams.
  DaemonStreams.fromSocket(Socket socket, { required Logger logger })
    : this(socket, socket, logger: logger);

  /// Connects to a server and creates a [DaemonStreams] from the connection as the underlying streams.
  ///
  /// The instance is returned immediately; intermediate stream controllers
  /// are piped to and from the socket once the connection is established.
  factory DaemonStreams.connect(String host, int port, { required Logger logger }) {
    final Future<Socket> socketFuture = Socket.connect(host, port);
    final StreamController<List<int>> inputStreamController = StreamController<List<int>>();
    final StreamController<List<int>> outputStreamController = StreamController<List<int>>();
    unawaited(socketFuture.then((Socket socket) {
      unawaited(inputStreamController.addStream(socket));
      unawaited(socket.addStream(outputStreamController.stream));
    }).onError((Object error, StackTrace stackTrace) {
      logger.printError('Socket error: $error');
      logger.printTrace('$stackTrace');
      // Propagate the error to the streams.
      inputStreamController.addError(error, stackTrace);
      unawaited(outputStreamController.close());
    }));
    return DaemonStreams(inputStreamController.stream, outputStreamController.sink, logger: logger);
  }

  final StreamSink<List<int>> _outputSink;
  final Logger _logger;

  /// Stream that contains input to the [DaemonConnection].
  final Stream<DaemonMessage> inputStream;

  /// Outputs a message through the connection.
  ///
  /// The [message] is written as a single JSON line. When [binary] is given,
  /// its length is included in the JSON under [_binaryLengthKey] and the bytes
  /// are written right after the line. The [message] map passed by the caller
  /// is not modified.
  ///
  /// If writing fails with a [StateError] or an [IOException], the failure is
  /// logged and the output sink is closed.
  void send(Map<String, Object?> message, [ List<int>? binary ]) {
    try {
      // Add the length to a copy so that the caller's map is left untouched.
      final Map<String, Object?> payload = binary == null
        ? message
        : <String, Object?>{...message, _binaryLengthKey: binary.length};
      _outputSink.add(utf8.encode('[${json.encode(payload)}]\n'));
      if (binary != null) {
        _outputSink.add(binary);
      }
    } on StateError catch (error) {
      _handleSendFailure(error);
    } on IOException catch (error) {
      _handleSendFailure(error);
    }
  }

  /// Logs a failed write and closes the connection.
  void _handleSendFailure(Object error) {
    _logger.printError('Failed to write daemon command response: $error');
    // Failed to send, close the connection
    unawaited(_outputSink.close());
  }

  /// Cleans up any resources used.
  Future<void> dispose() async {
    unawaited(_outputSink.close());
  }
}

/// Connection between a flutter daemon and a client.
///
/// Messages arriving on the [DaemonStreams] are sorted into requests (exposed
/// through [incomingCommands]), responses (which complete the futures returned
/// by [sendRequest]) and events (exposed through [listenToEvent]).
class DaemonConnection {
  /// Creates a connection on top of [daemonStreams] and starts listening to
  /// its input right away.
  DaemonConnection({
    required DaemonStreams daemonStreams,
    required Logger logger,
  }): _logger = logger,
      _daemonStreams = daemonStreams {
    _commandSubscription = daemonStreams.inputStream.listen(
      _handleMessage,
      onError: (Object error, StackTrace stackTrace) {
        // We have to listen for on error otherwise the error on the socket
        // will end up in the Zone error handler.
        // Do nothing here and let the stream close handlers handle shutting
        // down the daemon.
      },
    );
  }

  final DaemonStreams _daemonStreams;

  final Logger _logger;

  late final StreamSubscription<DaemonMessage> _commandSubscription;

  /// Counter used to generate the ids of outgoing requests.
  int _outgoingRequestId = 0;

  /// Completers of the requests that have not received a response yet, keyed
  /// by request id.
  final Map<String, Completer<Object?>> _outgoingRequestCompleters = <String, Completer<Object?>>{};

  final StreamController<DaemonEventData> _events = StreamController<DaemonEventData>.broadcast();
  final StreamController<DaemonMessage> _incomingCommands = StreamController<DaemonMessage>();

  /// A stream that contains all the incoming requests.
  Stream<DaemonMessage> get incomingCommands => _incomingCommands.stream;

  /// Listens to the event with the event name [eventToListen].
  Stream<DaemonEventData> listenToEvent(String eventToListen) {
    return _events.stream
      .where((DaemonEventData event) => event.eventName == eventToListen);
  }

  /// Sends a request to the other end of the connection.
  ///
  /// Returns a [Future] that resolves with the content of the response, or
  /// completes with an error if an error response is received.
  Future<Object?> sendRequest(String method, [Object? params, List<int>? binary]) async {
    final String id = '${++_outgoingRequestId}';
    final Completer<Object?> completer = Completer<Object?>();
    _outgoingRequestCompleters[id] = completer;
    final Map<String, Object?> data = <String, Object?>{
      'id': id,
      'method': method,
      if (params != null) 'params': params,
    };
    _logger.printTrace('-> Sending to daemon, id = $id, method = $method');
    _daemonStreams.send(data, binary);
    return completer.future;
  }

  /// Sends a response to the other end of the connection.
  void sendResponse(Object id, [Object? result]) {
    _daemonStreams.send(<String, Object?>{
      'id': id,
      if (result != null) 'result': result,
    });
  }

  /// Sends an error response to the other end of the connection.
  ///
  /// The stack [trace] is sent as a string under the `trace` key.
  void sendErrorResponse(Object id, Object? error, StackTrace trace) {
    _daemonStreams.send(<String, Object?>{
      'id': id,
      'error': error,
      'trace': '$trace',
    });
  }

  /// Sends an event to the client.
  void sendEvent(String name, [ Object? params, List<int>? binary ]) {
    _daemonStreams.send(<String, Object?>{
      'event': name,
      if (params != null) 'params': params,
    }, binary);
  }

  /// Handles the input from the stream.
  ///
  /// There are three kinds of data: Request, Response, Event.
  ///
  /// Request:
  /// {"id": <Object>, "method": <String>, "params": <optional, Object?>}
  ///
  /// Response:
  /// {"id": <Object>, "result": <optional, Object?>} for a successful response.
  /// {"id": <Object>, "error": <Object>, "trace": <String>} for an error response.
  ///
  /// Event:
  /// {"event": <String>, "params": <optional, Object?>}
  void _handleMessage(DaemonMessage message) {
    final Map<String, Object?> data = message.data;
    final Object? rawId = data['id'];
    if (rawId != null) {
      if (data['method'] == null) {
        // This is a response to previously sent request.
        // Request ids are generated as strings by [sendRequest]. Stringify the
        // received id rather than casting it, so a peer sending a non-string
        // id cannot make this stream listener throw.
        final String id = '$rawId';
        if (data['error'] != null) {
          // This is an error response.
          _logger.printTrace('<- Error response received from daemon, id = $id');
          final Object error = data['error']!;
          // [sendErrorResponse] writes the stack under 'trace'. 'stackTrace' is
          // the key this method previously read, and is still accepted.
          final String stackTrace = (data['trace'] ?? data['stackTrace']) as String? ?? '';
          _outgoingRequestCompleters.remove(id)?.completeError(error, StackTrace.fromString(stackTrace));
        } else {
          _logger.printTrace('<- Response received from daemon, id = $id');
          final Object? result = data['result'];
          _outgoingRequestCompleters.remove(id)?.complete(result);
        }
      } else {
        // A message with both an id and a method is a request.
        _incomingCommands.add(message);
      }
    } else if (data['event'] != null) {
      // This is an event
      _logger.printTrace('<- Event received: ${data['event']}');
      final Object? eventName = data['event'];
      if (eventName is String) {
        _events.add(DaemonEventData(
          eventName,
          data['params'],
          message.binary,
        ));
      } else {
        throwToolExit('event name received is not string!');
      }
    } else {
      _logger.printError('Unknown data received from daemon');
    }
  }

  /// Cleans up any resources used in the connection.
  Future<void> dispose() async {
    await _commandSubscription.cancel();
    await _daemonStreams.dispose();
    unawaited(_events.close());
    unawaited(_incomingCommands.close());
  }
}