// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';

import 'base/io.dart';
import 'base/logger.dart';
import 'device.dart';
import 'device_port_forwarder.dart';
import 'globals.dart' as globals;

/// Discovers a specific service protocol on a device, and forwards the service
/// protocol device port to the host.
class ProtocolDiscovery {
  /// Starts listening to [logReader]'s log lines as soon as it is constructed.
  ProtocolDiscovery._(
    this.logReader,
    this.serviceName, {
    this.portForwarder,
    required this.throttleDuration,
    this.hostPort,
    this.devicePort,
    required this.ipv6,
    required Logger logger,
  }) : _logger = logger,
       assert(logReader != null) {
    _deviceLogSubscription = logReader.logLines.listen(
      _handleLine,
      onDone: _stopScrapingLogs,
    );
  }

  /// Creates a [ProtocolDiscovery] for the `Observatory` service.
  ///
  /// If [throttleDuration] is omitted, it defaults to 200 milliseconds.
  factory ProtocolDiscovery.observatory(
    DeviceLogReader logReader, {
    DevicePortForwarder? portForwarder,
    Duration? throttleDuration,
    int? hostPort,
    int? devicePort,
    required bool ipv6,
    required Logger logger,
  }) {
    const String kObservatoryService = 'Observatory';
    return ProtocolDiscovery._(
      logReader,
      kObservatoryService,
      portForwarder: portForwarder,
      throttleDuration: throttleDuration ?? const Duration(milliseconds: 200),
      hostPort: hostPort,
      devicePort: devicePort,
      ipv6: ipv6,
      logger: logger,
    );
  }

  /// The log reader whose lines are scanned for service URIs.
  final DeviceLogReader logReader;

  /// The name of the service, used in trace messages.
  final String serviceName;

  /// If non-null, used to forward the discovered device port to the host.
  final DevicePortForwarder? portForwarder;

  /// The host port passed to [portForwarder] when forwarding.
  final int? hostPort;

  /// If non-null, discovered URIs whose port differs from this are skipped.
  final int? devicePort;

  /// Whether a loopback host address is replaced with the IPv6 loopback
  /// address.
  final bool ipv6;

  final Logger _logger;

  /// The time to wait before forwarding a new observatory URI from
  /// [logReader].
  final Duration throttleDuration;

  StreamSubscription<String>? _deviceLogSubscription;
  final _BufferedStreamController<Uri> _uriStreamController = _BufferedStreamController<Uri>();

  /// The discovered service URL.
  ///
  /// Returns null if the log reader shuts down before any uri is found.
  ///
  /// Use [uris] instead.
  // TODO(egarciad): replace `uri` for `uris`.
  Future<Uri?> get uri async {
    try {
      return await uris.first;
    } on StateError {
      // `Stream.first` reports an empty stream with a StateError.
      return null;
    }
  }

  /// The discovered service URLs.
  ///
  /// When a new observatory URL is available in [logReader],
  /// the URLs are forwarded at most once every [throttleDuration].
  ///
  /// Port forwarding is only attempted when this is invoked,
  /// for each observatory URL in the stream.
  Stream<Uri> get uris {
    final Stream<Uri> uriStream = _uriStreamController.stream
      .transform(_throttle<Uri>(
        waitDuration: throttleDuration,
      ));
    return uriStream.asyncMap<Uri>(_forwardPort);
  }

  /// Stops listening to [logReader] and closes the URI stream.
  Future<void> cancel() => _stopScrapingLogs();

  Future<void> _stopScrapingLogs() async {
    await _uriStreamController.close();
    await _deviceLogSubscription?.cancel();
    _deviceLogSubscription = null;
  }

  Match? _getPatternMatch(String line) {
    return globals.kVMServiceMessageRegExp.firstMatch(line);
  }

  /// Returns the URI captured by the first group of the service message
  /// pattern in [line], or null if [line] does not match.
  Uri? _getObservatoryUri(String line) {
    final Match? match = _getPatternMatch(line);
    if (match != null) {
      return Uri.parse(match[1]!);
    }
    return null;
  }

  /// Handles a single log line, publishing any service URI it contains.
  void _handleLine(String line) {
    Uri? uri;
    try {
      uri = _getObservatoryUri(line);
    } on FormatException catch (error, stackTrace) {
      // Surface unparsable URIs to stream listeners; `uri` stays null, so the
      // check below returns.
      _uriStreamController.addError(error, stackTrace);
    }
    if (uri == null || uri.host.isEmpty) {
      return;
    }
    if (devicePort != null && uri.port != devicePort) {
      _logger.printTrace('skipping potential observatory $uri due to device port mismatch');
      return;
    }
    _uriStreamController.add(uri);
  }

  /// Maps [deviceUri] to the URI to use on the host.
  ///
  /// If [portForwarder] is non-null, the device port is forwarded and the
  /// returned URI uses the resulting host port.
  Future<Uri> _forwardPort(Uri deviceUri) async {
    _logger.printTrace('$serviceName URL on device: $deviceUri');
    Uri hostUri = deviceUri;

    final DevicePortForwarder? forwarder = portForwarder;
    if (forwarder != null) {
      final int actualDevicePort = deviceUri.port;
      final int actualHostPort = await forwarder.forward(actualDevicePort, hostPort: hostPort);
      _logger.printTrace('Forwarded host port $actualHostPort to device port $actualDevicePort for $serviceName');
      hostUri = deviceUri.replace(port: actualHostPort);
    }

    // NOTE(review): presumably `InternetAddress` is dart:io's class re-exported
    // by base/io.dart; if so, this constructor throws an ArgumentError for
    // hosts that are not numeric addresses - confirm.
    if (InternetAddress(hostUri.host).isLoopback && ipv6) {
      hostUri = hostUri.replace(host: InternetAddress.loopbackIPv6.host);
    }
    return hostUri;
  }
}

/// An error, and its optional stack trace, waiting in a
/// [_BufferedStreamController] for a listener.
class _BufferedError {
  const _BufferedError(this.error, this.stackTrace);

  final Object error;
  final StackTrace? stackTrace;
}

/// Provides a broadcast stream controller that buffers the events
/// if there isn't a listener attached.
/// The events are then delivered when a listener is attached to the stream.
class _BufferedStreamController<T> {
  _BufferedStreamController();

  /// The stream that this controller is controlling.
  Stream<T> get stream {
    return _streamController.stream;
  }

  late final StreamController<T> _streamController = StreamController<T>.broadcast(
    onListen: _flushBufferedEvents,
  );

  /// Events and errors received while no listener was attached, in arrival
  /// order.
  ///
  /// Errors are stored as [_BufferedError]; every other entry is a data event
  /// of type [T].
  final List<Object?> _events = <Object?>[];

  /// Delivers every buffered event and error to the stream, then empties the
  /// buffer.
  void _flushBufferedEvents() {
    for (final Object? event in _events) {
      if (event is _BufferedError) {
        // The stack trace may legitimately be null; forward it unchanged.
        _streamController.addError(event.error, event.stackTrace);
      } else {
        // Only [add] inserts non-error entries, so this is always a `T`.
        _streamController.add(event as T);
      }
    }
    _events.clear();
  }

  /// Sends [event] if there is a listener attached to the broadcast stream.
  /// Otherwise, it enqueues [event] until a listener is attached.
  void add(T event) {
    if (_streamController.hasListener) {
      _streamController.add(event);
    } else {
      _events.add(event);
    }
  }

  /// Sends or enqueues an error event.
  ///
  /// [stackTrace] may be omitted; it is delivered as given in both cases.
  void addError(Object error, [StackTrace? stackTrace]) {
    if (_streamController.hasListener) {
      _streamController.addError(error, stackTrace);
    } else {
      _events.add(_BufferedError(error, stackTrace));
    }
  }

  /// Closes the stream.
  Future<void> close() {
    return _streamController.close();
  }
}

/// This transformer will produce an event at most once every [waitDuration].
///
/// The first event is scheduled for delivery without delay. While a delivery
/// is pending, newer events replace the pending value, so only the most recent
/// one is produced. A delivery that is pending when the source stream is done
/// is dropped.
///
/// For example, consider a `waitDuration` of `10ms`, and list of event names
/// and arrival times: `a (0ms), b (5ms), c (9ms), d (21ms)`.
/// The events `a`, `c`, and `d` will be produced as a result: `b` is replaced
/// by `c` before the delivery scheduled for `10ms` runs.
StreamTransformer<S, S> _throttle<S>({
  required Duration waitDuration,
}) {
  assert(waitDuration != null);

  S latestLine;
  int? lastExecution;
  Future<void>? throttleFuture;
  bool done = false;

  return StreamTransformer<S, S>
    .fromHandlers(
      handleData: (S value, EventSink<S> sink) {
        latestLine = value;

        final bool isFirstMessage = lastExecution == null;
        final int currentTime = DateTime.now().millisecondsSinceEpoch;
        lastExecution ??= currentTime;
        final int elapsedTime = currentTime - lastExecution!;

        // Always send the first event immediately.
        final int nextExecutionTime = isFirstMessage || elapsedTime > waitDuration.inMilliseconds
          ? 0
          : waitDuration.inMilliseconds - elapsedTime;
        // Schedule a delivery only if none is pending; a pending delivery
        // picks up `latestLine` when it runs.
        throttleFuture ??= Future<void>
          .delayed(Duration(milliseconds: nextExecutionTime))
          .whenComplete(() {
            if (done) {
              // The sink has already been closed by `handleDone`.
              return;
            }
            sink.add(latestLine);
            throttleFuture = null;
            lastExecution = DateTime.now().millisecondsSinceEpoch;
          });
      },
      handleDone: (EventSink<S> sink) {
        done = true;
        sink.close();
      }
    );
}