runner.dart 4.51 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:vm_service_client/vm_service_client.dart';

import 'package:flutter_devicelab/framework/utils.dart';

/// Slightly longer than task timeout that gives the task runner a chance to
/// clean-up before forcefully quitting it.
const Duration taskTimeoutWithGracePeriod = const Duration(minutes: 11);

/// Runs a task in a separate Dart VM and collects the result using the VM
/// service protocol.
///
/// [taskName] is the name of the task. The corresponding task executable is
/// expected to be found under `bin/tasks`.
22 23 24 25
///
/// Running the task in [silent] mode will suppress standard output from task
/// processes and only print standard errors.
Future<Map<String, dynamic>> runTask(String taskName, { bool silent: false }) async {
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
  String taskExecutable = 'bin/tasks/$taskName.dart';

  if (!file(taskExecutable).existsSync())
    throw 'Executable Dart file not found: $taskExecutable';

  int vmServicePort = await _findAvailablePort();
  Process runner = await startProcess(dartBin, <String>[
    '--enable-vm-service=$vmServicePort',
    '--no-pause-isolates-on-exit',
    taskExecutable,
  ]);

  bool runnerFinished = false;

  runner.exitCode.then((_) {
    runnerFinished = true;
  });

  StreamSubscription<String> stdoutSub = runner.stdout
      .transform(new Utf8Decoder())
      .transform(new LineSplitter())
      .listen((String line) {
48 49 50
    if (!silent) {
      stdout.writeln('[$taskName] [STDOUT] $line');
    }
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
  });

  StreamSubscription<String> stderrSub = runner.stderr
      .transform(new Utf8Decoder())
      .transform(new LineSplitter())
      .listen((String line) {
    stderr.writeln('[$taskName] [STDERR] $line');
  });

  String waitingFor = 'connection';
  try {
    VMIsolate isolate = await _connectToRunnerIsolate(vmServicePort);
    waitingFor = 'task completion';
    Map<String, dynamic> taskResult =
        await isolate.invokeExtension('ext.cocoonRunTask').timeout(taskTimeoutWithGracePeriod);
    waitingFor = 'task process to exit';
    await runner.exitCode.timeout(const Duration(seconds: 1));
    return taskResult;
  } on TimeoutException catch (timeout) {
    runner.kill(ProcessSignal.SIGINT);
    return <String, dynamic>{
      'success': false,
      'reason': 'Timeout waiting for $waitingFor: ${timeout.message}',
    };
  } finally {
    if (!runnerFinished)
      runner.kill(ProcessSignal.SIGKILL);
    await stdoutSub.cancel();
    await stderrSub.cancel();
  }
}

Future<VMIsolate> _connectToRunnerIsolate(int vmServicePort) async {
  String url = 'ws://localhost:$vmServicePort/ws';
  DateTime started = new DateTime.now();

  // TODO(yjbanov): due to lack of imagination at the moment the handshake with
  //                the task process is very rudimentary and requires this small
  //                delay to let the task process open up the VM service port.
  //                Otherwise we almost always hit the non-ready case first and
  //                wait a whole 1 second, which is annoying.
  await new Future<Null>.delayed(const Duration(milliseconds: 100));

  while (true) {
    try {
      // Make sure VM server is up by successfully opening and closing a socket.
      await (await WebSocket.connect(url)).close();

      // Look up the isolate.
      VMServiceClient client = new VMServiceClient.connect(url);
      VM vm = await client.getVM();
      VMIsolate isolate = vm.isolates.single;
      String response = await isolate.invokeExtension('ext.cocoonRunnerReady');
      if (response != 'ready') throw 'not ready yet';
      return isolate;
    } catch (error) {
      const Duration connectionTimeout = const Duration(seconds: 2);
      if (new DateTime.now().difference(started) > connectionTimeout) {
        throw new TimeoutException(
          'Failed to connect to the task runner process',
          connectionTimeout,
        );
      }
      print('VM service not ready yet: $error');
      const Duration pauseBetweenRetries = const Duration(milliseconds: 200);
      print('Will retry in $pauseBetweenRetries.');
      await new Future<Null>.delayed(pauseBetweenRetries);
    }
  }
}

Future<int> _findAvailablePort() async {
  int port = 20000;
  while (true) {
    try {
      ServerSocket socket =
          await ServerSocket.bind(InternetAddress.LOOPBACK_IP_V4, port);
      await socket.close();
      return port;
    } catch (_) {
      port++;
    }
  }
}