Commit 10decc7c authored by Todd Volkert's avatar Todd Volkert Committed by GitHub

Fix race condition in protocol_discovery.dart (#10092)

For some reaosn, when we discovered our URI, we were re-instantiating
the `Completer` instance variable whose future we listen to in `nextUri()`.
This led to a race between a caller calling `nextUri()` and us discovering
the URI. If we happened to discover our URI before a caller called
`nextUri()`, then they would be left waiting on a future from the newly
allocated `Completer` (which would never complete).

Fixes #10064
parent ce2c834f
...@@ -407,12 +407,12 @@ class AndroidDevice extends Device { ...@@ -407,12 +407,12 @@ class AndroidDevice extends Device {
if (debuggingOptions.buildMode == BuildMode.debug) { if (debuggingOptions.buildMode == BuildMode.debug) {
final List<Uri> deviceUris = await Future.wait( final List<Uri> deviceUris = await Future.wait(
<Future<Uri>>[observatoryDiscovery.nextUri(), diagnosticDiscovery.nextUri()] <Future<Uri>>[observatoryDiscovery.uri, diagnosticDiscovery.uri]
); );
observatoryUri = deviceUris[0]; observatoryUri = deviceUris[0];
diagnosticUri = deviceUris[1]; diagnosticUri = deviceUris[1];
} else if (debuggingOptions.buildMode == BuildMode.profile) { } else if (debuggingOptions.buildMode == BuildMode.profile) {
observatoryUri = await observatoryDiscovery.nextUri(); observatoryUri = await observatoryDiscovery.uri;
} }
return new LaunchResult.succeeded( return new LaunchResult.succeeded(
......
...@@ -281,10 +281,10 @@ class IOSDevice extends Device { ...@@ -281,10 +281,10 @@ class IOSDevice extends Device {
final ProtocolDiscovery diagnosticDiscovery = new ProtocolDiscovery.diagnosticService( final ProtocolDiscovery diagnosticDiscovery = new ProtocolDiscovery.diagnosticService(
getLogReader(app: app), portForwarder: portForwarder, hostPort: debuggingOptions.diagnosticPort); getLogReader(app: app), portForwarder: portForwarder, hostPort: debuggingOptions.diagnosticPort);
final Future<Uri> forwardObsUri = observatoryDiscovery.nextUri(); final Future<Uri> forwardObsUri = observatoryDiscovery.uri;
Future<Uri> forwardDiagUri; Future<Uri> forwardDiagUri;
if (debuggingOptions.buildMode == BuildMode.debug) { if (debuggingOptions.buildMode == BuildMode.debug) {
forwardDiagUri = diagnosticDiscovery.nextUri(); forwardDiagUri = diagnosticDiscovery.uri;
} else { } else {
forwardDiagUri = new Future<Uri>.value(null); forwardDiagUri = new Future<Uri>.value(null);
} }
......
...@@ -489,7 +489,7 @@ class IOSSimulator extends Device { ...@@ -489,7 +489,7 @@ class IOSSimulator extends Device {
printTrace('Waiting for observatory port to be available...'); printTrace('Waiting for observatory port to be available...');
try { try {
final Uri deviceUri = await observatoryDiscovery.nextUri(); final Uri deviceUri = await observatoryDiscovery.uri;
return new LaunchResult.succeeded(observatoryUri: deviceUri); return new LaunchResult.succeeded(observatoryUri: deviceUri);
} catch (error) { } catch (error) {
printError('Error waiting for a debug connection: $error'); printError('Error waiting for a debug connection: $error');
......
...@@ -9,72 +9,62 @@ import 'base/port_scanner.dart'; ...@@ -9,72 +9,62 @@ import 'base/port_scanner.dart';
import 'device.dart'; import 'device.dart';
import 'globals.dart'; import 'globals.dart';
/// Discover service protocol on a device /// Discovers a specific service protocol on a device, and forward the service
/// and forward the service protocol device port to the host. /// protocol device port to the host.
class ProtocolDiscovery { class ProtocolDiscovery {
/// [logReader] - a [DeviceLogReader] to look for service messages in. ProtocolDiscovery._(
ProtocolDiscovery(DeviceLogReader logReader, String serviceName, DeviceLogReader logReader,
{this.portForwarder, this.hostPort, this.defaultHostPort}) String serviceName, {
: _logReader = logReader, _serviceName = serviceName { this.portForwarder,
this.hostPort,
this.defaultHostPort,
}) : _logReader = logReader, _serviceName = serviceName {
assert(_logReader != null); assert(_logReader != null);
_subscription = _logReader.logLines.listen(_onLine);
assert(portForwarder == null || defaultHostPort != null); assert(portForwarder == null || defaultHostPort != null);
_deviceLogSubscription = _logReader.logLines.listen(_onLine);
} }
factory ProtocolDiscovery.observatory(DeviceLogReader logReader, factory ProtocolDiscovery.observatory(DeviceLogReader logReader,
{DevicePortForwarder portForwarder, int hostPort}) => {DevicePortForwarder portForwarder, int hostPort}) =>
new ProtocolDiscovery(logReader, kObservatoryService, new ProtocolDiscovery._(logReader, _kObservatoryService,
portForwarder: portForwarder, portForwarder: portForwarder,
hostPort: hostPort, hostPort: hostPort,
defaultHostPort: kDefaultObservatoryPort); defaultHostPort: kDefaultObservatoryPort);
factory ProtocolDiscovery.diagnosticService(DeviceLogReader logReader, factory ProtocolDiscovery.diagnosticService(DeviceLogReader logReader,
{DevicePortForwarder portForwarder, int hostPort}) => {DevicePortForwarder portForwarder, int hostPort}) =>
new ProtocolDiscovery(logReader, kDiagnosticService, new ProtocolDiscovery._(logReader, _kDiagnosticService,
portForwarder: portForwarder, portForwarder: portForwarder,
hostPort: hostPort, hostPort: hostPort,
defaultHostPort: kDefaultDiagnosticPort); defaultHostPort: kDefaultDiagnosticPort);
static const String kObservatoryService = 'Observatory'; static const String _kObservatoryService = 'Observatory';
static const String kDiagnosticService = 'Diagnostic server'; static const String _kDiagnosticService = 'Diagnostic server';
final DeviceLogReader _logReader; final DeviceLogReader _logReader;
final String _serviceName; final String _serviceName;
final DevicePortForwarder portForwarder; final DevicePortForwarder portForwarder;
int hostPort; final int hostPort;
final int defaultHostPort; final int defaultHostPort;
final Completer<Uri> _completer = new Completer<Uri>();
Completer<Uri> _completer = new Completer<Uri>(); StreamSubscription<String> _deviceLogSubscription;
StreamSubscription<String> _subscription;
/// The [Future] returned by this function will complete when the next service /// The discovered service URI.
/// Uri is found. Future<Uri> get uri {
Future<Uri> nextUri() async { return _completer.future
final Uri deviceUri = await _completer.future.timeout(
const Duration(seconds: 60), onTimeout: () {
throwToolExit('Timeout while attempting to retrieve Uri for $_serviceName');
}
);
printTrace('$_serviceName Uri on device: $deviceUri');
Uri hostUri;
if (portForwarder != null) {
final int devicePort = deviceUri.port;
hostPort ??= await portScanner.findPreferredPort(defaultHostPort);
hostPort = await portForwarder
.forward(devicePort, hostPort: hostPort)
.timeout(const Duration(seconds: 60), onTimeout: () { .timeout(const Duration(seconds: 60), onTimeout: () {
throwToolExit('Timeout while atempting to foward device port $devicePort for $_serviceName'); throwToolExit('Timeout while attempting to retrieve Uri for $_serviceName');
}).whenComplete(() {
_stopScrapingLogs();
}); });
printTrace('Forwarded host port $hostPort to device port $devicePort for $_serviceName');
hostUri = deviceUri.replace(port: hostPort);
} else {
hostUri = deviceUri;
}
return hostUri;
} }
void cancel() { Future<Null> cancel() => _stopScrapingLogs();
_subscription.cancel();
Future<Null> _stopScrapingLogs() async {
await _deviceLogSubscription?.cancel();
_deviceLogSubscription = null;
} }
void _onLine(String line) { void _onLine(String line) {
...@@ -84,19 +74,35 @@ class ProtocolDiscovery { ...@@ -84,19 +74,35 @@ class ProtocolDiscovery {
if (index >= 0) { if (index >= 0) {
try { try {
uri = Uri.parse(line.substring(index + prefix.length)); uri = Uri.parse(line.substring(index + prefix.length));
} catch (_) { } catch (error) {
// Ignore errors. _stopScrapingLogs();
_completer.completeError(error);
} }
} }
if (uri != null)
_located(uri);
}
void _located(Uri uri) { if (uri != null) {
assert(_completer != null);
assert(!_completer.isCompleted); assert(!_completer.isCompleted);
_stopScrapingLogs();
_completer.complete(_forwardPort(uri));
}
}
Future<Uri> _forwardPort(Uri deviceUri) async {
printTrace('$_serviceName Uri on device: $deviceUri');
Uri hostUri = deviceUri;
_completer.complete(uri); if (portForwarder != null) {
_completer = new Completer<Uri>(); final int devicePort = deviceUri.port;
int hostPort = this.hostPort ?? await portScanner.findPreferredPort(defaultHostPort);
hostPort = await portForwarder
.forward(devicePort, hostPort: hostPort)
.timeout(const Duration(seconds: 60), onTimeout: () {
throwToolExit('Timeout while atempting to foward device port $devicePort for $_serviceName');
});
printTrace('Forwarded host port $hostPort to device port $devicePort for $_serviceName');
hostUri = deviceUri.replace(port: hostPort);
}
return hostUri;
} }
} }
...@@ -13,74 +13,113 @@ import 'src/mocks.dart'; ...@@ -13,74 +13,113 @@ import 'src/mocks.dart';
void main() { void main() {
group('service_protocol discovery', () { group('service_protocol discovery', () {
testUsingContext('no port forwarding', () async { MockDeviceLogReader logReader;
final MockDeviceLogReader logReader = new MockDeviceLogReader(); ProtocolDiscovery discoverer;
final ProtocolDiscovery discoverer =
new ProtocolDiscovery(logReader, ProtocolDiscovery.kObservatoryService); group('no port forwarding', () {
/// Performs test set-up functionality that must be performed as part of
/// the `test()` pass and not part of the `setUp()` pass.
///
/// This exists to make sure we're not creating an error that tries to
/// cross an error-zone boundary. Our use of `testUsingContext()` runs the
/// test code inside an error zone, but the `setUp()` code is not run in
/// any zone. This creates the potential for errors that try to cross
/// error-zone boundaries, which are considered uncaught.
///
/// This also exists for cases where our initialization requires access to
/// a `Context` object, which is only set up inside the zone.
///
/// Note that these issues do not pertain to real code and are a test-only
/// concern, since in real code, the zone is set-up in `main()`.
///
/// See also: [runZoned]
void initialize() {
logReader = new MockDeviceLogReader();
discoverer = new ProtocolDiscovery.observatory(logReader);
}
// Get next port future. tearDown(() {
Future<Uri> nextUri = discoverer.nextUri(); discoverer.cancel();
expect(nextUri, isNotNull); logReader.dispose();
});
// Inject some lines. testUsingContext('returns non-null uri future', () async {
initialize();
expect(discoverer.uri, isNotNull);
});
testUsingContext('discovers uri if logs already produced output', () async {
initialize();
logReader.addLine('HELLO WORLD'); logReader.addLine('HELLO WORLD');
logReader.addLine('Observatory listening on http://127.0.0.1:9999'); logReader.addLine('Observatory listening on http://127.0.0.1:9999');
// Await the port. final Uri uri = await discoverer.uri;
Uri uri = await nextUri;
expect(uri.port, 9999); expect(uri.port, 9999);
expect('$uri', 'http://127.0.0.1:9999'); expect('$uri', 'http://127.0.0.1:9999');
});
// Get next port future. testUsingContext('discovers uri if logs not yet produced output', () async {
nextUri = discoverer.nextUri(); initialize();
final Future<Uri> uriFuture = discoverer.uri;
logReader.addLine('Observatory listening on http://127.0.0.1:3333'); logReader.addLine('Observatory listening on http://127.0.0.1:3333');
uri = await nextUri; final Uri uri = await uriFuture;
expect(uri.port, 3333); expect(uri.port, 3333);
expect('$uri', 'http://127.0.0.1:3333'); expect('$uri', 'http://127.0.0.1:3333');
});
// Get next port future. testUsingContext('uri throws if logs produce bad line', () async {
nextUri = discoverer.nextUri(); initialize();
// Inject a bad line. Timer.run(() {
logReader.addLine('Observatory listening on http://127.0.0.1:apple'); logReader.addLine('Observatory listening on http://127.0.0.1:apple');
});
expect(discoverer.uri, throwsA(isFormatException));
});
testUsingContext('uri waits for correct log line', () async {
initialize();
final Future<Uri> uriFuture = discoverer.uri;
logReader.addLine('Observatory not listening...');
final Uri timeoutUri = Uri.parse('http://timeout'); final Uri timeoutUri = Uri.parse('http://timeout');
final Uri actualUri = await nextUri.timeout( final Uri actualUri = await uriFuture.timeout(
const Duration(milliseconds: 100), onTimeout: () => timeoutUri); const Duration(milliseconds: 100), onTimeout: () => timeoutUri);
expect(actualUri, timeoutUri); expect(actualUri, timeoutUri);
});
// Get next port future. testUsingContext('discovers uri if log line contains Android prefix', () async {
nextUri = discoverer.nextUri(); initialize();
logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:52584'); logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:52584');
uri = await nextUri; final Uri uri = await discoverer.uri;
expect(uri.port, 52584); expect(uri.port, 52584);
expect('$uri', 'http://127.0.0.1:52584'); expect('$uri', 'http://127.0.0.1:52584');
});
// Get next port future. testUsingContext('discovers uri if log line contains auth key', () async {
nextUri = discoverer.nextUri(); initialize();
final Future<Uri> uriFuture = discoverer.uri;
logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:54804/PTwjm8Ii8qg=/'); logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:54804/PTwjm8Ii8qg=/');
uri = await nextUri; final Uri uri = await uriFuture;
expect(uri.port, 54804); expect(uri.port, 54804);
expect('$uri', 'http://127.0.0.1:54804/PTwjm8Ii8qg=/'); expect('$uri', 'http://127.0.0.1:54804/PTwjm8Ii8qg=/');
});
// Get next port future. testUsingContext('discovers uri if log line contains non-localhost', () async {
nextUri = discoverer.nextUri(); initialize();
final Future<Uri> uriFuture = discoverer.uri;
logReader.addLine('I/flutter : Observatory listening on http://somehost:54804/PTwjm8Ii8qg=/'); logReader.addLine('I/flutter : Observatory listening on http://somehost:54804/PTwjm8Ii8qg=/');
uri = await nextUri; final Uri uri = await uriFuture;
expect(uri.port, 54804); expect(uri.port, 54804);
expect('$uri', 'http://somehost:54804/PTwjm8Ii8qg=/'); expect('$uri', 'http://somehost:54804/PTwjm8Ii8qg=/');
});
discoverer.cancel();
logReader.dispose();
}); });
testUsingContext('port forwarding - default port', () async { testUsingContext('port forwarding - default port', () async {
final MockDeviceLogReader logReader = new MockDeviceLogReader(); final MockDeviceLogReader logReader = new MockDeviceLogReader();
final ProtocolDiscovery discoverer = new ProtocolDiscovery( final ProtocolDiscovery discoverer = new ProtocolDiscovery.observatory(
logReader, logReader,
ProtocolDiscovery.kObservatoryService,
portForwarder: new MockPortForwarder(99), portForwarder: new MockPortForwarder(99),
defaultHostPort: 54777); hostPort: 54777);
// Get next port future. // Get next port future.
final Future<Uri> nextUri = discoverer.nextUri(); final Future<Uri> nextUri = discoverer.uri;
logReader.addLine('I/flutter : Observatory listening on http://somehost:54804/PTwjm8Ii8qg=/'); logReader.addLine('I/flutter : Observatory listening on http://somehost:54804/PTwjm8Ii8qg=/');
final Uri uri = await nextUri; final Uri uri = await nextUri;
expect(uri.port, 54777); expect(uri.port, 54777);
...@@ -92,15 +131,13 @@ void main() { ...@@ -92,15 +131,13 @@ void main() {
testUsingContext('port forwarding - specified port', () async { testUsingContext('port forwarding - specified port', () async {
final MockDeviceLogReader logReader = new MockDeviceLogReader(); final MockDeviceLogReader logReader = new MockDeviceLogReader();
final ProtocolDiscovery discoverer = new ProtocolDiscovery( final ProtocolDiscovery discoverer = new ProtocolDiscovery.observatory(
logReader, logReader,
ProtocolDiscovery.kObservatoryService,
portForwarder: new MockPortForwarder(99), portForwarder: new MockPortForwarder(99),
hostPort: 1243, hostPort: 1243);
defaultHostPort: 192);
// Get next port future. // Get next port future.
final Future<Uri> nextUri = discoverer.nextUri(); final Future<Uri> nextUri = discoverer.uri;
logReader.addLine('I/flutter : Observatory listening on http://somehost:54804/PTwjm8Ii8qg=/'); logReader.addLine('I/flutter : Observatory listening on http://somehost:54804/PTwjm8Ii8qg=/');
final Uri uri = await nextUri; final Uri uri = await nextUri;
expect(uri.port, 1243); expect(uri.port, 1243);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment