// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:meta/meta.dart';
import 'package:vm_service/vm_service.dart';

import 'cocoon.dart';
import 'devices.dart';
import 'task_result.dart';
import 'utils.dart';

/// Runs every task in [taskNames] in order, automatically retrying failures.
///
/// Each task is attempted until it succeeds or until it has failed
/// `Cocoon.retryNumber + 1` times:
///
///  * A task that passes on its first attempt is reported with `flaky: false`.
///  * A task that fails at least once but eventually passes is still treated
///    as successful, and is reported with `flaky: true`.
///  * A task that fails every attempt is reported as consistently failing and
///    the process [exitCode] is set to 1. When [exitOnFirstTestFailure] is
///    true the remaining tasks are not run.
///
/// The status lines are written through [print]. [logs] is not read by this
/// function. The remaining arguments are handed to [rerunTask] unchanged.
Future<void> runTasks(
  List<String> taskNames, {
  bool exitOnFirstTestFailure = false,
  // The default here is false so that tests do not need to pass it. Normal
  // execution derives the value from the --terminate-stray-dart-processes
  // command line argument, and that flag defaults to true.
  bool terminateStrayDartProcesses = false,
  bool silent = false,
  String? deviceId,
  String? gitBranch,
  String? localEngine,
  String? localEngineSrcPath,
  String? luciBuilder,
  String? resultsPath,
  List<String>? taskArgs,
  @visibleForTesting Map<String, String>? isolateParams,
  @visibleForTesting Function(String) print = print,
  @visibleForTesting List<String>? logs,
}) async {
  for (final String taskName in taskNames) {
    TaskResult result = TaskResult.success(null);
    int failures = 0;

    // `failures` only advances when an attempt fails; a passing attempt
    // leaves the loop through `break` before the increment runs.
    for (; failures <= Cocoon.retryNumber; failures += 1) {
      result = await rerunTask(
        taskName,
        deviceId: deviceId,
        localEngine: localEngine,
        localEngineSrcPath: localEngineSrcPath,
        terminateStrayDartProcesses: terminateStrayDartProcesses,
        silent: silent,
        taskArgs: taskArgs,
        resultsPath: resultsPath,
        gitBranch: gitBranch,
        luciBuilder: luciBuilder,
        isolateParams: isolateParams,
      );

      if (!result.succeeded) {
        continue;
      }

      section('Flaky status for "$taskName"');
      if (failures == 0) {
        print('Test passed on first attempt.');
        print('flaky: false');
      } else {
        print('Total ${failures + 1} executions: $failures failures and 1 false positive.');
        print('flaky: true');
        // TODO(ianh): stop ignoring this failure. We should set exitCode=1, and quit
        // if exitOnFirstTestFailure is true.
      }
      break;
    }

    if (result.succeeded) {
      continue;
    }

    section('Flaky status for "$taskName"');
    print('Consistently failed across all $failures executions.');
    print('flaky: false');
    exitCode = 1;
    if (exitOnFirstTestFailure) {
      return;
    }
  }
}

/// Runs [taskName] once through [runTask], framing the attempt in its own
/// log sections so that repeated attempts are easy to tell apart.
///
/// The result is printed as indented JSON. When [resultsPath] is non-null the
/// result is also written there with [Cocoon.writeTaskResultToFile], together
/// with [luciBuilder] and [gitBranch].
///
/// Returns the [TaskResult] produced by [runTask].
Future<TaskResult> rerunTask(
  String taskName, {
  String? deviceId,
  String? localEngine,
  String? localEngineSrcPath,
  bool terminateStrayDartProcesses = false,
  bool silent = false,
  List<String>? taskArgs,
  String? resultsPath,
  String? gitBranch,
  String? luciBuilder,
  @visibleForTesting Map<String, String>? isolateParams,
}) async {
  const JsonEncoder prettyJson = JsonEncoder.withIndent('  ');

  section('Running task "$taskName"');
  final TaskResult outcome = await runTask(
    taskName,
    terminateStrayDartProcesses: terminateStrayDartProcesses,
    silent: silent,
    localEngine: localEngine,
    localEngineSrcPath: localEngineSrcPath,
    deviceId: deviceId,
    taskArgs: taskArgs,
    isolateParams: isolateParams,
  );

  print('Task result:');
  print(prettyJson.convert(outcome));
  section('Finished task "$taskName"');

  // Nothing to persist unless the caller asked for a results file.
  if (resultsPath == null) {
    return outcome;
  }

  await Cocoon().writeTaskResultToFile(
    builderName: luciBuilder,
    gitBranch: gitBranch,
    result: outcome,
    resultsPath: resultsPath,
  );
  return outcome;
}

/// Runs a task in a separate Dart VM and collects the result using the VM
/// service protocol.
///
/// [taskName] is the name of the task. The corresponding task executable is
/// expected to be found under `bin/tasks`.
///
/// Running the task in [silent] mode will suppress standard output from task
/// processes and only print standard errors.
///
/// [taskArgs] are passed to the task executable for additional configuration.
///
/// [localEngine] and [localEngineSrcPath], when non-null, are passed to the
/// task VM as `-D` definitions, and [deviceId], when non-null, is exported to
/// it through the [DeviceIdEnvName] environment variable.
///
/// [isolateParams] are sent as arguments of the `ext.cocoonRunTask` service
/// extension call, along with a `runProcessCleanup` entry derived from
/// [terminateStrayDartProcesses]. The caller's map is copied, not modified.
///
/// Throws a [String] if the task executable does not exist, and an
/// [Exception] if the task process exits before it reports a VM service URI.
/// Any other failure while talking to the task process is logged and
/// rethrown. The task process is killed if it is still running when this
/// function finishes.
Future<TaskResult> runTask(
  String taskName, {
  bool terminateStrayDartProcesses = false,
  bool silent = false,
  String? localEngine,
  String? localEngineSrcPath,
  String? deviceId,
  List<String>? taskArgs,
  @visibleForTesting Map<String, String>? isolateParams,
}) async {
  final String taskExecutable = 'bin/tasks/$taskName.dart';

  if (!file(taskExecutable).existsSync()) {
    throw 'Executable Dart file not found: $taskExecutable';
  }

  final Process runner = await startProcess(
    dartBin,
    <String>[
      '--disable-dart-dev',
      '--enable-vm-service=0', // zero causes the system to choose a free port
      '--no-pause-isolates-on-exit',
      if (localEngine != null) '-DlocalEngine=$localEngine',
      if (localEngineSrcPath != null) '-DlocalEngineSrcPath=$localEngineSrcPath',
      taskExecutable,
      ...?taskArgs,
    ],
    environment: <String, String>{
      if (deviceId != null)
        DeviceIdEnvName: deviceId,
    },
  );

  final Completer<Uri> uri = Completer<Uri>();
  final Completer<void> stdoutDone = Completer<void>();
  final Completer<void> stderrDone = Completer<void>();
  bool runnerFinished = false;

  unawaited(runner.exitCode.whenComplete(() async {
    runnerFinished = true;
    if (uri.isCompleted) {
      return;
    }
    // The runner died before announcing its VM service, so no URI can arrive
    // any more. Let the rest of its output (usually the reason it died) reach
    // the log first, then fail instead of waiting on `uri` forever.
    await Future.wait<void>(<Future<void>>[stdoutDone.future, stderrDone.future]);
    if (!uri.isCompleted) {
      uri.completeError(Exception(
        '[$taskName] Task process exited before reporting a VM service URI.',
      ));
    }
  }));

  final StreamSubscription<String> stdoutSub = runner.stdout
      .transform<String>(const Utf8Decoder())
      .transform<String>(const LineSplitter())
      .listen(
    (String line) {
      // A URI that is only read after the process has exited points at a VM
      // service that no longer exists, so it is not worth connecting to.
      if (!uri.isCompleted && !runnerFinished) {
        final Uri? serviceUri = parseServiceUri(line, prefix: RegExp('(Observatory|The Dart VM service is) listening on '));
        if (serviceUri != null) {
          uri.complete(serviceUri);
        }
      }
      if (!silent) {
        stdout.writeln('[$taskName] [STDOUT] $line');
      }
    },
    onDone: () => stdoutDone.complete(),
  );

  final StreamSubscription<String> stderrSub = runner.stderr
      .transform<String>(const Utf8Decoder())
      .transform<String>(const LineSplitter())
      .listen(
    (String line) {
      stderr.writeln('[$taskName] [STDERR] $line');
    },
    onDone: () => stderrDone.complete(),
  );

  try {
    // Nothing is awaited between creating `uri` and this point, so the
    // completer always has a listener before it can complete with an error.
    final ConnectionResult result = await _connectToRunnerIsolate(await uri.future);
    print('[$taskName] Connected to VM server.');
    // Work on a copy so the caller's map is never modified.
    isolateParams = isolateParams == null ? <String, String>{} : Map<String, String>.of(isolateParams);
    isolateParams['runProcessCleanup'] = terminateStrayDartProcesses.toString();
    final Map<String, dynamic> taskResultJson = (await result.vmService.callServiceExtension(
      'ext.cocoonRunTask',
      args: isolateParams,
      isolateId: result.isolate.id,
    )).json!;
    final TaskResult taskResult = TaskResult.fromJson(taskResultJson);
    final int exitCode = await runner.exitCode;
    print('[$taskName] Process terminated with exit code $exitCode.');
    return taskResult;
  } catch (error, stack) {
    print('[$taskName] Task runner system failed with exception!\n$error\n$stack');
    rethrow;
  } finally {
    if (!runnerFinished) {
      print('[$taskName] Terminating process...');
      runner.kill(ProcessSignal.sigkill);
    }
    await stdoutSub.cancel();
    await stderrSub.cancel();
  }
}

/// Connects to the task runner's VM service at [vmServiceUri] and waits until
/// the runner reports that it is ready.
///
/// The WebSocket URL is built from [vmServiceUri] by switching the scheme to
/// `ws` and using the first path segment (the authentication code, when
/// present) followed by `ws` as the path.
///
/// Every attempt opens a fresh connection, waits for the VM to list at least
/// one isolate, and calls `ext.cocoonRunnerReady` on the first isolate. An
/// attempt that fails for any reason is retried after 50 milliseconds, with no
/// upper bound on the number of attempts. Once 10 seconds have passed a
/// warning with the latest error is printed, at most once every 10 seconds.
///
/// Returns the connected client together with the isolate that answered.
Future<ConnectionResult> _connectToRunnerIsolate(Uri vmServiceUri) async {
  final List<String> pathSegments = <String>[
    // Add authentication code.
    if (vmServiceUri.pathSegments.isNotEmpty) vmServiceUri.pathSegments[0],
    'ws',
  ];
  final String url = vmServiceUri.replace(scheme: 'ws', pathSegments: pathSegments).toString();
  final Stopwatch stopwatch = Stopwatch()..start();
  const Duration warningInterval = Duration(seconds: 10);
  Duration nextWarning = warningInterval;

  while (true) {
    // Tracks the client opened by this attempt so that a failed attempt can
    // release it instead of leaking one WebSocket per retry.
    VmService? attemptClient;
    try {
      // Make sure VM server is up by successfully opening and closing a socket.
      await (await WebSocket.connect(url)).close();

      // Look up the isolate.
      final VmService client = await vmServiceConnectUri(url);
      attemptClient = client;
      VM vm = await client.getVM();
      while (vm.isolates!.isEmpty) {
        await Future<void>.delayed(const Duration(seconds: 1));
        vm = await client.getVM();
      }
      final IsolateRef isolate = vm.isolates!.first;
      final Response response = await client.callServiceExtension('ext.cocoonRunnerReady', isolateId: isolate.id);
      if (response.json!['response'] != 'ready') {
        throw 'not ready yet';
      }
      return ConnectionResult(client, isolate);
    } catch (error) {
      try {
        await attemptClient?.dispose();
      } catch (_) {
        // Best-effort cleanup: a failure to close a connection that is being
        // abandoned anyway must not stop the retry loop.
      }
      if (stopwatch.elapsed > nextWarning) {
        print('VM service still not ready after ${stopwatch.elapsed}: $error\nContinuing to retry...');
        // Without this the warning would repeat on every 50 ms retry.
        nextWarning = stopwatch.elapsed + warningInterval;
      }
      await Future<void>.delayed(const Duration(milliseconds: 50));
    }
  }
}

/// A VM service client paired with an isolate found through it.
///
/// Returned by [_connectToRunnerIsolate] once the runner has reported ready.
class ConnectionResult {
  /// Bundles the open [vmService] client with the [isolate] it located.
  ConnectionResult(this.vmService, this.isolate);

  /// The client used to call service extensions on the task runner.
  final VmService vmService;

  /// The isolate whose id is passed to those service extension calls.
  final IsolateRef isolate;
}

/// Connects to the VM service at [wsUri], repairing the cocoon client's
/// invalid "ready" response before it reaches the [VmService] client.
///
/// The cocoon client sends a message whose `result` is the bare string
/// `"ready"`, which is not a valid VM service response. Such a message is
/// rewritten so that `result` becomes `{"response": "ready"}`. Every other
/// frame, including binary frames and JSON that is not an object, is
/// forwarded exactly as it was received.
///
/// [log], when provided, is handed to the returned [VmService]. Disposing the
/// returned client closes the underlying WebSocket.
Future<VmService> vmServiceConnectUri(String wsUri, {Log? log}) async {
  final WebSocket socket = await WebSocket.connect(wsUri);
  final StreamController<dynamic> controller = StreamController<dynamic>();
  final Completer<dynamic> streamClosedCompleter = Completer<dynamic>();
  socket.listen(
    (dynamic data) {
      // Only a text frame holding a JSON object can be the invalid reply.
      // Anything else is passed through rather than force-cast, so that an
      // unexpected frame cannot throw from inside this callback.
      if (data is String) {
        final Object? decoded = json.decode(data);
        if (decoded is Map<String, dynamic> && decoded['result'] == 'ready') {
          decoded['result'] = <String, dynamic>{'response': 'ready'};
          controller.add(json.encode(decoded));
          return;
        }
      }
      controller.add(data);
    },
    onError: (Object err, StackTrace stackTrace) => controller.addError(err, stackTrace),
    onDone: () => streamClosedCompleter.complete(),
  );

  return VmService(
    controller.stream,
    (String message) => socket.add(message),
    log: log,
    disposeHandler: () => socket.close(),
    streamClosed: streamClosedCompleter.future,
  );
}