1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:meta/meta.dart';
import 'package:vm_service/vm_service.dart';
import 'cocoon.dart';
import 'devices.dart';
import 'task_result.dart';
import 'utils.dart';
/// Executes every task in [taskNames], in order, retrying failed attempts.
///
/// Each task is attempted via [rerunTask] until it passes or until its
/// failure count exceeds [Cocoon.retryNumber]. The outcome is reported under
/// a `Flaky status` section:
///
///  * Passing on the first attempt is reported as `flaky: false`.
///  * Passing after one or more failed attempts is reported as `flaky: true`;
///    the process [exitCode] is left untouched in that case.
///  * Never passing is reported as a consistent failure and sets [exitCode]
///    to 1.
///
/// When [exitOnFirstTestFailure] is true, a failed attempt is not retried and
/// the function returns without starting any remaining tasks.
///
/// The status lines are emitted through the [print] parameter so tests can
/// capture them. [logs] is not read by this function. Every other argument is
/// forwarded unchanged to [rerunTask].
Future<void> runTasks(
  List<String> taskNames, {
  bool exitOnFirstTestFailure = false,
  // terminateStrayDartProcesses defaults to false so that tests don't have to specify it.
  // It is set based on the --terminate-stray-dart-processes command line argument in
  // normal execution, and that flag defaults to true.
  bool terminateStrayDartProcesses = false,
  bool silent = false,
  String? deviceId,
  String? gitBranch,
  String? localEngine,
  String? localEngineHost,
  String? localEngineSrcPath,
  String? luciBuilder,
  String? resultsPath,
  List<String>? taskArgs,
  bool useEmulator = false,
  @visibleForTesting Map<String, String>? isolateParams,
  @visibleForTesting Function(String) print = print,
  @visibleForTesting List<String>? logs,
}) async {
  for (final String taskName in taskNames) {
    TaskResult result = TaskResult.success(null);
    int failureCount = 0;

    while (failureCount <= Cocoon.retryNumber) {
      result = await rerunTask(
        taskName,
        deviceId: deviceId,
        localEngine: localEngine,
        localEngineHost: localEngineHost,
        localEngineSrcPath: localEngineSrcPath,
        terminateStrayDartProcesses: terminateStrayDartProcesses,
        silent: silent,
        taskArgs: taskArgs,
        resultsPath: resultsPath,
        gitBranch: gitBranch,
        luciBuilder: luciBuilder,
        useEmulator: useEmulator,
        isolateParams: isolateParams,
      );

      if (result.succeeded) {
        // A pass ends the retry loop; whether it counts as flaky depends on
        // how many attempts failed before it.
        section('Flaky status for "$taskName"');
        if (failureCount == 0) {
          print('Test passed on first attempt.');
          print('flaky: false');
        } else {
          print('Total ${failureCount+1} executions: $failureCount failures and 1 false positive.');
          print('flaky: true');
          // TODO(ianh): stop ignoring this failure. We should set exitCode=1, and quit
          // if exitOnFirstTestFailure is true.
        }
        break;
      }

      failureCount += 1;
      if (exitOnFirstTestFailure) {
        break;
      }
    }

    if (result.succeeded) {
      continue;
    }

    // Every attempt failed (or retries were cut short by
    // exitOnFirstTestFailure).
    section('Flaky status for "$taskName"');
    print('Consistently failed across all $failureCount executions.');
    print('flaky: false');
    exitCode = 1;
    if (exitOnFirstTestFailure) {
      return;
    }
  }
}
/// Runs [taskName] once through [runTask], framing the attempt in its own
/// log sections.
///
/// A `Running task` section is opened before the run; afterwards the result
/// is printed as indented JSON and a `Finished task` section is opened.
///
/// When [resultsPath] is non-null, the result is also handed to
/// [Cocoon.writeTaskResultToFile] together with [luciBuilder] (as the builder
/// name) and [gitBranch].
///
/// Returns the [TaskResult] produced by [runTask].
Future<TaskResult> rerunTask(
  String taskName, {
  String? deviceId,
  String? localEngine,
  String? localEngineHost,
  String? localEngineSrcPath,
  bool terminateStrayDartProcesses = false,
  bool silent = false,
  List<String>? taskArgs,
  String? resultsPath,
  String? gitBranch,
  String? luciBuilder,
  bool useEmulator = false,
  @visibleForTesting Map<String, String>? isolateParams,
}) async {
  section('Running task "$taskName"');
  final TaskResult outcome = await runTask(
    taskName,
    terminateStrayDartProcesses: terminateStrayDartProcesses,
    silent: silent,
    localEngine: localEngine,
    localEngineHost: localEngineHost,
    localEngineSrcPath: localEngineSrcPath,
    deviceId: deviceId,
    taskArgs: taskArgs,
    useEmulator: useEmulator,
    isolateParams: isolateParams,
  );

  print('Task result:');
  print(const JsonEncoder.withIndent(' ').convert(outcome));
  section('Finished task "$taskName"');

  // Nothing to persist unless the caller asked for a results file.
  if (resultsPath == null) {
    return outcome;
  }

  await Cocoon().writeTaskResultToFile(
    builderName: luciBuilder,
    gitBranch: gitBranch,
    result: outcome,
    resultsPath: resultsPath,
  );
  return outcome;
}
/// Runs a task in a separate Dart VM and collects the result using the VM
/// service protocol.
///
/// [taskName] is the name of the task. The corresponding task executable is
/// expected to be found under `bin/tasks`; a [String] error is thrown if it
/// does not exist.
///
/// Running the task in [silent] mode will suppress standard output from task
/// processes and only print standard errors.
///
/// [taskArgs] are passed to the task executable for additional configuration.
/// The list is never modified: when [useEmulator] is true, the
/// `--android-emulator` and `--browser-name=android-chrome` flags are appended
/// to a private copy, so callers can safely reuse the same list across
/// retries.
///
/// [localEngine], [localEngineHost], [localWebSdk] and [localEngineSrcPath]
/// are passed to the child VM as `-D` defines when non-null. [deviceId], when
/// non-null, is passed through the environment under [DeviceIdEnvName].
///
/// [isolateParams] are sent (as a copy) to the `ext.cocoonRunTask` service
/// extension, with `runProcessCleanup` set from [terminateStrayDartProcesses].
///
/// Any error raised while connecting to or talking to the child VM is logged
/// and rethrown. If the child process is still running when this function
/// exits, it is killed with SIGKILL.
Future<TaskResult> runTask(
  String taskName, {
  bool terminateStrayDartProcesses = false,
  bool silent = false,
  String? localEngine,
  String? localEngineHost,
  String? localWebSdk,
  String? localEngineSrcPath,
  String? deviceId,
  List<String>? taskArgs,
  bool useEmulator = false,
  @visibleForTesting Map<String, String>? isolateParams,
}) async {
  final String taskExecutable = 'bin/tasks/$taskName.dart';

  if (!file(taskExecutable).existsSync()) {
    throw 'Executable Dart file not found: $taskExecutable';
  }

  // Build a fresh argument list instead of appending to the caller's list.
  // The same `taskArgs` instance is passed in again on every retry, so
  // mutating it in place would duplicate the emulator flags on each attempt
  // (and would throw outright for an unmodifiable list).
  final List<String> effectiveTaskArgs = <String>[
    ...?taskArgs,
    if (useEmulator) ...<String>[
      '--android-emulator',
      '--browser-name=android-chrome',
    ],
  ];

  stdout.writeln('Starting process for task: [$taskName]');

  final Process runner = await startProcess(
    dartBin,
    <String>[
      '--disable-dart-dev',
      '--enable-vm-service=0', // zero causes the system to choose a free port
      '--no-pause-isolates-on-exit',
      if (localEngine != null) '-DlocalEngine=$localEngine',
      if (localEngineHost != null) '-DlocalEngineHost=$localEngineHost',
      if (localWebSdk != null) '-DlocalWebSdk=$localWebSdk',
      if (localEngineSrcPath != null) '-DlocalEngineSrcPath=$localEngineSrcPath',
      taskExecutable,
      ...effectiveTaskArgs,
    ],
    environment: <String, String>{
      if (deviceId != null)
        DeviceIdEnvName: deviceId,
    },
  );

  // Tracks whether the child has already exited so the `finally` block below
  // only kills a process that is still alive.
  bool runnerFinished = false;

  unawaited(runner.exitCode.whenComplete(() {
    runnerFinished = true;
  }));

  // Completed with the VM service URI once the child announces it on stdout.
  final Completer<Uri> uri = Completer<Uri>();

  final StreamSubscription<String> stdoutSub = runner.stdout
      .transform<String>(const Utf8Decoder())
      .transform<String>(const LineSplitter())
      .listen((String line) {
    if (!uri.isCompleted) {
      final Uri? serviceUri = parseServiceUri(line, prefix: RegExp('The Dart VM service is listening on '));
      if (serviceUri != null) {
        uri.complete(serviceUri);
      }
    }
    if (!silent) {
      stdout.writeln('[${DateTime.now()}] [STDOUT] $line');
    }
  });

  // Standard error is always echoed, even in silent mode.
  final StreamSubscription<String> stderrSub = runner.stderr
      .transform<String>(const Utf8Decoder())
      .transform<String>(const LineSplitter())
      .listen((String line) {
    stderr.writeln('[${DateTime.now()}] [STDERR] $line');
  });

  try {
    final ConnectionResult result = await _connectToRunnerIsolate(await uri.future);
    print('[$taskName] Connected to VM server.');
    // Copy before adding our own key so the caller's map is left untouched.
    isolateParams = isolateParams == null ? <String, String>{} : Map<String, String>.of(isolateParams);
    isolateParams['runProcessCleanup'] = terminateStrayDartProcesses.toString();
    final Map<String, dynamic> taskResultJson = (await result.vmService.callServiceExtension(
      'ext.cocoonRunTask',
      args: isolateParams,
      isolateId: result.isolate.id,
    )).json!;
    final TaskResult taskResult = TaskResult.fromJson(taskResultJson);
    final int exitCode = await runner.exitCode;
    print('[$taskName] Process terminated with exit code $exitCode.');
    return taskResult;
  } catch (error, stack) {
    print('[$taskName] Task runner system failed with exception!\n$error\n$stack');
    rethrow;
  } finally {
    if (!runnerFinished) {
      print('[$taskName] Terminating process...');
      runner.kill(ProcessSignal.sigkill);
    }
    await stdoutSub.cancel();
    await stderrSub.cancel();
  }
}
/// Connects to the VM service at [vmServiceUri] and waits for the task runner
/// isolate to report that it is ready.
///
/// The WebSocket URL is derived from [vmServiceUri] by switching the scheme
/// to `ws` and using the first path segment (the authentication code, when
/// present) followed by `ws` as the path.
///
/// Each attempt opens a client, waits for the VM to list at least one isolate,
/// and calls `ext.cocoonRunnerReady` on the first isolate. Any failure causes
/// a retry after 50 milliseconds; there is no upper bound on the number of
/// retries. Once 10 seconds have elapsed, every further failure is logged.
///
/// A client opened by a failed attempt is disposed before retrying, so
/// repeated retries do not accumulate open WebSocket connections.
///
/// Returns the connected client together with the isolate that answered.
Future<ConnectionResult> _connectToRunnerIsolate(Uri vmServiceUri) async {
  final List<String> pathSegments = <String>[
    // Add authentication code.
    if (vmServiceUri.pathSegments.isNotEmpty) vmServiceUri.pathSegments[0],
    'ws',
  ];
  final String url = vmServiceUri.replace(scheme: 'ws', pathSegments: pathSegments).toString();
  final Stopwatch stopwatch = Stopwatch()..start();

  while (true) {
    // Declared outside the `try` so the `catch` block can release it.
    VmService? client;
    try {
      // Make sure VM server is up by successfully opening and closing a socket.
      await (await WebSocket.connect(url)).close();

      // Look up the isolate.
      client = await vmServiceConnectUri(url);
      VM vm = await client.getVM();
      while (vm.isolates!.isEmpty) {
        await Future<void>.delayed(const Duration(seconds: 1));
        vm = await client.getVM();
      }
      final IsolateRef isolate = vm.isolates!.first;
      final Response response = await client.callServiceExtension('ext.cocoonRunnerReady', isolateId: isolate.id);
      if (response.json!['response'] != 'ready') {
        throw 'not ready yet';
      }
      return ConnectionResult(client, isolate);
    } catch (error) {
      // Release the half-initialized connection before retrying; otherwise
      // every failed attempt would leave a WebSocket open.
      if (client != null) {
        try {
          await client.dispose();
        } catch (_) {
          // Best-effort cleanup: the connection error above is the one that
          // matters, and it is handled by retrying.
        }
      }
      if (stopwatch.elapsed > const Duration(seconds: 10)) {
        print('VM service still not ready after ${stopwatch.elapsed}: $error\nContinuing to retry...');
      }
      await Future<void>.delayed(const Duration(milliseconds: 50));
    }
  }
}
/// Pairs a connected [VmService] client with the isolate it should talk to.
///
/// Returned by [_connectToRunnerIsolate] once the runner isolate has
/// answered the readiness check.
class ConnectionResult {
/// Creates a result holding the given [vmService] client and [isolate].
ConnectionResult(this.vmService, this.isolate);
/// The VM service client connected to the task runner process.
final VmService vmService;
/// The isolate on which the runner's service extensions are invoked.
final IsolateRef isolate;
}
/// Opens a WebSocket to [wsUri] and returns a [VmService] client wired to it.
///
/// The cocoon client sends an invalid VM service response: a message whose
/// `result` is the bare string `ready`. Such messages are intercepted and
/// rewritten so that `result` becomes `{"response": "ready"}` before they
/// reach the client. Every other message, including binary frames and text
/// frames that do not decode to a JSON object, is forwarded unchanged.
///
/// [log] is passed through to the [VmService] constructor. Disposing the
/// returned client closes the underlying socket.
Future<VmService> vmServiceConnectUri(String wsUri, {Log? log}) async {
  final WebSocket socket = await WebSocket.connect(wsUri);
  final StreamController<dynamic> controller = StreamController<dynamic>();
  final Completer<dynamic> streamClosedCompleter = Completer<dynamic>();

  socket.listen(
    (dynamic data) {
      // WebSocket frames arrive as either String (text) or List<int>
      // (binary). Only a text frame can hold the response that needs
      // patching, so anything else is passed straight through instead of
      // being force-cast, which would throw inside this listener.
      if (data is String) {
        final Object? decoded = json.decode(data);
        if (decoded is Map<String, dynamic> && decoded['result'] == 'ready') {
          decoded['result'] = <String, dynamic>{'response': 'ready'};
          controller.add(json.encode(decoded));
          return;
        }
      }
      controller.add(data);
    },
    onError: (Object err, StackTrace stackTrace) => controller.addError(err, stackTrace),
    // The end of the socket stream is reported to VmService through
    // `streamClosed` below.
    onDone: () => streamClosedCompleter.complete(),
  );

  return VmService(
    controller.stream,
    (String message) => socket.add(message),
    log: log,
    disposeHandler: () => socket.close(),
    streamClosed: streamClosedCompleter.future,
  );
}