flutter_platform.dart 24.8 KB
Newer Older
Ian Hickson's avatar
Ian Hickson committed
1
// Copyright 2014 The Flutter Authors. All rights reserved.
2 3 4
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

5 6
// @dart = 2.8

7 8
import 'dart:async';

9
import 'package:meta/meta.dart';
10
import 'package:package_config/package_config.dart';
11
import 'package:stream_channel/stream_channel.dart';
12
import 'package:test_core/src/platform.dart'; // ignore: implementation_imports
13

14
import '../base/common.dart';
15
import '../base/file_system.dart';
16
import '../base/io.dart';
17
import '../cache.dart';
18
import '../compile.dart';
19
import '../convert.dart';
20
import '../dart/language_version.dart';
21
import '../device.dart';
22
import '../globals_null_migrated.dart' as globals;
23
import '../project.dart';
24
import '../test/test_wrapper.dart';
25 26 27

import 'flutter_tester_device.dart';
import 'font_config_manager.dart';
28
import 'integration_test_device.dart';
29
import 'test_compiler.dart';
30
import 'test_config.dart';
31
import 'test_device.dart';
32
import 'watcher.dart';
33

34 35
/// The address at which our WebSocket server resides and at which the sky_shell
/// processes will host the Observatory server.
36
final Map<InternetAddressType, InternetAddress> _kHosts = <InternetAddressType, InternetAddress>{
37 38
  InternetAddressType.IPv4: InternetAddress.loopbackIPv4,
  InternetAddressType.IPv6: InternetAddress.loopbackIPv6,
39
};
40

41 42
typedef PlatformPluginRegistration = void Function(FlutterPlatform platform);

43 44
/// Configure the `test` package to work with Flutter.
///
45
/// On systems where each [FlutterPlatform] is only used to run one test suite
46
/// (that is, one Dart file with a `*_test.dart` file name and a single `void
47
/// main()`), you can set an observatory port explicitly.
48
FlutterPlatform installHook({
49
  TestWrapper testWrapper = const TestWrapper(),
50
  @required String shellPath,
51
  @required DebuggingOptions debuggingOptions,
52
  TestWatcher watcher,
53 54
  bool enableObservatory = false,
  bool machine = false,
55
  String precompiledDillPath,
56
  Map<String, String> precompiledDillFiles,
57
  bool updateGoldens = false,
58
  bool buildTestAssets = false,
59
  InternetAddressType serverType = InternetAddressType.IPv4,
60
  Uri projectRootDirectory,
61
  FlutterProject flutterProject,
62
  String icudtlPath,
63
  PlatformPluginRegistration platformPluginRegistration,
64 65
  Device integrationTestDevice,
  String integrationTestUserIdentifier,
66
}) {
67
  assert(testWrapper != null);
68
  assert(enableObservatory || (!debuggingOptions.startPaused && debuggingOptions.hostVmServicePort == null));
69 70 71

  // registerPlatformPlugin can be injected for testing since it's not very mock-friendly.
  platformPluginRegistration ??= (FlutterPlatform platform) {
72
    testWrapper.registerPlatformPlugin(
73
      <Runtime>[Runtime.vm],
74
      () {
75
        return platform;
76
      },
77 78 79 80
    );
  };
  final FlutterPlatform platform = FlutterPlatform(
    shellPath: shellPath,
81
    debuggingOptions: debuggingOptions,
82 83 84 85 86 87 88 89 90 91 92
    watcher: watcher,
    machine: machine,
    enableObservatory: enableObservatory,
    host: _kHosts[serverType],
    precompiledDillPath: precompiledDillPath,
    precompiledDillFiles: precompiledDillFiles,
    updateGoldens: updateGoldens,
    buildTestAssets: buildTestAssets,
    projectRootDirectory: projectRootDirectory,
    flutterProject: flutterProject,
    icudtlPath: icudtlPath,
93 94
    integrationTestDevice: integrationTestDevice,
    integrationTestUserIdentifier: integrationTestUserIdentifier,
95
  );
96 97
  platformPluginRegistration(platform);
  return platform;
98 99
}

100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
/// Generates the bootstrap entry point script that will be used to launch an
/// individual test file.
///
/// The [testUrl] argument specifies the path to the test file that is being
/// launched.
///
/// The [host] argument specifies the address at which the test harness is
/// running.
///
/// If [testConfigFile] is specified, it must follow the conventions of test
/// configuration files as outlined in the [flutter_test] library. By default,
/// the test file will be launched directly.
///
/// The [updateGoldens] argument will set the [autoUpdateGoldens] global
/// variable in the [flutter_test] package before invoking the test.
115 116 117 118
///
/// The [integrationTest] argument can be specified to generate the bootstrap
/// for integration tests.
///
119 120
// NOTE: this API is used by the fuchsia source tree, do not add new
// required or position parameters.
121 122 123 124
String generateTestBootstrap({
  @required Uri testUrl,
  @required InternetAddress host,
  File testConfigFile,
125
  bool updateGoldens = false,
126
  String languageVersionHeader = '',
127
  bool nullSafety = false,
128
  bool flutterTestDep = true,
129
  bool integrationTest = false,
130 131 132 133 134
}) {
  assert(testUrl != null);
  assert(host != null);
  assert(updateGoldens != null);

135
  final String websocketUrl = host.type == InternetAddressType.IPv4
136 137 138 139
      ? 'ws://${host.address}'
      : 'ws://[${host.address}]';
  final String encodedWebsocketUrl = Uri.encodeComponent(websocketUrl);

140
  final StringBuffer buffer = StringBuffer();
141
  buffer.write('''
142
$languageVersionHeader
143
import 'dart:async';
144 145
import 'dart:convert';  // flutter_ignore: dart_convert_import
import 'dart:io';  // flutter_ignore: dart_io_import
146
import 'dart:isolate';
147 148 149
''');
  if (flutterTestDep) {
    buffer.write('''
150
import 'package:flutter_test/flutter_test.dart';
151 152 153 154 155 156
''');
  }
  if (integrationTest) {
    buffer.write('''
import 'package:integration_test/integration_test.dart';
import 'dart:developer' as developer;
157 158 159
''');
  }
  buffer.write('''
160
import 'package:test_api/src/remote_listener.dart';
161
import 'package:stream_channel/stream_channel.dart';
162
import 'package:stack_trace/stack_trace.dart';
163 164

import '$testUrl' as test;
165
''');
166 167
  if (testConfigFile != null) {
    buffer.write('''
168
import '${Uri.file(testConfigFile.path)}' as test_config;
169
''');
170 171 172
  }
  buffer.write('''

173
/// Returns a serialized test suite.
174 175
StreamChannel<dynamic> serializeSuite(Function getMain()) {
  return RemoteListener.start(getMain);
176 177
}

178 179 180 181 182 183 184
Future<void> _testMain() async {
''');
  if (integrationTest) {
    buffer.write('''
  IntegrationTestWidgetsFlutterBinding.ensureInitialized();
''');
  }
185 186 187
  // Don't propagate the return value of `test.main` here. If the `main`
  // function on users` test is annotated with `@doNotStore`, it will cause an
  // analyzer error otherwise.
188
  buffer.write('''
189
  await Future(test.main);
190 191
}

192 193 194 195 196 197 198 199 200 201
/// Capture any top-level errors (mostly lazy syntax errors, since other are
/// caught below) and report them to the parent isolate.
void catchIsolateErrors() {
  final ReceivePort errorPort = ReceivePort();
  // Treat errors non-fatal because otherwise they'll be double-printed.
  Isolate.current.setErrorsFatal(false);
  Isolate.current.addErrorListener(errorPort.sendPort);
  errorPort.listen((dynamic message) {
    // Masquerade as an IsolateSpawnException because that's what this would
    // be if the error had been detected statically.
202 203 204 205
    final IsolateSpawnException error = IsolateSpawnException(
        message[0] as String);
    final Trace stackTrace = message[1] == null ?
        Trace(const <Frame>[]) : Trace.parse(message[1] as String);
206 207 208 209
    Zone.current.handleUncaughtError(error, stackTrace);
  });
}

210
void main() {
211
  String serverPort = Platform.environment['SERVER_PORT'] ?? '';
212
  String server = Uri.decodeComponent('$encodedWebsocketUrl:\$serverPort');
213
  StreamChannel<dynamic> testChannel = serializeSuite(() {
214
    catchIsolateErrors();
215
''');
216 217
  if (flutterTestDep) {
    buffer.write('''
218 219
    goldenFileComparator = LocalFileComparator(Uri.parse('$testUrl'));
    autoUpdateGoldenFiles = $updateGoldens;
220 221
''');
  }
222 223
  if (testConfigFile != null) {
    buffer.write('''
224
    return () => test_config.testExecutable(_testMain);
225 226 227
''');
  } else {
    buffer.write('''
228
    return _testMain;
229 230 231 232
''');
  }
  buffer.write('''
  });
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
''');
  if (integrationTest) {
    buffer.write('''
  final callback = (method, params) async {
    testChannel.sink.add(json.decode(params['$kIntegrationTestData'] as String));

    // Result is ignored but null is not accepted here.
    return developer.ServiceExtensionResponse.result('{}');
  };

  developer.registerExtension('$kIntegrationTestMethod', callback);

  testChannel.stream.listen((x) {
    developer.postEvent(
      '$kIntegrationTestExtension',
      {'$kIntegrationTestData': json.encode(x)},
    );
  });
  ''');
  } else {
    buffer.write('''
254
  WebSocket.connect(server).then((WebSocket socket) {
255 256 257 258 259
    socket.map((dynamic message) {
      // We're only communicating with string encoded JSON.
      return json.decode(message as String);
    }).pipe(testChannel.sink);
    socket.addStream(testChannel.stream.map(json.encode));
260
  });
261
''');
262 263 264 265
  }
  buffer.write('''
}
  ''');
266 267 268
  return buffer.toString();
}

269
typedef Finalizer = Future<void> Function();
270

271 272 273
/// The flutter test platform used to integrate with package:test.
class FlutterPlatform extends PlatformPlugin {
  FlutterPlatform({
274
    @required this.shellPath,
275
    @required this.debuggingOptions,
276 277 278 279 280
    this.watcher,
    this.enableObservatory,
    this.machine,
    this.host,
    this.precompiledDillPath,
281
    this.precompiledDillFiles,
282
    this.updateGoldens,
283
    this.buildTestAssets,
284
    this.projectRootDirectory,
285
    this.flutterProject,
286
    this.icudtlPath,
287 288
    this.integrationTestDevice,
    this.integrationTestUserIdentifier,
289 290
  }) : assert(shellPath != null);

291
  final String shellPath;
292
  final DebuggingOptions debuggingOptions;
293 294
  final TestWatcher watcher;
  final bool enableObservatory;
295
  final bool machine;
296
  final InternetAddress host;
297
  final String precompiledDillPath;
298
  final Map<String, String> precompiledDillFiles;
299
  final bool updateGoldens;
300
  final bool buildTestAssets;
301
  final Uri projectRootDirectory;
302
  final FlutterProject flutterProject;
303
  final String icudtlPath;
304

305 306 307 308 309 310 311 312 313
  /// The device to run the test on for Integration Tests.
  ///
  /// If this is null, the test will run as a regular test with the Flutter
  /// Tester; otherwise it will run as a Integration Test on this device.
  final Device integrationTestDevice;
  bool get _isIntegrationTest => integrationTestDevice != null;

  final String integrationTestUserIdentifier;

314
  final FontConfigManager _fontConfigManager = FontConfigManager();
315 316 317

  /// The test compiler produces dill files for each test main.
  ///
318
  /// To speed up compilation, each compile is initialized from an existing
319 320
  /// dill file from previous runs, if possible.
  TestCompiler compiler;
321

322 323 324 325 326 327 328
  // Each time loadChannel() is called, we spin up a local WebSocket server,
  // then spin up the engine in a subprocess. We pass the engine a Dart file
  // that connects to our WebSocket server, then we proxy JSON messages from
  // the test harness to the engine and back again. If at any time the engine
  // crashes, we inject an error into that stream. When the process closes,
  // we clean everything up.

329 330
  int _testCount = 0;

331 332 333 334 335 336 337 338 339
  @override
  Future<RunnerSuite> load(
    String path,
    SuitePlatform platform,
    SuiteConfiguration suiteConfig,
    Object message,
  ) async {
    // loadChannel may throw an exception. That's fine; it will cause the
    // LoadSuite to emit an error, which will be presented to the user.
340 341
    // Except for the Declarer error, which is a specific test incompatibility
    // error we need to catch.
342 343 344 345
    final StreamChannel<dynamic> channel = loadChannel(path, platform);
    final RunnerSuiteController controller = deserializeSuite(path, platform,
      suiteConfig, const PluginEnvironment(), channel, message);
    return controller.suite;
346 347
  }

348
  @override
349
  StreamChannel<dynamic> loadChannel(String path, SuitePlatform platform) {
350 351
    if (_testCount > 0) {
      // Fail if there will be a port conflict.
352
      if (debuggingOptions.hostVmServicePort != null) {
353
        throwToolExit('installHook() was called with an observatory port or debugger mode enabled, but then more than one test suite was run.');
354
      }
355
      // Fail if we're passing in a precompiled entry-point.
356
      if (precompiledDillPath != null) {
357
        throwToolExit('installHook() was called with a precompiled test entry-point, but then more than one test suite was run.');
358
      }
359
    }
360

361
    final int ourTestCount = _testCount;
362
    _testCount += 1;
363 364 365 366
    final StreamController<dynamic> localController = StreamController<dynamic>();
    final StreamController<dynamic> remoteController = StreamController<dynamic>();
    final Completer<_AsyncError> testCompleteCompleter = Completer<_AsyncError>();
    final _FlutterPlatformStreamSinkWrapper<dynamic> remoteSink = _FlutterPlatformStreamSinkWrapper<dynamic>(
367 368 369
      remoteController.sink,
      testCompleteCompleter.future,
    );
370
    final StreamChannel<dynamic> localChannel = StreamChannel<dynamic>.withGuarantees(
371 372 373
      remoteController.stream,
      localController.sink,
    );
374
    final StreamChannel<dynamic> remoteChannel = StreamChannel<dynamic>.withGuarantees(
375 376 377
      localController.stream,
      remoteSink,
    );
378
    testCompleteCompleter.complete(_startTest(path, localChannel, ourTestCount));
379
    return remoteChannel;
380
  }
381

382 383 384 385 386 387 388 389 390
  Future<String> _compileExpressionService(
    String isolateId,
    String expression,
    List<String> definitions,
    List<String> typeDefinitions,
    String libraryUri,
    String klass,
    bool isStatic,
  ) async {
391 392 393 394 395 396
    if (compiler == null || compiler.compiler == null) {
      throw 'Compiler is not set up properly to compile $expression';
    }
    final CompilerOutput compilerOutput =
      await compiler.compiler.compileExpression(expression, definitions,
        typeDefinitions, libraryUri, klass, isStatic);
397 398
    if (compilerOutput != null && compilerOutput.expressionData != null) {
      return base64.encode(compilerOutput.expressionData);
399 400 401 402
    }
    throw 'Failed to compile $expression';
  }

403
  TestDevice _createTestDevice(int ourTestCount) {
404 405 406 407 408 409 410 411
    if (_isIntegrationTest) {
      return IntegrationTestTestDevice(
        id: ourTestCount,
        debuggingOptions: debuggingOptions,
        device: integrationTestDevice,
        userIdentifier: integrationTestUserIdentifier,
      );
    }
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
    return FlutterTesterTestDevice(
      id: ourTestCount,
      platform: globals.platform,
      fileSystem: globals.fs,
      processManager: globals.processManager,
      logger: globals.logger,
      shellPath: shellPath,
      enableObservatory: enableObservatory,
      machine: machine,
      debuggingOptions: debuggingOptions,
      host: host,
      buildTestAssets: buildTestAssets,
      flutterProject: flutterProject,
      icudtlPath: icudtlPath,
      compileExpression: _compileExpressionService,
      fontConfigManager: _fontConfigManager
    );
  }
430

431
  Future<_AsyncError> _startTest(
432
    String testPath,
433
    StreamChannel<dynamic> testHarnessChannel,
434 435
    int ourTestCount,
  ) async {
436
    globals.printTrace('test $ourTestCount: starting test $testPath');
437

438
    _AsyncError outOfBandError; // error that we couldn't send to the harness that we need to send via our future
439

440
    final List<Finalizer> finalizers = <Finalizer>[]; // Will be run in reverse order.
441 442
    bool controllerSinkClosed = false;
    try {
443
      // Callback can't throw since it's just setting a variable.
444
      unawaited(testHarnessChannel.sink.done.whenComplete(() {
445
        controllerSinkClosed = true;
446
      }));
447

448
      // If a kernel file is given, then use that to launch the test.
449 450 451 452 453
      // If mapping is provided, look kernel file from mapping.
      // If all fails, create a "listener" dart that invokes actual test.
      String mainDart;
      if (precompiledDillPath != null) {
        mainDart = precompiledDillPath;
454 455 456 457 458 459 460 461 462
        // When start paused is specified, it means that the user is likely
        // running this with a debugger attached. Initialize the resident
        // compiler in this case.
        if (debuggingOptions.startPaused) {
          compiler ??= TestCompiler(debuggingOptions.buildInfo, flutterProject, precompiledDillPath: precompiledDillPath);
          final Uri testUri = globals.fs.file(testPath).uri;
          // Trigger a compilation to initialize the resident compiler.
          unawaited(compiler.compile(testUri));
        }
463 464
      } else if (precompiledDillFiles != null) {
        mainDart = precompiledDillFiles[testPath];
465 466
      } else {
        mainDart = _createListenerDart(finalizers, ourTestCount, testPath);
467

468 469 470 471 472
        // Integration test device takes care of the compilation.
        if (integrationTestDevice == null) {
          // Lazily instantiate compiler so it is built only if it is actually used.
          compiler ??= TestCompiler(debuggingOptions.buildInfo, flutterProject);
          mainDart = await compiler.compile(globals.fs.file(mainDart).uri);
473

474 475 476 477
          if (mainDart == null) {
            testHarnessChannel.sink.addError('Compilation failed for testPath=$testPath');
            return null;
          }
478
        }
479
      }
480

481 482
      globals.printTrace('test $ourTestCount: starting test device');
      final TestDevice testDevice = _createTestDevice(ourTestCount);
483
      final Future<StreamChannel<String>> remoteChannelFuture = testDevice.start(mainDart);
484
      finalizers.add(() async {
485 486
        globals.printTrace('test $ourTestCount: ensuring test device is terminated.');
        await testDevice.kill();
487
      });
488

489 490 491 492 493 494 495 496 497 498
      // At this point, these things can happen:
      // A. The test device could crash, in which case [testDevice.finished]
      // will complete.
      // B. The test device could connect to us, in which case
      // [remoteChannelFuture] will complete.
      globals.printTrace('test $ourTestCount: awaiting connection to test device');
      await Future.any<void>(<Future<void>>[
        testDevice.finished,
        () async {
          final Uri processObservatoryUri = await testDevice.observatoryUri;
499 500
          if (processObservatoryUri != null) {
            globals.printTrace('test $ourTestCount: Observatory uri is available at $processObservatoryUri');
501 502
          } else {
            globals.printTrace('test $ourTestCount: Observatory uri is not available');
503
          }
504
          watcher?.handleStartedDevice(processObservatoryUri);
505

506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524
          final StreamChannel<String> remoteChannel = await remoteChannelFuture;
          globals.printTrace('test $ourTestCount: connected to test device, now awaiting test result');

          await _pipeHarnessToRemote(
            id: ourTestCount,
            harnessChannel: testHarnessChannel,
            remoteChannel: remoteChannel,
          );

          globals.printTrace('test $ourTestCount: finished');
          await watcher?.handleFinishedTest(testDevice);
        }()
      ]);
    } on Exception catch (error, stackTrace) {
      Object reportedError = error;
      StackTrace reportedStackTrace = stackTrace;
      if (error is TestDeviceException) {
        reportedError = error.message;
        reportedStackTrace = error.stackTrace;
525
      }
526

527
      globals.printTrace('test $ourTestCount: error caught during test; ${controllerSinkClosed ? "reporting to console" : "sending to test framework"}');
528
      if (!controllerSinkClosed) {
529
        testHarnessChannel.sink.addError(reportedError, reportedStackTrace);
530
      } else {
531 532
        globals.printError('unhandled error during test:\n$testPath\n$reportedError\n$reportedStackTrace');
        outOfBandError ??= _AsyncError(reportedError, reportedStackTrace);
533 534
      }
    } finally {
535
      globals.printTrace('test $ourTestCount: cleaning up...');
536
      // Finalizers are treated like a stack; run them in reverse order.
537
      for (final Finalizer finalizer in finalizers.reversed) {
538 539
        try {
          await finalizer();
540
        } on Exception catch (error, stack) {
541
          globals.printTrace('test $ourTestCount: error while cleaning up; ${controllerSinkClosed ? "reporting to console" : "sending to test framework"}');
542
          if (!controllerSinkClosed) {
543
            testHarnessChannel.sink.addError(error, stack);
544
          } else {
545
            globals.printError('unhandled error during finalization of test:\n$testPath\n$error\n$stack');
546
            outOfBandError ??= _AsyncError(error, stack);
547 548 549
          }
        }
      }
550
      if (!controllerSinkClosed) {
551
        // Waiting below with await.
552
        unawaited(testHarnessChannel.sink.close());
553
        globals.printTrace('test $ourTestCount: waiting for controller sink to close');
554
        await testHarnessChannel.sink.done;
555 556 557
      }
    }
    assert(controllerSinkClosed);
558
    if (outOfBandError != null) {
559
      globals.printTrace('test $ourTestCount: finished with out-of-band failure');
560
    } else {
561
      globals.printTrace('test $ourTestCount: finished');
562
    }
563
    return outOfBandError;
564 565
  }

566
  String _createListenerDart(
567
    List<Finalizer> finalizers,
568 569 570
    int ourTestCount,
    String testPath,
  ) {
571
    // Prepare a temporary directory to store the Dart file that will talk to us.
572
    final Directory tempDir = globals.fs.systemTempDirectory.createTempSync('flutter_test_listener.');
573
    finalizers.add(() async {
574
      globals.printTrace('test $ourTestCount: deleting temporary directory');
575
      tempDir.deleteSync(recursive: true);
576 577 578
    });

    // Prepare the Dart file that will talk to us and start the test.
579
    final File listenerFile = globals.fs.file('${tempDir.path}/listener.dart');
580 581
    listenerFile.createSync();
    listenerFile.writeAsStringSync(_generateTestMain(
582
      testUrl: globals.fs.path.toUri(globals.fs.path.absolute(testPath)),
583 584 585 586
    ));
    return listenerFile.path;
  }

587
  String _generateTestMain({
588
    Uri testUrl,
589
  }) {
590
    assert(testUrl.scheme == 'file');
591
    final File file = globals.fs.file(testUrl);
592 593
    final PackageConfig packageConfig = debuggingOptions.buildInfo.packageConfig;

594 595
    final LanguageVersion languageVersion = determineLanguageVersion(
      file,
596
      packageConfig[flutterProject?.manifest?.appName],
597
      Cache.flutterRoot,
598
    );
599 600
    return generateTestBootstrap(
      testUrl: testUrl,
601
      testConfigFile: findTestConfigFile(globals.fs.file(testUrl), globals.logger),
602 603
      host: host,
      updateGoldens: updateGoldens,
604
      flutterTestDep: packageConfig['flutter_test'] != null,
605 606
      languageVersionHeader: '// @dart=${languageVersion.major}.${languageVersion.minor}',
      integrationTest: _isIntegrationTest,
607
    );
608 609
  }

610 611 612
  @override
  Future<dynamic> close() async {
    if (compiler != null) {
613
      await compiler.dispose();
614 615
      compiler = null;
    }
616
    await _fontConfigManager.dispose();
617
  }
618
}
619

620 621 622 623 624 625 626 627 628 629 630 631 632
// The [_shellProcessClosed] future can't have errors thrown on it because it
// crosses zones (it's fed in a zone created by the test package, but listened
// to by a parent zone, the same zone that calls [close] below).
//
// This is because Dart won't let errors that were fed into a Future in one zone
// propagate to listeners in another zone. (Specifically, the zone in which the
// future was completed with the error, and the zone in which the listener was
// registered, are what matters.)
//
// Because of this, the [_shellProcessClosed] future takes an [_AsyncError]
// object as a result. If it's null, it's as if it had completed correctly; if
// it's non-null, it contains the error and stack trace of the actual error, as
// if it had completed with that error.
633 634
class _FlutterPlatformStreamSinkWrapper<S> implements StreamSink<S> {
  _FlutterPlatformStreamSinkWrapper(this._parent, this._shellProcessClosed);
635

636
  final StreamSink<S> _parent;
637
  final Future<_AsyncError> _shellProcessClosed;
638 639

  @override
640
  Future<void> get done => _done.future;
641
  final Completer<void> _done = Completer<void>();
642 643 644

  @override
  Future<dynamic> close() {
645
    Future.wait<dynamic>(<Future<dynamic>>[
646 647
      _parent.close(),
      _shellProcessClosed,
648
    ]).then<void>(
649 650 651
      (List<dynamic> futureResults) {
        assert(futureResults.length == 2);
        assert(futureResults.first == null);
652 653 654
        final dynamic lastResult = futureResults.last;
        if (lastResult is _AsyncError) {
          _done.completeError(lastResult.error, lastResult.stack);
655
        } else {
656
          assert(lastResult == null);
657 658
          _done.complete();
        }
659
      },
660
      onError: _done.completeError,
661 662 663 664 665 666 667
    );
    return done;
  }

  @override
  void add(S event) => _parent.add(event);
  @override
668
  void addError(dynamic errorEvent, [ StackTrace stackTrace ]) => _parent.addError(errorEvent, stackTrace);
669 670 671
  @override
  Future<dynamic> addStream(Stream<S> stream) => _parent.addStream(stream);
}
672 673 674 675 676 677

@immutable
class _AsyncError {
  const _AsyncError(this.error, this.stack);
  final dynamic error;
  final StackTrace stack;
678
}
679

680
/// Bridges the package:test harness and the remote device.
681
///
682 683 684 685 686 687
/// The returned future completes when either side is closed, which also
/// indicates when the tests have finished.
Future<void> _pipeHarnessToRemote({
  @required int id,
  @required StreamChannel<dynamic> harnessChannel,
  @required StreamChannel<String> remoteChannel,
688
}) async {
689
  globals.printTrace('test $id: Waiting for test harness or tests to finish');
690 691

  await Future.any<void>(<Future<void>>[
692 693 694 695 696 697 698 699 700 701 702 703
    harnessChannel.stream
      .map<String>(json.encode)
      .pipe(remoteChannel.sink)
      .then<void>((void value) {
        globals.printTrace('test $id: Test process is no longer needed by test harness');
      }),
    remoteChannel.stream
      .map<dynamic>(json.decode)
      .pipe(harnessChannel.sink)
      .then<void>((void value) {
        globals.printTrace('test $id: Test harness is no longer needed by test process');
      }),
704 705
  ]);
}