• Ben Konyi's avatar
    Update flutter_tools to look for new VM service message (#97683) · 2a2f9731
    Ben Konyi authored
    * Update flutter_tools to look for new VM service message
    
    The Dart SDK will soon move away from the current Observatory message:
    
    "Observatory listening on ..."
    
    To a new message that no longer references Observatory:
    
    "Dart VM Service listening on ..."
    
    This change updates all tests with mocks to check for the new message
    and also adds support for the new message in ProtocolDiscovery.
    
    See https://github.com/dart-lang/sdk/issues/46756
    
    * Fix some parsing locations
    
    * Fix analysis failures
    
    * Update message
    
    * Remove extra comment
    
    * Update message
    
    * Add globals prefix
    Unverified
    2a2f9731
protocol_discovery.dart 7.58 KB
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';

import 'base/io.dart';
import 'base/logger.dart';
import 'device.dart';
import 'device_port_forwarder.dart';
import 'globals.dart' as globals;

/// Discovers a specific service protocol on a device, and forwards the service
/// protocol device port to the host.
class ProtocolDiscovery {
  ProtocolDiscovery._(
    this.logReader,
    this.serviceName, {
    this.portForwarder,
    required this.throttleDuration,
    this.hostPort,
    this.devicePort,
    required this.ipv6,
    required Logger logger,
  }) : _logger = logger,
       assert(logReader != null) {
    _deviceLogSubscription = logReader.logLines.listen(
      _handleLine,
      onDone: _stopScrapingLogs,
    );
  }

  factory ProtocolDiscovery.observatory(
    DeviceLogReader logReader, {
    DevicePortForwarder? portForwarder,
    Duration? throttleDuration,
    int? hostPort,
    int? devicePort,
    required bool ipv6,
    required Logger logger,
  }) {
    const String kObservatoryService = 'Observatory';
    return ProtocolDiscovery._(
      logReader,
      kObservatoryService,
      portForwarder: portForwarder,
      throttleDuration: throttleDuration ?? const Duration(milliseconds: 200),
      hostPort: hostPort,
      devicePort: devicePort,
      ipv6: ipv6,
      logger: logger,
    );
  }

  final DeviceLogReader logReader;
  final String serviceName;
  final DevicePortForwarder? portForwarder;
  final int? hostPort;
  final int? devicePort;
  final bool ipv6;
  final Logger _logger;

  /// The time to wait before forwarding a new observatory URIs from [logReader].
  final Duration throttleDuration;

  StreamSubscription<String>? _deviceLogSubscription;
  final _BufferedStreamController<Uri> _uriStreamController = _BufferedStreamController<Uri>();

  /// The discovered service URL.
  ///
  /// Returns null if the log reader shuts down before any uri is found.
  ///
  /// Use [uris] instead.
  // TODO(egarciad): replace `uri` for `uris`.
  Future<Uri?> get uri async {
    try {
      return await uris.first;
    } on StateError {
      return null;
    }
  }

  /// The discovered service URLs.
  ///
  /// When a new observatory URL: is available in [logReader],
  /// the URLs are forwarded at most once every [throttleDuration].
  /// Returns when no event has been observed for [throttleTimeout].
  ///
  /// Port forwarding is only attempted when this is invoked,
  /// for each observatory URL in the stream.
  Stream<Uri> get uris {
    final Stream<Uri> uriStream = _uriStreamController.stream
      .transform(_throttle<Uri>(
        waitDuration: throttleDuration,
      ));
    return uriStream.asyncMap<Uri>(_forwardPort);
  }

  Future<void> cancel() => _stopScrapingLogs();

  Future<void> _stopScrapingLogs() async {
    await _uriStreamController.close();
    await _deviceLogSubscription?.cancel();
    _deviceLogSubscription = null;
  }

  Match? _getPatternMatch(String line) {
    return globals.kVMServiceMessageRegExp.firstMatch(line);
  }

  Uri? _getObservatoryUri(String line) {
    final Match? match = _getPatternMatch(line);
    if (match != null) {
      return Uri.parse(match[1]!);
    }
    return null;
  }

  void _handleLine(String line) {
    Uri? uri;
    try {
      uri = _getObservatoryUri(line);
    } on FormatException catch (error, stackTrace) {
      _uriStreamController.addError(error, stackTrace);
    }
    if (uri == null || uri.host.isEmpty) {
      return;
    }
    if (devicePort != null && uri.port != devicePort) {
      _logger.printTrace('skipping potential observatory $uri due to device port mismatch');
      return;
    }
    _uriStreamController.add(uri);
  }

  Future<Uri> _forwardPort(Uri deviceUri) async {
    _logger.printTrace('$serviceName URL on device: $deviceUri');
    Uri hostUri = deviceUri;

    final DevicePortForwarder? forwarder = portForwarder;
    if (forwarder != null) {
      final int actualDevicePort = deviceUri.port;
      final int actualHostPort = await forwarder.forward(actualDevicePort, hostPort: hostPort);
      _logger.printTrace('Forwarded host port $actualHostPort to device port $actualDevicePort for $serviceName');
      hostUri = deviceUri.replace(port: actualHostPort);
    }

    if (InternetAddress(hostUri.host).isLoopback && ipv6) {
      hostUri = hostUri.replace(host: InternetAddress.loopbackIPv6.host);
    }
    return hostUri;
  }
}

/// Provides a broadcast stream controller that buffers the events
/// if there isn't a listener attached.
/// The events are then delivered when a listener is attached to the stream.
class _BufferedStreamController<T> {
  _BufferedStreamController() : _events = <dynamic>[];

  /// The stream that this controller is controlling.
  Stream<T> get stream {
    return _streamController.stream;
  }

  late final StreamController<T> _streamController = () {
    final StreamController<T> streamControllerInstance = StreamController<T>.broadcast();
      streamControllerInstance.onListen = () {
      for (final dynamic event in _events) {
        assert(T is! List);
        if (event is T) {
          streamControllerInstance.add(event);
        } else {
          streamControllerInstance.addError(
            (event as Iterable<dynamic>).first as Object,
            event.last as StackTrace,
          );
        }
      }
      _events.clear();
    };
    return streamControllerInstance;
  }();

  final List<dynamic> _events;

  /// Sends [event] if there is a listener attached to the broadcast stream.
  /// Otherwise, it enqueues [event] until a listener is attached.
  void add(T event) {
    if (_streamController.hasListener) {
      _streamController.add(event);
    } else {
      _events.add(event);
    }
  }

  /// Sends or enqueues an error event.
  void addError(Object error, [StackTrace? stackTrace]) {
    if (_streamController.hasListener) {
      _streamController.addError(error, stackTrace);
    } else {
      _events.add(<dynamic>[error, stackTrace]);
    }
  }

  /// Closes the stream.
  Future<void> close() {
    return _streamController.close();
  }
}

/// This transformer will produce an event at most once every [waitDuration].
///
/// For example, consider a `waitDuration` of `10ms`, and list of event names
/// and arrival times: `a (0ms), b (5ms), c (11ms), d (21ms)`.
/// The events `a`, `c`, and `d` will be produced as a result.
StreamTransformer<S, S> _throttle<S>({
  required Duration waitDuration,
}) {
  assert(waitDuration != null);

  S latestLine;
  int? lastExecution;
  Future<void>? throttleFuture;
  bool done = false;

  return StreamTransformer<S, S>
    .fromHandlers(
      handleData: (S value, EventSink<S> sink) {
        latestLine = value;

        final bool isFirstMessage = lastExecution == null;
        final int currentTime = DateTime.now().millisecondsSinceEpoch;
        lastExecution ??= currentTime;
        final int remainingTime = currentTime - lastExecution!;

        // Always send the first event immediately.
        final int nextExecutionTime = isFirstMessage || remainingTime > waitDuration.inMilliseconds
          ? 0
          : waitDuration.inMilliseconds - remainingTime;
        throttleFuture ??= Future<void>
          .delayed(Duration(milliseconds: nextExecutionTime))
          .whenComplete(() {
            if (done) {
              return;
            }
            sink.add(latestLine);
            throttleFuture = null;
            lastExecution = DateTime.now().millisecondsSinceEpoch;
          });
      },
      handleDone: (EventSink<S> sink) {
        done = true;
        sink.close();
      }
    );
}