Unverified Commit 3c6b760e authored by jellynoone's avatar jellynoone Committed by GitHub

Refractor `compute` (#99527)

parent 1d2f9c97
...@@ -13,56 +13,81 @@ import 'isolates.dart' as isolates; ...@@ -13,56 +13,81 @@ import 'isolates.dart' as isolates;
/// The dart:io implementation of [isolate.compute]. /// The dart:io implementation of [isolate.compute].
Future<R> compute<Q, R>(isolates.ComputeCallback<Q, R> callback, Q message, { String? debugLabel }) async { Future<R> compute<Q, R>(isolates.ComputeCallback<Q, R> callback, Q message, { String? debugLabel }) async {
debugLabel ??= kReleaseMode ? 'compute' : callback.toString(); debugLabel ??= kReleaseMode ? 'compute' : callback.toString();
final Flow flow = Flow.begin(); final Flow flow = Flow.begin();
Timeline.startSync('$debugLabel: start', flow: flow); Timeline.startSync('$debugLabel: start', flow: flow);
final ReceivePort resultPort = ReceivePort(); final RawReceivePort port = RawReceivePort();
final ReceivePort exitPort = ReceivePort();
final ReceivePort errorPort = ReceivePort();
Timeline.finishSync();
await Isolate.spawn<_IsolateConfiguration<Q, FutureOr<R>>>(
_spawn,
_IsolateConfiguration<Q, FutureOr<R>>(
callback,
message,
resultPort.sendPort,
debugLabel,
flow.id,
),
errorsAreFatal: true,
onExit: exitPort.sendPort,
onError: errorPort.sendPort,
);
final Completer<R> result = Completer<R>();
errorPort.listen((dynamic errorData) {
assert(errorData is List<dynamic>);
if (errorData is List<dynamic>) {
assert(errorData.length == 2);
final Exception exception = Exception(errorData[0]);
final StackTrace stack = StackTrace.fromString(errorData[1] as String);
if (result.isCompleted) {
Zone.current.handleUncaughtError(exception, stack);
} else {
result.completeError(exception, stack);
}
}
});
exitPort.listen((dynamic exitData) {
if (!result.isCompleted) {
result.completeError(Exception('Isolate exited without result or error.'));
}
});
resultPort.listen((dynamic resultData) {
assert(resultData == null || resultData is R);
if (!result.isCompleted)
result.complete(resultData as R);
});
await result.future;
Timeline.startSync('$debugLabel: end', flow: Flow.end(flow.id));
resultPort.close();
exitPort.close();
errorPort.close();
Timeline.finishSync(); Timeline.finishSync();
return result.future;
void _timeEndAndCleanup() {
Timeline.startSync('$debugLabel: end', flow: Flow.end(flow.id));
port.close();
Timeline.finishSync();
}
final Completer<dynamic> completer = Completer<dynamic>();
port.handler = (dynamic msg) {
_timeEndAndCleanup();
completer.complete(msg);
};
try {
await Isolate.spawn<_IsolateConfiguration<Q, R>>(
_spawn,
_IsolateConfiguration<Q, R>(
callback,
message,
port.sendPort,
debugLabel,
flow.id,
),
errorsAreFatal: true,
onExit: port.sendPort,
onError: port.sendPort,
debugName: debugLabel,
);
} on Object {
_timeEndAndCleanup();
rethrow;
}
final dynamic response = await completer.future;
if(response == null) {
throw RemoteError('Isolate exited without result or error.', '');
}
assert(response is List<dynamic>);
response as List<dynamic>;
final int type = response.length;
assert(1 <= type && type <= 3);
switch (type) {
// success; see _buildSuccessResponse
case 1:
assert(response[0] is R);
return response[0] as R;
// native error; see Isolate.addErrorListener
case 2:
await Future<Never>.error(RemoteError(
response[0] as String,
response[1] as String,
));
// caught error; see _buildErrorResponse
case 3:
default:
assert(type == 3);
assert(response[0] is Object);
assert(response[1] is StackTrace);
assert(response[2] == null);
await Future<Never>.error(
response[0] as Object,
response[1] as StackTrace,
);
}
} }
@immutable @immutable
...@@ -80,21 +105,52 @@ class _IsolateConfiguration<Q, R> { ...@@ -80,21 +105,52 @@ class _IsolateConfiguration<Q, R> {
final String debugLabel; final String debugLabel;
final int flowId; final int flowId;
FutureOr<R> apply() => callback(message); FutureOr<R> applyAndTime() {
return Timeline.timeSync(
debugLabel,
() => callback(message),
flow: Flow.step(flowId),
);
}
} }
Future<void> _spawn<Q, R>(_IsolateConfiguration<Q, FutureOr<R>> configuration) async { /// The spawn point MUST guarantee only one result event is sent through the
final R result = await Timeline.timeSync( /// [SendPort.send] be it directly or indirectly i.e. [Isolate.exit].
configuration.debugLabel, ///
() async { /// In case an [Error] or [Exception] are thrown AFTER the data
final FutureOr<R> applicationResult = await configuration.apply(); /// is sent, they will NOT be handled or reported by the main [Isolate] because
return await applicationResult; /// it stops listening after the first event is received.
}, ///
flow: Flow.step(configuration.flowId), /// Also use the helpers [_buildSuccessResponse] and [_buildErrorResponse] to
); /// build the response
Timeline.timeSync( Future<void> _spawn<Q, R>(_IsolateConfiguration<Q, R> configuration) async {
'${configuration.debugLabel}: exiting and returning a result', () {}, late final List<dynamic> computationResult;
flow: Flow.step(configuration.flowId),
); try {
Isolate.exit(configuration.resultPort, result); computationResult = _buildSuccessResponse(await configuration.applyAndTime());
} catch (e, s) {
computationResult = _buildErrorResponse(e, s);
}
Isolate.exit(configuration.resultPort, computationResult);
}
/// Wrap in [List] to ensure our expectations in the main [Isolate] are met.
///
/// We need to wrap a success result in a [List] because the user provided type
/// [R] could also be a [List]. Meaning, a check `result is R` could return true
/// for what was an error event.
List<R> _buildSuccessResponse<R>(R result) {
return List<R>.filled(1, result);
}
/// Wrap in [List] to ensure our expectations in the main isolate are met.
///
/// We wrap a caught error in a 3 element [List]. Where the last element is
/// always null. We do this so we have a way to know if an error was one we
/// caught or one thrown by the library code.
List<dynamic> _buildErrorResponse(Object error, StackTrace stack) {
return List<dynamic>.filled(3, null)
..[0] = error
..[1] = stack;
} }
...@@ -11,8 +11,12 @@ import '_isolates_io.dart' ...@@ -11,8 +11,12 @@ import '_isolates_io.dart'
/// ///
/// {@macro flutter.foundation.compute.types} /// {@macro flutter.foundation.compute.types}
/// ///
/// Instances of [ComputeCallback] must be top-level functions or static methods /// Instances of [ComputeCallback] must be sendable between isolates i.e.
/// of classes, not closures or instance methods of objects. /// top-level functions, static methods or a closures that only capture objects
/// sendable between isolates or an instance methods which instance properties
/// are also sendable.
///
/// {@macro flutter.foundation.compute.closure.note}
/// ///
/// {@macro flutter.foundation.compute.limitations} /// {@macro flutter.foundation.compute.limitations}
typedef ComputeCallback<Q, R> = FutureOr<R> Function(Q message); typedef ComputeCallback<Q, R> = FutureOr<R> Function(Q message);
...@@ -33,13 +37,23 @@ typedef ComputeCallback<Q, R> = FutureOr<R> Function(Q message); ...@@ -33,13 +37,23 @@ typedef ComputeCallback<Q, R> = FutureOr<R> Function(Q message);
/// * `R` is the type of the value returned. /// * `R` is the type of the value returned.
/// {@endtemplate} /// {@endtemplate}
/// ///
/// The `callback` argument must be a top-level function, not a closure or an /// The `callback` must be sendable between isolates i.e. a top-level function,
/// instance or static method of a class. /// static method or a closure that only captures objects sendable between
/// isolates or an instance method which instance properties are also sendable.
///
/// {@template flutter.foundation.compute.closure.note}
/// However, using arbitrary closures should be done with great care because
/// it may be that the closure captures more variables than initially thought.
/// See the underlying [dart-lang/sdk#36983](https://github.com/dart-lang/sdk/issues/36983)
/// issue.
/// {@endtemplate}
/// ///
/// {@template flutter.foundation.compute.limitations} /// {@template flutter.foundation.compute.limitations}
/// There are limitations on the values that can be sent and received to and /// There are limitations on the values that can be sent and received to and
/// from isolates. These limitations constrain the values of `Q` and `R` that /// from isolates. These limitations constrain the values of `Q` and `R` that
/// are possible. See the discussion at [SendPort.send]. /// are possible. See the discussion at [SendPort.send].
///
/// The same limitations apply to any errors generated by the computation.
/// {@endtemplate} /// {@endtemplate}
/// ///
/// The `debugLabel` argument can be specified to provide a name to add to the /// The `debugLabel` argument can be specified to provide a name to add to the
......
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// A test script that invokes compute() to start an isolate.
import 'package:flutter/src/foundation/_isolates_io.dart';
int getLength(String s) {
throw 10;
}
Future<void> main() async {
const String s = 'hello world';
try {
await compute(getLength, s);
} catch (e) {
if (e != 10) {
throw Exception('compute threw bad result');
}
}
}
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// A test script that invokes compute() to start an isolate.
import 'dart:isolate';
import 'package:flutter/src/foundation/_isolates_io.dart';
int getLength(String s) {
final ReceivePort r = ReceivePort();
try {
throw r;
} finally {
r.close();
}
}
Future<void> main() async {
const String s = 'hello world';
bool wasError = false;
try {
await compute(getLength, s);
} on RemoteError {
wasError = true;
}
if (!wasError) {
throw Exception('compute threw bad result');
}
}
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// A test script that invokes compute() to start an isolate.
import 'dart:isolate';
import 'package:flutter/src/foundation/_isolates_io.dart';
int getLength(ReceivePort s) {
return 0;
}
Future<void> main() async {
final ReceivePort s = ReceivePort();
bool wasError = false;
try {
await compute(getLength, s);
} on Object {
wasError = true;
}
s.close();
assert(wasError);
}
...@@ -3,12 +3,15 @@ ...@@ -3,12 +3,15 @@
// found in the LICENSE file. // found in the LICENSE file.
import 'dart:io'; import 'dart:io';
import 'dart:isolate';
import 'package:file/file.dart'; import 'package:file/file.dart';
import 'package:file/local.dart'; import 'package:file/local.dart';
import 'package:flutter/foundation.dart'; import 'package:flutter/foundation.dart';
import 'package:flutter_test/flutter_test.dart'; import 'package:flutter_test/flutter_test.dart';
import 'package:platform/platform.dart'; import 'package:platform/platform.dart';
final Matcher throwsRemoteError = throwsA(isA<RemoteError>());
int test1(int value) { int test1(int value) {
return value + 1; return value + 1;
} }
...@@ -17,6 +20,22 @@ int test2(int value) { ...@@ -17,6 +20,22 @@ int test2(int value) {
throw 2; throw 2;
} }
int test3(int value) {
Isolate.exit();
}
int test4(int value) {
Isolate.current.kill();
return value + 1;
}
int test5(int value) {
Isolate.current.kill(priority: Isolate.immediate);
return value + 1;
}
Future<int> test1Async(int value) async { Future<int> test1Async(int value) async {
return value + 1; return value + 1;
} }
...@@ -25,26 +44,165 @@ Future<int> test2Async(int value) async { ...@@ -25,26 +44,165 @@ Future<int> test2Async(int value) async {
throw 2; throw 2;
} }
Future<int> test3Async(int value) async {
Isolate.exit();
}
Future<int> test4Async(int value) async {
Isolate.current.kill();
return value + 1;
}
Future<int> test5Async(int value) async {
Isolate.current.kill(priority: Isolate.immediate);
return value + 1;
}
Future<int> test1CallCompute(int value) {
return compute(test1, value);
}
Future<int> test2CallCompute(int value) {
return compute(test2, value);
}
Future<int> test3CallCompute(int value) {
return compute(test3, value);
}
Future<int> test4CallCompute(int value) {
return compute(test4, value);
}
Future<int> test5CallCompute(int value) {
return compute(test5, value);
}
Future<void> expectFileClosesAllPorts(String filename) async {
// Run a Dart script that calls compute().
// The Dart process will terminate only if the script exits cleanly with
// all isolate ports closed.
const FileSystem fs = LocalFileSystem();
const Platform platform = LocalPlatform();
final String flutterRoot = platform.environment['FLUTTER_ROOT']!;
final String dartPath = fs.path.join(flutterRoot, 'bin', 'cache', 'dart-sdk', 'bin', 'dart');
final String packageRoot = fs.path.dirname(fs.path.fromUri(platform.script));
final String scriptPath = fs.path.join(packageRoot, 'test', 'foundation', filename);
final ProcessResult result = await Process.run(dartPath, <String>[scriptPath]);
expect(result.exitCode, 0);
}
class ComputeTestSubject {
ComputeTestSubject(this.base, [this.additional]);
final int base;
final dynamic additional;
int method(int x) {
return base * x;
}
static int staticMethod(int square) {
return square * square;
}
}
Future<int> computeStaticMethod(int square) {
return compute(ComputeTestSubject.staticMethod, square);
}
Future<int> computeClosure(int square) {
return compute((_) => square * square, null);
}
Future<int> computeInvalidClosure(int square) {
final ReceivePort r = ReceivePort();
return compute((_) {
r.sendPort.send('Computing!');
return square * square;
}, null);
}
Future<int> computeInstanceMethod(int square) {
final ComputeTestSubject subject = ComputeTestSubject(square);
return compute(subject.method, square);
}
Future<int> computeInvalidInstanceMethod(int square) {
final ComputeTestSubject subject = ComputeTestSubject(square, ReceivePort());
return compute(subject.method, square);
}
dynamic testInvalidResponse(int square) {
final ReceivePort r = ReceivePort();
try {
return r;
} finally {
r.close();
}
}
dynamic testInvalidError(int square) {
final ReceivePort r = ReceivePort();
try {
throw r;
} finally {
r.close();
}
}
String? testDebugName(_) {
return Isolate.current.debugName;
}
void main() { void main() {
test('compute()', () async { test('compute()', () async {
expect(await compute(test1, 0), 1); expect(await compute(test1, 0), 1);
expect(compute(test2, 0), throwsException); expect(compute(test2, 0), throwsA(2));
expect(compute(test3, 0), throwsRemoteError);
expect(await compute(test4, 0), 1);
expect(compute(test5, 0), throwsRemoteError);
expect(await compute(test1Async, 0), 1); expect(await compute(test1Async, 0), 1);
expect(compute(test2Async, 0), throwsException); expect(compute(test2Async, 0), throwsA(2));
expect(compute(test3Async, 0), throwsRemoteError);
expect(await compute(test4Async, 0), 1);
expect(compute(test5Async, 0), throwsRemoteError);
expect(await compute(test1CallCompute, 0), 1);
expect(compute(test2CallCompute, 0), throwsA(2));
expect(compute(test3CallCompute, 0), throwsRemoteError);
expect(await compute(test4CallCompute, 0), 1);
expect(compute(test5CallCompute, 0), throwsRemoteError);
expect(compute(testInvalidResponse, 0), throwsRemoteError);
expect(compute(testInvalidError, 0), throwsRemoteError);
expect(await computeStaticMethod(10), 100);
expect(await computeClosure(10), 100);
expect(computeInvalidClosure(10), throwsArgumentError);
expect(await computeInstanceMethod(10), 100);
expect(computeInvalidInstanceMethod(10), throwsArgumentError);
expect(await compute(testDebugName, null, debugLabel: 'debug_name'), 'debug_name');
}, skip: kIsWeb); // [intended] isn't supported on the web. }, skip: kIsWeb); // [intended] isn't supported on the web.
test('compute closes all ports', () async { group('compute closes all ports', () {
// Run a Dart script that calls compute(). test('with valid message', () async {
// The Dart process will terminate only if the script exits cleanly with await expectFileClosesAllPorts('_compute_caller.dart');
// all isolate ports closed. });
const FileSystem fs = LocalFileSystem(); test('with invalid message', () async {
const Platform platform = LocalPlatform(); await expectFileClosesAllPorts('_compute_caller_invalid_message.dart');
final String flutterRoot = platform.environment['FLUTTER_ROOT']!; });
final String dartPath = fs.path.join(flutterRoot, 'bin', 'cache', 'dart-sdk', 'bin', 'dart'); test('with valid error', () async {
final String packageRoot = fs.path.dirname(fs.path.fromUri(platform.script)); await expectFileClosesAllPorts('_compute_caller.dart');
final String scriptPath = fs.path.join(packageRoot, 'test', 'foundation', '_compute_caller.dart'); });
final ProcessResult result = await Process.run(dartPath, <String>[scriptPath]); test('with invalid error', () async {
expect(result.exitCode, 0); await expectFileClosesAllPorts('_compute_caller_invalid_message.dart');
});
}, skip: kIsWeb); // [intended] isn't supported on the web. }, skip: kIsWeb); // [intended] isn't supported on the web.
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment