// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';

import 'base/io.dart';
import 'base/logger.dart';
import 'device.dart';
import 'device_port_forwarder.dart';
import 'globals.dart' as globals;

/// Discovers a specific service protocol on a device, and forwards the service
/// protocol device port to the host.
///
/// Scanning of [logReader] starts as soon as an instance is created. URIs that
/// are found before anything listens to [uris] are buffered by the underlying
/// stream controller.
class ProtocolDiscovery {
  ProtocolDiscovery._(
    this.logReader,
    this.serviceName, {
    this.portForwarder,
    required this.throttleDuration,
    this.hostPort,
    this.devicePort,
    required this.ipv6,
    required Logger logger,
  }) : _logger = logger {
    _deviceLogSubscription = logReader.logLines.listen(
      _handleLine,
      onDone: _stopScrapingLogs,
    );
  }

  /// Creates a discovery for the `Observatory` service.
  ///
  /// [throttleDuration] defaults to 200 milliseconds when omitted.
  factory ProtocolDiscovery.observatory(
    DeviceLogReader logReader, {
    DevicePortForwarder? portForwarder,
    Duration? throttleDuration,
    int? hostPort,
    int? devicePort,
    required bool ipv6,
    required Logger logger,
  }) {
    const String kObservatoryService = 'Observatory';
    return ProtocolDiscovery._(
      logReader,
      kObservatoryService,
      portForwarder: portForwarder,
      throttleDuration: throttleDuration ?? const Duration(milliseconds: 200),
      hostPort: hostPort,
      devicePort: devicePort,
      ipv6: ipv6,
      logger: logger,
    );
  }

  /// The source of the device log lines that are scanned for service URIs.
  final DeviceLogReader logReader;

  /// The name of the service, used in trace messages.
  final String serviceName;

  /// Forwards the device port to the host; when null, the device URI is used
  /// without port forwarding.
  final DevicePortForwarder? portForwarder;

  /// The host port requested from [portForwarder], if any.
  final int? hostPort;

  /// When non-null, URIs whose port differs from this value are skipped.
  final int? devicePort;

  /// Whether a loopback host should be rewritten to the IPv6 loopback address.
  final bool ipv6;

  final Logger _logger;

  /// The time to wait before forwarding a new observatory URI from [logReader].
  final Duration throttleDuration;

  StreamSubscription<String>? _deviceLogSubscription;
  final _BufferedStreamController<Uri> _uriStreamController = _BufferedStreamController<Uri>();

  /// The discovered service URL.
  ///
  /// Returns null if the log reader shuts down before any uri is found.
  ///
  /// Use [uris] instead.
  // TODO(egarciad): replace `uri` for `uris`.
  Future<Uri?> get uri async {
    try {
      return await uris.first;
    } on StateError {
      // `Stream.first` throws a StateError when the stream closes empty.
      return null;
    }
  }

  /// The discovered service URLs.
  ///
  /// When a new observatory URL is available in [logReader],
  /// the URLs are forwarded at most once every [throttleDuration].
  ///
  /// Port forwarding is only attempted when this is invoked,
  /// for each observatory URL in the stream.
  Stream<Uri> get uris {
    final Stream<Uri> uriStream = _uriStreamController.stream
      .transform(_throttle<Uri>(
        waitDuration: throttleDuration,
      ));
    return uriStream.asyncMap<Uri>(_forwardPort);
  }

  /// Stops scanning the device log and closes the URI stream.
  Future<void> cancel() => _stopScrapingLogs();

  Future<void> _stopScrapingLogs() async {
    await _uriStreamController.close();
    await _deviceLogSubscription?.cancel();
    _deviceLogSubscription = null;
  }

  Match? _getPatternMatch(String line) {
    return globals.kVMServiceMessageRegExp.firstMatch(line);
  }

  /// Returns the URI captured by the first group of the service message
  /// pattern, or null when [line] does not match.
  ///
  /// Throws a [FormatException] if the captured text is not a valid URI.
  Uri? _getObservatoryUri(String line) {
    final Match? match = _getPatternMatch(line);
    if (match != null) {
      return Uri.parse(match[1]!);
    }
    return null;
  }

  /// Handles one log line, publishing the service URI it contains, if any.
  void _handleLine(String line) {
    Uri? uri;
    try {
      uri = _getObservatoryUri(line);
    } on FormatException catch (error, stackTrace) {
      // Surface the malformed URI to stream listeners; `uri` stays null so the
      // line is otherwise ignored.
      _uriStreamController.addError(error, stackTrace);
    }
    if (uri == null || uri.host.isEmpty) {
      return;
    }
    if (devicePort != null && uri.port != devicePort) {
      _logger.printTrace('skipping potential observatory $uri due to device port mismatch');
      return;
    }
    _uriStreamController.add(uri);
  }

