Unverified Commit 01af8e59 authored by Greg Spencer's avatar Greg Spencer Committed by GitHub

Make `flutter update-packages` run in parallel (#91006)

This modifies the flutter update-packages and flutter update-packages --force-upgrade commands so that the many invocations of "dart pub get" in each repo project run in parallel instead of in series.
parent fc02dcbb
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'dart:collection';
import '../globals_null_migrated.dart' as globals;
/// A closure type used by the [TaskQueue].
typedef TaskQueueClosure<T> = Future<T> Function();
/// A task queue of Futures to be completed in parallel, throttling
/// the number of simultaneous tasks.
///
/// The tasks return results of type T.
class TaskQueue<T> {
/// Creates a task queue with a maximum number of simultaneous jobs.
/// The [maxJobs] parameter defaults to the number of CPU cores on the
/// system.
TaskQueue({int? maxJobs})
: maxJobs = maxJobs ?? globals.platform.numberOfProcessors;
/// The maximum number of jobs that this queue will run simultaneously.
final int maxJobs;
final Queue<_TaskQueueItem<T>> _pendingTasks = Queue<_TaskQueueItem<T>>();
final Set<_TaskQueueItem<T>> _activeTasks = <_TaskQueueItem<T>>{};
final Set<Completer<void>> _completeListeners = <Completer<void>>{};
/// Returns a future that completes when all tasks in the [TaskQueue] are
/// complete.
Future<void> get tasksComplete {
// In case this is called when there are no tasks, we want it to
// signal complete immediately.
if (_activeTasks.isEmpty && _pendingTasks.isEmpty) {
return Future<void>.value();
}
final Completer<void> completer = Completer<void>();
_completeListeners.add(completer);
return completer.future;
}
/// Adds a single closure to the task queue, returning a future that
/// completes when the task completes.
Future<T> add(TaskQueueClosure<T> task) {
final Completer<T> completer = Completer<T>();
_pendingTasks.add(_TaskQueueItem<T>(task, completer));
if (_activeTasks.length < maxJobs) {
_processTask();
}
return completer.future;
}
// Process a single task.
void _processTask() {
if (_pendingTasks.isNotEmpty && _activeTasks.length <= maxJobs) {
final _TaskQueueItem<T> item = _pendingTasks.removeFirst();
_activeTasks.add(item);
item.onComplete = () {
_activeTasks.remove(item);
_processTask();
};
item.run();
} else {
_checkForCompletion();
}
}
void _checkForCompletion() {
if (_activeTasks.isEmpty && _pendingTasks.isEmpty) {
for (final Completer<void> completer in _completeListeners) {
if (!completer.isCompleted) {
completer.complete();
}
}
_completeListeners.clear();
}
}
}
class _TaskQueueItem<T> {
_TaskQueueItem(this._closure, this._completer, {this.onComplete});
final TaskQueueClosure<T> _closure;
final Completer<T> _completer;
void Function()? onComplete;
Future<void> run() async {
try {
_completer.complete(await _closure());
} catch (e) { // ignore: avoid_catches_without_on_clauses
_completer.completeError(e);
} finally {
onComplete?.call();
}
}
}
......@@ -104,6 +104,7 @@ abstract class Pub {
String flutterRootOverride,
bool checkUpToDate = false,
bool shouldSkipThirdPartyGenerator = true,
bool printProgress = true,
});
/// Runs pub in 'batch' mode.
......@@ -179,6 +180,7 @@ class _DefaultPub implements Pub {
String? flutterRootOverride,
bool checkUpToDate = false,
bool shouldSkipThirdPartyGenerator = true,
bool printProgress = true,
}) async {
directory ??= _fileSystem.currentDirectory.path;
final File packageConfigFile = _fileSystem.file(
......@@ -232,9 +234,9 @@ class _DefaultPub implements Pub {
}
final String command = upgrade ? 'upgrade' : 'get';
final Status status = _logger.startProgress(
final Status? status = printProgress ? _logger.startProgress(
'Running "flutter pub $command" in ${_fileSystem.path.basename(directory)}...',
);
) : null;
final bool verbose = _logger.isVerbose;
final List<String> args = <String>[
if (verbose)
......@@ -257,10 +259,10 @@ class _DefaultPub implements Pub {
retry: !offline,
flutterRootOverride: flutterRootOverride,
);
status.stop();
status?.stop();
// The exception is rethrown, so don't catch only Exceptions.
} catch (exception) { // ignore: avoid_catches_without_on_clauses
status.cancel();
status?.cancel();
rethrow;
}
......
......@@ -119,5 +119,6 @@ class FakePub extends Fake implements Pub {
String flutterRootOverride,
bool checkUpToDate = false,
bool shouldSkipThirdPartyGenerator = true,
bool printProgress = true,
}) async { }
}
......@@ -114,6 +114,7 @@ class FakePub extends Fake implements Pub {
String flutterRootOverride,
bool checkUpToDate = false,
bool shouldSkipThirdPartyGenerator = true,
bool printProgress = true,
}) async {
fileSystem.currentDirectory
.childDirectory('.dart_tool')
......
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'package:flutter_tools/src/base/task_queue.dart';
import '../../src/common.dart';
void main() {
group('TaskQueue', () {
/// A special test designed to check shared [TaskQueue]
/// behavior when exceptions occur after a delay in the passed closures to
/// [TaskQueue.add].
test('no deadlock when delayed exceptions fire in closures', () async {
final TaskQueue<void> sharedTracker = TaskQueue<void>(maxJobs: 2);
expect(() async {
final Future<void> t = Future<void>.delayed(const Duration(milliseconds: 10), () => throw TestException());
await sharedTracker.add(() => t);
return t;
}, throwsA(const TypeMatcher<TestException>()));
expect(() async {
final Future<void> t = Future<void>.delayed(const Duration(milliseconds: 10), () => throw TestException());
await sharedTracker.add(() => t);
return t;
}, throwsA(const TypeMatcher<TestException>()));
expect(() async {
final Future<void> t = Future<void>.delayed(const Duration(milliseconds: 10), () => throw TestException());
await sharedTracker.add(() => t);
return t;
}, throwsA(const TypeMatcher<TestException>()));
expect(() async {
final Future<void> t = Future<void>.delayed(const Duration(milliseconds: 10), () => throw TestException());
await sharedTracker.add(() => t);
return t;
}, throwsA(const TypeMatcher<TestException>()));
/// We deadlock here if the exception is not handled properly.
await sharedTracker.tasksComplete;
});
test('basic sequential processing works with no deadlock', () async {
final Set<int> completed = <int>{};
final TaskQueue<void> tracker = TaskQueue<void>(maxJobs: 1);
await tracker.add(() async => completed.add(1));
await tracker.add(() async => completed.add(2));
await tracker.add(() async => completed.add(3));
await tracker.tasksComplete;
expect(completed.length, equals(3));
});
test('basic sequential processing works on exceptions', () async {
final Set<int> completed = <int>{};
final TaskQueue<void> tracker = TaskQueue<void>(maxJobs: 1);
await tracker.add(() async => completed.add(0));
await tracker.add(() async => throw TestException()).catchError((Object _) {});
await tracker.add(() async => throw TestException()).catchError((Object _) {});
await tracker.add(() async => completed.add(3));
await tracker.tasksComplete;
expect(completed.length, equals(2));
});
/// Verify that if there are more exceptions than the maximum number
/// of in-flight [Future]s that there is no deadlock.
test('basic parallel processing works with no deadlock', () async {
final Set<int> completed = <int>{};
final TaskQueue<void> tracker = TaskQueue<void>(maxJobs: 10);
for (int i = 0; i < 100; i++) {
await tracker.add(() async => completed.add(i));
}
await tracker.tasksComplete;
expect(completed.length, equals(100));
});
test('basic parallel processing works on exceptions', () async {
final Set<int> completed = <int>{};
final TaskQueue<void> tracker = TaskQueue<void>(maxJobs: 10);
for (int i = 0; i < 50; i++) {
await tracker.add(() async => completed.add(i));
}
for (int i = 50; i < 65; i++) {
try {
await tracker.add(() async => throw TestException());
} on TestException {
// Ignore
}
}
for (int i = 65; i < 100; i++) {
await tracker.add(() async => completed.add(i));
}
await tracker.tasksComplete;
expect(completed.length, equals(85));
});
});
}
class TestException implements Exception {}
\ No newline at end of file
......@@ -1073,6 +1073,7 @@ class FakePub extends Fake implements Pub {
String flutterRootOverride,
bool checkUpToDate = false,
bool shouldSkipThirdPartyGenerator = true,
bool printProgress = true,
}) async {
calledGet += 1;
}
......
......@@ -723,5 +723,6 @@ class FakePub extends Fake implements Pub {
String flutterRootOverride,
bool checkUpToDate = false,
bool shouldSkipThirdPartyGenerator = true,
bool printProgress = true,
}) async { }
}
......@@ -31,6 +31,7 @@ class ThrowingPub implements Pub {
String? flutterRootOverride,
bool checkUpToDate = false,
bool shouldSkipThirdPartyGenerator = true,
bool printProgress = true,
}) {
throw UnsupportedError('Attempted to invoke pub during test.');
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment