// Copyright 2014 The Flutter Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'package:meta/meta.dart'; import 'package:vm_service/vm_service.dart'; import 'cocoon.dart'; import 'devices.dart'; import 'task_result.dart'; import 'utils.dart'; /// Run a list of tasks. /// /// For each task, an auto rerun will be triggered when task fails. /// /// If the task succeeds the first time, it will be recorded as successful. /// /// If the task fails first, but gets passed in the end, the /// test will be recorded as successful but with a flake flag. /// /// If the task fails all reruns, it will be recorded as failed. Future<void> runTasks( List<String> taskNames, { bool exitOnFirstTestFailure = false, // terminateStrayDartProcesses defaults to false so that tests don't have to specify it. // It is set based on the --terminate-stray-dart-processes command line argument in // normal execution, and that flag defaults to true. bool terminateStrayDartProcesses = false, bool silent = false, String? deviceId, String? gitBranch, String? localEngine, String? localEngineSrcPath, String? luciBuilder, String? resultsPath, List<String>? taskArgs, @visibleForTesting Map<String, String>? isolateParams, @visibleForTesting Function(String) print = print, @visibleForTesting List<String>? logs, }) async { for (final String taskName in taskNames) { TaskResult result = TaskResult.success(null); int retry = 0; while (retry <= Cocoon.retryNumber) { result = await rerunTask( taskName, deviceId: deviceId, localEngine: localEngine, localEngineSrcPath: localEngineSrcPath, terminateStrayDartProcesses: terminateStrayDartProcesses, silent: silent, taskArgs: taskArgs, resultsPath: resultsPath, gitBranch: gitBranch, luciBuilder: luciBuilder, isolateParams: isolateParams, ); if (!result.succeeded) { retry += 1; } else { section('Flaky status for "$taskName"'); if (retry > 0) { print('Total ${retry+1} executions: $retry failures and 1 false positive.'); print('flaky: true'); // TODO(ianh): stop ignoring this failure. We should set exitCode=1, and quit // if exitOnFirstTestFailure is true. } else { print('Test passed on first attempt.'); print('flaky: false'); } break; } } if (!result.succeeded) { section('Flaky status for "$taskName"'); print('Consistently failed across all $retry executions.'); print('flaky: false'); exitCode = 1; if (exitOnFirstTestFailure) { return; } } } } /// A rerun wrapper for `runTask`. /// /// This separates reruns in separate sections. Future<TaskResult> rerunTask( String taskName, { String? deviceId, String? localEngine, String? localEngineSrcPath, bool terminateStrayDartProcesses = false, bool silent = false, List<String>? taskArgs, String? resultsPath, String? gitBranch, String? luciBuilder, @visibleForTesting Map<String, String>? isolateParams, }) async { section('Running task "$taskName"'); final TaskResult result = await runTask( taskName, deviceId: deviceId, localEngine: localEngine, localEngineSrcPath: localEngineSrcPath, terminateStrayDartProcesses: terminateStrayDartProcesses, silent: silent, taskArgs: taskArgs, isolateParams: isolateParams, ); print('Task result:'); print(const JsonEncoder.withIndent(' ').convert(result)); section('Finished task "$taskName"'); if (resultsPath != null) { final Cocoon cocoon = Cocoon(); await cocoon.writeTaskResultToFile( builderName: luciBuilder, gitBranch: gitBranch, result: result, resultsPath: resultsPath, ); } return result; } /// Runs a task in a separate Dart VM and collects the result using the VM /// service protocol. /// /// [taskName] is the name of the task. The corresponding task executable is /// expected to be found under `bin/tasks`. /// /// Running the task in [silent] mode will suppress standard output from task /// processes and only print standard errors. /// /// [taskArgs] are passed to the task executable for additional configuration. Future<TaskResult> runTask( String taskName, { bool terminateStrayDartProcesses = false, bool silent = false, String? localEngine, String? localEngineSrcPath, String? deviceId, List<String>? taskArgs, @visibleForTesting Map<String, String>? isolateParams, }) async { final String taskExecutable = 'bin/tasks/$taskName.dart'; if (!file(taskExecutable).existsSync()) { throw 'Executable Dart file not found: $taskExecutable'; } final Process runner = await startProcess( dartBin, <String>[ '--disable-dart-dev', '--enable-vm-service=0', // zero causes the system to choose a free port '--no-pause-isolates-on-exit', if (localEngine != null) '-DlocalEngine=$localEngine', if (localEngineSrcPath != null) '-DlocalEngineSrcPath=$localEngineSrcPath', taskExecutable, ...?taskArgs, ], environment: <String, String>{ if (deviceId != null) DeviceIdEnvName: deviceId, }, ); bool runnerFinished = false; unawaited(runner.exitCode.whenComplete(() { runnerFinished = true; })); final Completer<Uri> uri = Completer<Uri>(); final StreamSubscription<String> stdoutSub = runner.stdout .transform<String>(const Utf8Decoder()) .transform<String>(const LineSplitter()) .listen((String line) { if (!uri.isCompleted) { final Uri? serviceUri = parseServiceUri(line, prefix: RegExp('(Observatory|The Dart VM service is) listening on ')); if (serviceUri != null) { uri.complete(serviceUri); } } if (!silent) { stdout.writeln('[$taskName] [STDOUT] $line'); } }); final StreamSubscription<String> stderrSub = runner.stderr .transform<String>(const Utf8Decoder()) .transform<String>(const LineSplitter()) .listen((String line) { stderr.writeln('[$taskName] [STDERR] $line'); }); try { final ConnectionResult result = await _connectToRunnerIsolate(await uri.future); print('[$taskName] Connected to VM server.'); isolateParams = isolateParams == null ? <String, String>{} : Map<String, String>.of(isolateParams); isolateParams['runProcessCleanup'] = terminateStrayDartProcesses.toString(); final Map<String, dynamic> taskResultJson = (await result.vmService.callServiceExtension( 'ext.cocoonRunTask', args: isolateParams, isolateId: result.isolate.id, )).json!; final TaskResult taskResult = TaskResult.fromJson(taskResultJson); final int exitCode = await runner.exitCode; print('[$taskName] Process terminated with exit code $exitCode.'); return taskResult; } catch (error, stack) { print('[$taskName] Task runner system failed with exception!\n$error\n$stack'); rethrow; } finally { if (!runnerFinished) { print('[$taskName] Terminating process...'); runner.kill(ProcessSignal.sigkill); } await stdoutSub.cancel(); await stderrSub.cancel(); } } Future<ConnectionResult> _connectToRunnerIsolate(Uri vmServiceUri) async { final List<String> pathSegments = <String>[ // Add authentication code. if (vmServiceUri.pathSegments.isNotEmpty) vmServiceUri.pathSegments[0], 'ws', ]; final String url = vmServiceUri.replace(scheme: 'ws', pathSegments: pathSegments).toString(); final Stopwatch stopwatch = Stopwatch()..start(); while (true) { try { // Make sure VM server is up by successfully opening and closing a socket. await (await WebSocket.connect(url)).close(); // Look up the isolate. final VmService client = await vmServiceConnectUri(url); VM vm = await client.getVM(); while (vm.isolates!.isEmpty) { await Future<void>.delayed(const Duration(seconds: 1)); vm = await client.getVM(); } final IsolateRef isolate = vm.isolates!.first; final Response response = await client.callServiceExtension('ext.cocoonRunnerReady', isolateId: isolate.id); if (response.json!['response'] != 'ready') { throw 'not ready yet'; } return ConnectionResult(client, isolate); } catch (error) { if (stopwatch.elapsed > const Duration(seconds: 10)) { print('VM service still not ready after ${stopwatch.elapsed}: $error\nContinuing to retry...'); } await Future<void>.delayed(const Duration(milliseconds: 50)); } } } class ConnectionResult { ConnectionResult(this.vmService, this.isolate); final VmService vmService; final IsolateRef isolate; } /// The cocoon client sends an invalid VM service response, we need to intercept it. Future<VmService> vmServiceConnectUri(String wsUri, {Log? log}) async { final WebSocket socket = await WebSocket.connect(wsUri); final StreamController<dynamic> controller = StreamController<dynamic>(); final Completer<dynamic> streamClosedCompleter = Completer<dynamic>(); socket.listen( (dynamic data) { final Map<String, dynamic> rawData = json.decode(data as String) as Map<String, dynamic> ; if (rawData['result'] == 'ready') { rawData['result'] = <String, dynamic>{'response': 'ready'}; controller.add(json.encode(rawData)); } else { controller.add(data); } }, onError: (Object err, StackTrace stackTrace) => controller.addError(err, stackTrace), onDone: () => streamClosedCompleter.complete(), ); return VmService( controller.stream, (String message) => socket.add(message), log: log, disposeHandler: () => socket.close(), streamClosed: streamClosedCompleter.future, ); }