Unverified Commit c759c22e authored by Lau Ching Jun's avatar Lau Ching Jun Committed by GitHub

Fixed race condition in PollingDeviceDiscovery. (#145506)

There are two issues in the previous implementation:
1. `_populateDevices` will return the devices from `deviceNotifier` if it had been initialized, assuming that once it's initialized, it has been properly populated. That assumption is not true because calling getters like `onAdded` would initialize `deviceNotifier` without populating it.
2. `deviceNotifier` instance might be replaced in some cases, causing `onAdded` subscribers to lose any future updates.

To fix (1), this commit added the `isPopulated` field in `deviceNotifier` as a more accurate flag to determine if we need to populate it.

To fix (2), this commit made `deviceNotifier` a final member in `PolingDeviceDiscovery`.
parent d69833ce
...@@ -97,9 +97,9 @@ String getSizeAsMB(int bytesLength) { ...@@ -97,9 +97,9 @@ String getSizeAsMB(int bytesLength) {
/// removed, and calculate a diff of changes when a new list of items is /// removed, and calculate a diff of changes when a new list of items is
/// available. /// available.
class ItemListNotifier<T> { class ItemListNotifier<T> {
ItemListNotifier(): _items = <T>{}; ItemListNotifier(): _items = <T>{}, _isPopulated = false;
ItemListNotifier.from(List<T> items) : _items = Set<T>.of(items); ItemListNotifier.from(List<T> items) : _items = Set<T>.of(items), _isPopulated = true;
Set<T> _items; Set<T> _items;
...@@ -111,6 +111,11 @@ class ItemListNotifier<T> { ...@@ -111,6 +111,11 @@ class ItemListNotifier<T> {
List<T> get items => _items.toList(); List<T> get items => _items.toList();
bool _isPopulated;
/// Returns whether the list has been populated.
bool get isPopulated => _isPopulated;
void updateWithNewList(List<T> updatedList) { void updateWithNewList(List<T> updatedList) {
final Set<T> updatedSet = Set<T>.of(updatedList); final Set<T> updatedSet = Set<T>.of(updatedList);
...@@ -118,9 +123,10 @@ class ItemListNotifier<T> { ...@@ -118,9 +123,10 @@ class ItemListNotifier<T> {
final Set<T> removedItems = _items.difference(updatedSet); final Set<T> removedItems = _items.difference(updatedSet);
_items = updatedSet; _items = updatedSet;
_isPopulated = true;
addedItems.forEach(_addedController.add);
removedItems.forEach(_removedController.add); removedItems.forEach(_removedController.add);
addedItems.forEach(_addedController.add);
} }
void removeItem(T item) { void removeItem(T item) {
......
...@@ -480,18 +480,15 @@ abstract class PollingDeviceDiscovery extends DeviceDiscovery { ...@@ -480,18 +480,15 @@ abstract class PollingDeviceDiscovery extends DeviceDiscovery {
@protected @protected
@visibleForTesting @visibleForTesting
ItemListNotifier<Device>? deviceNotifier; final ItemListNotifier<Device> deviceNotifier = ItemListNotifier<Device>();
Timer? _timer; Timer? _timer;
Future<List<Device>> pollingGetDevices({Duration? timeout}); Future<List<Device>> pollingGetDevices({Duration? timeout});
void startPolling() { void startPolling() {
if (_timer == null) {
deviceNotifier ??= ItemListNotifier<Device>();
// Make initial population the default, fast polling timeout. // Make initial population the default, fast polling timeout.
_timer = _initTimer(null, initialCall: true); _timer ??= _initTimer(null, initialCall: true);
}
} }
Timer _initTimer(Duration? pollingTimeout, {bool initialCall = false}) { Timer _initTimer(Duration? pollingTimeout, {bool initialCall = false}) {
...@@ -499,7 +496,7 @@ abstract class PollingDeviceDiscovery extends DeviceDiscovery { ...@@ -499,7 +496,7 @@ abstract class PollingDeviceDiscovery extends DeviceDiscovery {
return Timer(initialCall ? Duration.zero : _pollingInterval, () async { return Timer(initialCall ? Duration.zero : _pollingInterval, () async {
try { try {
final List<Device> devices = await pollingGetDevices(timeout: pollingTimeout); final List<Device> devices = await pollingGetDevices(timeout: pollingTimeout);
deviceNotifier!.updateWithNewList(devices); deviceNotifier.updateWithNewList(devices);
} on TimeoutException { } on TimeoutException {
// Do nothing on a timeout. // Do nothing on a timeout.
} }
...@@ -546,32 +543,28 @@ abstract class PollingDeviceDiscovery extends DeviceDiscovery { ...@@ -546,32 +543,28 @@ abstract class PollingDeviceDiscovery extends DeviceDiscovery {
DeviceDiscoveryFilter? filter, DeviceDiscoveryFilter? filter,
bool resetCache = false, bool resetCache = false,
}) async { }) async {
if (deviceNotifier == null || resetCache) { if (!deviceNotifier.isPopulated || resetCache) {
final List<Device> devices = await pollingGetDevices(timeout: timeout); final List<Device> devices = await pollingGetDevices(timeout: timeout);
// If the cache was populated while the polling was ongoing, do not // If the cache was populated while the polling was ongoing, do not
// overwrite the cache unless it's explicitly refreshing the cache. // overwrite the cache unless it's explicitly refreshing the cache.
if (resetCache) { if (!deviceNotifier.isPopulated || resetCache) {
deviceNotifier = ItemListNotifier<Device>.from(devices); deviceNotifier.updateWithNewList(devices);
} else {
deviceNotifier ??= ItemListNotifier<Device>.from(devices);
} }
} }
// If a filter is provided, filter cache to only return devices matching. // If a filter is provided, filter cache to only return devices matching.
if (filter != null) { if (filter != null) {
return filter.filterDevices(deviceNotifier!.items); return filter.filterDevices(deviceNotifier.items);
} }
return deviceNotifier!.items; return deviceNotifier.items;
} }
Stream<Device> get onAdded { Stream<Device> get onAdded {
deviceNotifier ??= ItemListNotifier<Device>(); return deviceNotifier.onAdded;
return deviceNotifier!.onAdded;
} }
Stream<Device> get onRemoved { Stream<Device> get onRemoved {
deviceNotifier ??= ItemListNotifier<Device>(); return deviceNotifier.onRemoved;
return deviceNotifier!.onRemoved;
} }
void dispose() => stopPolling(); void dispose() => stopPolling();
......
...@@ -102,8 +102,6 @@ class IOSDevices extends PollingDeviceDiscovery { ...@@ -102,8 +102,6 @@ class IOSDevices extends PollingDeviceDiscovery {
return; return;
} }
deviceNotifier ??= ItemListNotifier<Device>();
// Start by populating all currently attached devices. // Start by populating all currently attached devices.
_updateCachedDevices(await pollingGetDevices()); _updateCachedDevices(await pollingGetDevices());
_updateNotifierFromCache(); _updateNotifierFromCache();
...@@ -127,10 +125,8 @@ class IOSDevices extends PollingDeviceDiscovery { ...@@ -127,10 +125,8 @@ class IOSDevices extends PollingDeviceDiscovery {
@visibleForTesting @visibleForTesting
Future<void> onDeviceEvent(XCDeviceEventNotification event) async { Future<void> onDeviceEvent(XCDeviceEventNotification event) async {
final ItemListNotifier<Device>? notifier = deviceNotifier; final ItemListNotifier<Device> notifier = deviceNotifier;
if (notifier == null) {
return;
}
Device? knownDevice; Device? knownDevice;
for (final Device device in notifier.items) { for (final Device device in notifier.items) {
if (device.id == event.deviceIdentifier) { if (device.id == event.deviceIdentifier) {
...@@ -186,10 +182,8 @@ class IOSDevices extends PollingDeviceDiscovery { ...@@ -186,10 +182,8 @@ class IOSDevices extends PollingDeviceDiscovery {
/// Updates notifier with devices found in the cache that are determined /// Updates notifier with devices found in the cache that are determined
/// to be connected. /// to be connected.
void _updateNotifierFromCache() { void _updateNotifierFromCache() {
final ItemListNotifier<Device>? notifier = deviceNotifier; final ItemListNotifier<Device> notifier = deviceNotifier;
if (notifier == null) {
return;
}
// Device is connected if it has either an observed usb or wifi connection // Device is connected if it has either an observed usb or wifi connection
// or it has not been observed but was found as connected in the cache. // or it has not been observed but was found as connected in the cache.
final List<Device> connectedDevices = _cachedPolledDevices.values.where((Device device) { final List<Device> connectedDevices = _cachedPolledDevices.values.where((Device device) {
......
...@@ -36,5 +36,27 @@ void main() { ...@@ -36,5 +36,27 @@ void main() {
expect(removedItems.first, 'aaa'); expect(removedItems.first, 'aaa');
expect(removedItems[1], 'bbb'); expect(removedItems[1], 'bbb');
}); });
test('becomes populated when item is added', () async {
final ItemListNotifier<String> list = ItemListNotifier<String>();
expect(list.isPopulated, false);
expect(list.items, isEmpty);
// Becomes populated when a new list is added.
list.updateWithNewList(<String>['a']);
expect(list.isPopulated, true);
expect(list.items, <String>['a']);
// Remain populated even when the last item is removed.
list.removeItem('a');
expect(list.isPopulated, true);
expect(list.items, isEmpty);
});
test('is populated by default if initialized with list of items', () async {
final ItemListNotifier<String> list = ItemListNotifier<String>.from(<String>['a']);
expect(list.isPopulated, true);
expect(list.items, <String>['a']);
});
}); });
} }
...@@ -1089,6 +1089,47 @@ void main() { ...@@ -1089,6 +1089,47 @@ void main() {
); );
}); });
}); });
group('PollingDeviceDiscovery', () {
final FakeDevice device1 = FakeDevice('Nexus 5', '0553790d0a4e726f');
testWithoutContext('initial call to devices returns the correct list', () async {
final List<Device> deviceList = <Device>[device1];
final TestPollingDeviceDiscovery testDeviceDiscovery = TestPollingDeviceDiscovery(deviceList);
// Call `onAdded` to make sure that calling `onAdded` does not affect the
// result of `devices()`.
final List<Device> addedDevice = <Device>[];
final List<Device> removedDevice = <Device>[];
testDeviceDiscovery.onAdded.listen(addedDevice.add);
testDeviceDiscovery.onRemoved.listen(removedDevice.add);
final List<Device> devices = await testDeviceDiscovery.devices();
expect(devices.length, 1);
expect(devices.first.id, device1.id);
});
testWithoutContext('call to devices triggers onAdded', () async {
final List<Device> deviceList = <Device>[device1];
final TestPollingDeviceDiscovery testDeviceDiscovery = TestPollingDeviceDiscovery(deviceList);
// Call `onAdded` to make sure that calling `onAdded` does not affect the
// result of `devices()`.
final List<Device> addedDevice = <Device>[];
final List<Device> removedDevice = <Device>[];
testDeviceDiscovery.onAdded.listen(addedDevice.add);
testDeviceDiscovery.onRemoved.listen(removedDevice.add);
final List<Device> devices = await testDeviceDiscovery.devices();
expect(devices.length, 1);
expect(devices.first.id, device1.id);
await pumpEventQueue();
expect(addedDevice.length, 1);
expect(addedDevice.first.id, device1.id);
});
});
} }
class TestDeviceManager extends DeviceManager { class TestDeviceManager extends DeviceManager {
...@@ -1203,3 +1244,24 @@ class ThrowingPollingDeviceDiscovery extends PollingDeviceDiscovery { ...@@ -1203,3 +1244,24 @@ class ThrowingPollingDeviceDiscovery extends PollingDeviceDiscovery {
@override @override
List<String> get wellKnownIds => <String>[]; List<String> get wellKnownIds => <String>[];
} }
class TestPollingDeviceDiscovery extends PollingDeviceDiscovery {
TestPollingDeviceDiscovery(this._devices) : super('test');
final List<Device> _devices;
@override
Future<List<Device>> pollingGetDevices({ Duration? timeout }) async {
return _devices;
}
@override
bool get supportsPlatform => true;
@override
bool get canListAnything => true;
@override
List<String> get wellKnownIds => <String>[];
}
...@@ -659,7 +659,7 @@ void main() { ...@@ -659,7 +659,7 @@ void main() {
await iosDevices.startPolling(); await iosDevices.startPolling();
expect(xcdevice.getAvailableIOSDevicesCount, 1); expect(xcdevice.getAvailableIOSDevicesCount, 1);
expect(iosDevices.deviceNotifier!.items, isEmpty); expect(iosDevices.deviceNotifier.items, isEmpty);
expect(xcdevice.deviceEventController.hasListener, isTrue); expect(xcdevice.deviceEventController.hasListener, isTrue);
xcdevice.deviceEventController.add( xcdevice.deviceEventController.add(
...@@ -670,9 +670,9 @@ void main() { ...@@ -670,9 +670,9 @@ void main() {
), ),
); );
await added.future; await added.future;
expect(iosDevices.deviceNotifier!.items.length, 2); expect(iosDevices.deviceNotifier.items.length, 2);
expect(iosDevices.deviceNotifier!.items, contains(device1)); expect(iosDevices.deviceNotifier.items, contains(device1));
expect(iosDevices.deviceNotifier!.items, contains(device2)); expect(iosDevices.deviceNotifier.items, contains(device2));
expect(iosDevices.eventsReceived, 1); expect(iosDevices.eventsReceived, 1);
iosDevices.resetEventCompleter(); iosDevices.resetEventCompleter();
...@@ -684,9 +684,9 @@ void main() { ...@@ -684,9 +684,9 @@ void main() {
), ),
); );
await iosDevices.receivedEvent.future; await iosDevices.receivedEvent.future;
expect(iosDevices.deviceNotifier!.items.length, 2); expect(iosDevices.deviceNotifier.items.length, 2);
expect(iosDevices.deviceNotifier!.items, contains(device1)); expect(iosDevices.deviceNotifier.items, contains(device1));
expect(iosDevices.deviceNotifier!.items, contains(device2)); expect(iosDevices.deviceNotifier.items, contains(device2));
expect(iosDevices.eventsReceived, 2); expect(iosDevices.eventsReceived, 2);
iosDevices.resetEventCompleter(); iosDevices.resetEventCompleter();
...@@ -698,9 +698,9 @@ void main() { ...@@ -698,9 +698,9 @@ void main() {
), ),
); );
await iosDevices.receivedEvent.future; await iosDevices.receivedEvent.future;
expect(iosDevices.deviceNotifier!.items.length, 2); expect(iosDevices.deviceNotifier.items.length, 2);
expect(iosDevices.deviceNotifier!.items, contains(device1)); expect(iosDevices.deviceNotifier.items, contains(device1));
expect(iosDevices.deviceNotifier!.items, contains(device2)); expect(iosDevices.deviceNotifier.items, contains(device2));
expect(iosDevices.eventsReceived, 3); expect(iosDevices.eventsReceived, 3);
xcdevice.deviceEventController.add( xcdevice.deviceEventController.add(
...@@ -711,7 +711,7 @@ void main() { ...@@ -711,7 +711,7 @@ void main() {
), ),
); );
await removed.future; await removed.future;
expect(iosDevices.deviceNotifier!.items, <Device>[device2]); expect(iosDevices.deviceNotifier.items, <Device>[device2]);
expect(iosDevices.eventsReceived, 4); expect(iosDevices.eventsReceived, 4);
iosDevices.resetEventCompleter(); iosDevices.resetEventCompleter();
...@@ -777,7 +777,7 @@ void main() { ...@@ -777,7 +777,7 @@ void main() {
xcdevice.devices.add(<IOSDevice>[]); xcdevice.devices.add(<IOSDevice>[]);
await iosDevices.startPolling(); await iosDevices.startPolling();
expect(iosDevices.deviceNotifier!.items, isEmpty); expect(iosDevices.deviceNotifier.items, isEmpty);
expect(xcdevice.deviceEventController.hasListener, isTrue); expect(xcdevice.deviceEventController.hasListener, isTrue);
iosDevices.dispose(); iosDevices.dispose();
......
...@@ -539,11 +539,11 @@ void main() { ...@@ -539,11 +539,11 @@ void main() {
proxiedDevices.startPolling(); proxiedDevices.startPolling();
final ItemListNotifier<Device>? deviceNotifier = proxiedDevices.deviceNotifier; final ItemListNotifier<Device> deviceNotifier = proxiedDevices.deviceNotifier;
expect(deviceNotifier, isNotNull); expect(deviceNotifier, isNotNull);
final List<Device> devicesAdded = <Device>[]; final List<Device> devicesAdded = <Device>[];
deviceNotifier!.onAdded.listen((Device device) { deviceNotifier.onAdded.listen((Device device) {
devicesAdded.add(device); devicesAdded.add(device);
}); });
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment