1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// @dart = 2.8
import 'dart:async';
import 'package:meta/meta.dart';
import 'base/io.dart';
import 'base/logger.dart';
import 'device.dart';
import 'device_port_forwarder.dart';
/// Discovers a specific service protocol on a device, and forwards the service
/// protocol device port to the host.
class ProtocolDiscovery {
  /// Creates a discovery that immediately starts listening to
  /// [logReader]'s log lines for announcements of [serviceName].
  ProtocolDiscovery._(
    this.logReader,
    this.serviceName, {
    this.portForwarder,
    this.throttleDuration,
    this.hostPort,
    this.devicePort,
    this.ipv6,
    Logger logger,
  }) : _logger = logger,
       assert(logReader != null) {
    // `_uriStreamController` is a field initializer, so it already exists
    // here even if the log stream delivers a line synchronously on listen.
    _deviceLogSubscription = logReader.logLines.listen(
      _handleLine,
      onDone: _stopScrapingLogs,
    );
  }

  /// Creates a discovery for the `Observatory` service.
  ///
  /// [throttleDuration] defaults to 200 milliseconds when omitted.
  factory ProtocolDiscovery.observatory(
    DeviceLogReader logReader, {
    DevicePortForwarder portForwarder,
    Duration throttleDuration,
    @required int hostPort,
    @required int devicePort,
    @required bool ipv6,
    @required Logger logger,
  }) {
    const String kObservatoryService = 'Observatory';
    return ProtocolDiscovery._(
      logReader,
      kObservatoryService,
      portForwarder: portForwarder,
      throttleDuration: throttleDuration ?? const Duration(milliseconds: 200),
      hostPort: hostPort,
      devicePort: devicePort,
      ipv6: ipv6,
      logger: logger,
    );
  }

  /// The source of device log lines that are scanned for service URIs.
  final DeviceLogReader logReader;

  /// The service name expected in front of ` listening on <uri>`.
  final String serviceName;

  /// Forwards the device port to the host; no forwarding happens when null.
  final DevicePortForwarder portForwarder;

  /// The host port requested from [portForwarder].
  final int hostPort;

  /// When non-null, URIs whose port differs from this value are skipped.
  final int devicePort;

  /// Whether a loopback host in the resulting URI is rewritten to the IPv6
  /// loopback address.
  final bool ipv6;

  final Logger _logger;

  /// The time to wait before forwarding a new service URI from [logReader].
  final Duration throttleDuration;

  StreamSubscription<String> _deviceLogSubscription;

  // Initialized with the instance so that it is never null while log lines
  // are being handled.
  final _BufferedStreamController<Uri> _uriStreamController = _BufferedStreamController<Uri>();

  // Compiled on first use and reused for every subsequent log line.
  RegExp _serviceUriPattern;

  /// The discovered service URL.
  ///
  /// Returns null if the log reader shuts down before any uri is found.
  ///
  /// Use [uris] instead.
  // TODO(egarciad): replace `uri` for `uris`.
  Future<Uri> get uri async {
    try {
      return await uris.first;
    } on StateError {
      // `Stream.first` throws a StateError when the stream closes empty.
      return null;
    }
  }

  /// The discovered service URLs.
  ///
  /// When a new service URL is available in [logReader], the URLs are
  /// forwarded at most once every [throttleDuration].
  ///
  /// The stream is closed when log scraping stops, that is when the log
  /// reader's stream is done or [cancel] is called.
  ///
  /// Port forwarding is only attempted when this is invoked,
  /// for each service URL in the stream.
  Stream<Uri> get uris {
    final Stream<Uri> uriStream = _uriStreamController.stream
      .transform(_throttle<Uri>(
        waitDuration: throttleDuration,
      ));
    return uriStream.asyncMap<Uri>(_forwardPort);
  }

  /// Stops scanning the device log and closes the URI stream.
  Future<void> cancel() => _stopScrapingLogs();

  Future<void> _stopScrapingLogs() async {
    await _uriStreamController.close();
    await _deviceLogSubscription?.cancel();
    _deviceLogSubscription = null;
  }

  /// Returns the match of `<serviceName> listening on <uri>` in [line], or
  /// null when the line does not contain such an announcement.
  Match _getPatternMatch(String line) {
    _serviceUriPattern ??= RegExp(RegExp.escape(serviceName) + r' listening on ((http|//)[a-zA-Z0-9:/=_\-\.\[\]]+)');
    return _serviceUriPattern.firstMatch(line);
  }

  /// Parses the service URI announced in [line], or returns null if there is
  /// none.
  ///
  /// Throws a [FormatException] if the announced text is not a valid URI.
  Uri _getObservatoryUri(String line) {
    final Match match = _getPatternMatch(line);
    if (match != null) {
      return Uri.parse(match[1]);
    }
    return null;
  }

  /// Inspects one log line and publishes the service URI it announces, if any.
  void _handleLine(String line) {
    Uri uri;
    try {
      uri = _getObservatoryUri(line);
    } on FormatException catch (error, stackTrace) {
      // Surface malformed announcements to listeners of [uris].
      _uriStreamController.addError(error, stackTrace);
      return;
    }
    if (uri == null || uri.host.isEmpty) {
      return;
    }
    if (devicePort != null && uri.port != devicePort) {
      _logger.printTrace('skipping potential observatory $uri due to device port mismatch');
      return;
    }
    _uriStreamController.add(uri);
  }

  /// Maps [deviceUri] to the URI to use on the host.
  ///
  /// Forwards the device port through [portForwarder] when one is set, and
  /// replaces a loopback host with the IPv6 loopback address when [ipv6] is
  /// true.
  Future<Uri> _forwardPort(Uri deviceUri) async {
    _logger.printTrace('$serviceName URL on device: $deviceUri');
    Uri hostUri = deviceUri;

    if (portForwarder != null) {
      final int actualDevicePort = deviceUri.port;
      final int actualHostPort = await portForwarder.forward(actualDevicePort, hostPort: hostPort);
      _logger.printTrace('Forwarded host port $actualHostPort to device port $actualDevicePort for $serviceName');
      hostUri = deviceUri.replace(port: actualHostPort);
    }

    if (ipv6 && _isLoopbackHost(hostUri.host)) {
      hostUri = hostUri.replace(host: InternetAddress.loopbackIPv6.host);
    }
    return hostUri;
  }

  /// Whether [host] is a numeric loopback address.
  ///
  /// Host names such as `localhost` are not numeric addresses; they yield
  /// false instead of the [ArgumentError] the [InternetAddress] constructor
  /// would throw.
  static bool _isLoopbackHost(String host) {
    final InternetAddress address = InternetAddress.tryParse(host);
    return address != null && address.isLoopback;
  }
}
/// Provides a broadcast stream controller that buffers the events
/// if there isn't a listener attached.
/// The events are then delivered when a listener is attached to the stream.
class _BufferedStreamController<T> {
  _BufferedStreamController() : _events = <void Function()>[];

  /// The stream that this controller is controlling.
  Stream<T> get stream {
    return _streamController.stream;
  }

  StreamController<T> _streamControllerInstance;

  /// The underlying broadcast controller, created on first access.
  ///
  /// Its `onListen` callback replays everything that was buffered while no
  /// listener was attached, in the order it was received.
  StreamController<T> get _streamController {
    _streamControllerInstance ??= StreamController<T>.broadcast(onListen: _flush);
    return _streamControllerInstance;
  }

  /// Deferred deliveries recorded while the stream had no listener.
  ///
  /// Each entry forwards either a data event or an error to the underlying
  /// controller. Keeping them as closures means data and errors never have
  /// to be told apart by runtime type, so no value of [T] (including null,
  /// or a list when [T] is `Object` or `dynamic`) can be mistaken for an
  /// error.
  final List<void Function()> _events;

  /// Delivers all buffered events to the underlying controller and empties
  /// the buffer.
  void _flush() {
    for (final void Function() deliver in _events) {
      deliver();
    }
    _events.clear();
  }

  /// Sends [event] if there is a listener attached to the broadcast stream.
  /// Otherwise, it enqueues [event] until a listener is attached.
  void add(T event) {
    if (_streamController.hasListener) {
      _streamController.add(event);
    } else {
      _events.add(() => _streamControllerInstance.add(event));
    }
  }

  /// Sends or enqueues an error event.
  void addError(Object error, [StackTrace stackTrace]) {
    if (_streamController.hasListener) {
      _streamController.addError(error, stackTrace);
    } else {
      _events.add(() => _streamControllerInstance.addError(error, stackTrace));
    }
  }

  /// Closes the stream.
  Future<void> close() {
    return _streamController.close();
  }
}
/// This transformer will produce an event at most once every [waitDuration].
///
/// For example, consider a `waitDuration` of `10ms`, and list of event names
/// and arrival times: `a (0ms), b (5ms), c (11ms), d (21ms)`.
/// The events `a`, `c`, and `d` will be produced as a result.
StreamTransformer<S, S> _throttle<S>({
  @required Duration waitDuration,
}) {
  assert(waitDuration != null);

  // Most recent value received; this is what gets emitted when the pending
  // timer fires, so intermediate values are dropped.
  S pendingValue;
  // Wall-clock time (ms since epoch) of the last emission. It is seeded with
  // the arrival time of the first event.
  int lastEmissionMs;
  // Non-null while an emission is scheduled but has not fired yet.
  Future<void> scheduledEmission;
  // Set once the source stream is done; suppresses any pending emission.
  bool closed = false;

  void onData(S value, EventSink<S> sink) {
    pendingValue = value;

    final bool neverEmitted = lastEmissionMs == null;
    final int nowMs = DateTime.now().millisecondsSinceEpoch;
    lastEmissionMs ??= nowMs;
    final int elapsedMs = nowMs - lastEmissionMs;

    // The first event, and any event arriving after the wait window has
    // passed, is scheduled with no delay; otherwise wait out the remainder.
    int delayMs;
    if (neverEmitted || elapsedMs > waitDuration.inMilliseconds) {
      delayMs = 0;
    } else {
      delayMs = waitDuration.inMilliseconds - elapsedMs;
    }

    // An emission is already queued; it will pick up `pendingValue`.
    if (scheduledEmission != null) {
      return;
    }

    final Future<void> timer = Future<void>.delayed(Duration(milliseconds: delayMs));
    scheduledEmission = timer.whenComplete(() {
      if (!closed) {
        sink.add(pendingValue);
        scheduledEmission = null;
        lastEmissionMs = DateTime.now().millisecondsSinceEpoch;
      }
    });
  }

  void onDone(EventSink<S> sink) {
    closed = true;
    sink.close();
  }

  return StreamTransformer<S, S>.fromHandlers(
    handleData: onData,
    handleDone: onDone,
  );
}