  /// Maps a URI seen on the device to the URI to use on the host.
  ///
  /// Forwards the device port through [portForwarder] when one is set, then
  /// rewrites a loopback host to the IPv6 loopback address when [ipv6] is true.
  Future<Uri> _forwardPort(Uri deviceUri) async {
    _logger.printTrace('$serviceName URL on device: $deviceUri');
    Uri hostUri = deviceUri;

    final DevicePortForwarder? forwarder = portForwarder;
    if (forwarder != null) {
      final int actualDevicePort = deviceUri.port;
      final int actualHostPort = await forwarder.forward(actualDevicePort, hostPort: hostPort);
      _logger.printTrace('Forwarded host port $actualHostPort to device port $actualDevicePort for $serviceName');
      hostUri = deviceUri.replace(port: actualHostPort);
    }

    // The host is only inspected when IPv6 was requested. `tryParse` returns
    // null for a host that is not a numeric address (for example a host name),
    // whereas the `InternetAddress` constructor throws an ArgumentError.
    if (ipv6 && (InternetAddress.tryParse(hostUri.host)?.isLoopback ?? false)) {
      hostUri = hostUri.replace(host: InternetAddress.loopbackIPv6.host);
    }
    return hostUri;
  }
}
154 155 156 157 158 159 160 161 162 163 164 165

/// Provides a broadcast stream controller that buffers the events
/// if there isn't a listener attached.
/// The events are then delivered when a listener is attached to the stream.
///
/// Data and error events are replayed in the order they were added.
class _BufferedStreamController<T> {
  _BufferedStreamController();

  /// The stream that this controller is controlling.
  Stream<T> get stream {
    return _streamController.stream;
  }

  late final StreamController<T> _streamController = () {
    final StreamController<T> streamControllerInstance = StreamController<T>.broadcast();
    streamControllerInstance.onListen = () {
      // Replay everything that arrived while nobody was listening.
      for (final void Function() deliver in _pendingDeliveries) {
        deliver();
      }
      _pendingDeliveries.clear();
    };
    return streamControllerInstance;
  }();

  /// Deliveries queued while the stream had no listener.
  ///
  /// Each entry already knows whether it is a data or an error event, so the
  /// replay never has to guess from the runtime type of a buffered value.
  final List<void Function()> _pendingDeliveries = <void Function()>[];

  /// Sends [event] if there is a listener attached to the broadcast stream.
  /// Otherwise, it enqueues [event] until a listener is attached.
  void add(T event) {
    if (_streamController.hasListener) {
      _streamController.add(event);
    } else {
      _pendingDeliveries.add(() => _streamController.add(event));
    }
  }

  /// Sends or enqueues an error event.
  ///
  /// [stackTrace] may be omitted; a buffered error is replayed with the same
  /// (possibly null) stack trace it was added with.
  void addError(Object error, [StackTrace? stackTrace]) {
    if (_streamController.hasListener) {
      _streamController.addError(error, stackTrace);
    } else {
      _pendingDeliveries.add(() => _streamController.addError(error, stackTrace));
    }
  }

  /// Closes the stream.
  Future<void> close() {
    return _streamController.close();
  }
}

/// This transformer will produce an event at most once every [waitDuration].
///
/// The first event is scheduled for delivery without waiting. While a delivery
/// is pending, newer events replace the pending value, so only the latest one
/// is produced.
///
/// For example, consider a `waitDuration` of `10ms`, and list of event names
/// and arrival times: `a (0ms), b (5ms), c (7ms), d (21ms)`.
/// The events `a`, `c`, and `d` will be produced as a result.
///
/// A value that is still pending when the source stream is done is dropped.
StreamTransformer<S, S> _throttle<S>({
  required Duration waitDuration,
}) {
  S latestLine;
  // Runs from the moment of the last emission; null until the first emission.
  // A Stopwatch is monotonic, so wall-clock adjustments cannot skew the delay.
  Stopwatch? sinceLastEmission;
  Future<void>? throttleFuture;
  bool done = false;

  return StreamTransformer<S, S>
    .fromHandlers(
      handleData: (S value, EventSink<S> sink) {
        latestLine = value;

        // An emission is already scheduled; it will pick up [latestLine].
        if (throttleFuture != null) {
          return;
        }

        final int waitMilliseconds = waitDuration.inMilliseconds;
        final int? elapsedMilliseconds = sinceLastEmission?.elapsedMilliseconds;

        // Always send the first event immediately.
        final int delayMilliseconds =
            elapsedMilliseconds == null || elapsedMilliseconds > waitMilliseconds
                ? 0
                : waitMilliseconds - elapsedMilliseconds;
        throttleFuture = Future<void>
          .delayed(Duration(milliseconds: delayMilliseconds))
          .whenComplete(() {
            // The sink is closed once the source is done; adding would throw.
            if (done) {
              return;
            }
            sink.add(latestLine);
            throttleFuture = null;
            sinceLastEmission = Stopwatch()..start();
          });
      },
      handleDone: (EventSink<S> sink) {
        done = true;
        sink.close();
      }
    );
}