// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';

import 'package:flutter_tools/src/base/logger.dart';
import 'package:flutter_tools/src/convert.dart';
import 'package:flutter_tools/src/daemon.dart';

import '../src/common.dart';

/// In-memory [DaemonStreams] test double backed by two stream controllers.
///
/// Tests add messages to [inputs] to feed [inputStream], and read back from
/// [outputs] whatever was handed to [send].
class FakeDaemonStreams implements DaemonStreams {
  /// Source of the messages exposed through [inputStream].
  final StreamController<DaemonMessage> inputs = StreamController<DaemonMessage>();

  /// Receives one [DaemonMessage] for every call to [send].
  final StreamController<DaemonMessage> outputs = StreamController<DaemonMessage>();

  @override
  Stream<DaemonMessage> get inputStream => inputs.stream;

  @override
  void send(Map<String, dynamic> message, [ List<int>? binary ]) {
    // An optional payload is exposed as a stream that emits it as one chunk.
    final Stream<List<int>>? binaryStream =
        binary == null ? null : Stream<List<int>>.value(binary);
    outputs.add(DaemonMessage(message, binaryStream));
  }

  @override
  Future<void> dispose() async {
    await inputs.close();
    // Some tests never listen to `outputs`, so its close is started but
    // deliberately not awaited.
    unawaited(outputs.close());
  }
}

// Test suite covering DaemonConnection (receiving, sending, and
// request/response matching), DaemonInputStreamConverter (framing of JSON
// messages and binary payloads), and DaemonStreams (byte-level encoding).
void main() {
  late BufferLogger bufferLogger;
  late FakeDaemonStreams daemonStreams;
  late DaemonConnection daemonConnection;
  // Every test gets a fresh logger, a fresh fake stream pair, and a
  // connection wired to both.
  setUp(() {
    bufferLogger = BufferLogger.test();
    daemonStreams = FakeDaemonStreams();
    daemonConnection = DaemonConnection(
      daemonStreams: daemonStreams,
      logger: bufferLogger,
    );
  });

  // The connection is disposed after each test; some tests additionally
  // dispose the connection or its streams inside their own body.
  tearDown(() async {
    await daemonConnection.dispose();
  });

  // Messages added to the fake's `inputs` controller stand in for data
  // arriving on the connection's input stream.
  group('DaemonConnection receiving end', () {
    testWithoutContext('redirects input to incoming commands', () async {
      final Map<String, dynamic> commandToSend = <String, dynamic>{'id': 0, 'method': 'some_method'};
      daemonStreams.inputs.add(DaemonMessage(commandToSend));

      final DaemonMessage commandReceived = await daemonConnection.incomingCommands.first;
      await daemonStreams.dispose();

      expect(commandReceived.data, commandToSend);
    });

    testWithoutContext('listenToEvent can receive the right events', () async {
      final Future<List<DaemonEventData>> events = daemonConnection.listenToEvent('event1').toList();

      // Three 'event1' messages (one with null params) interleaved with an
      // 'event2' message that must be filtered out.
      daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'event': 'event1', 'params': '1'}));
      daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'event': 'event2', 'params': '2'}));
      daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'event': 'event1', 'params': null}));
      daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'event': 'event1', 'params': 3}));

      // Give the queued messages a chance to be delivered before disposing.
      // NOTE(review): dispose() presumably ends the event stream, which
      // toList() needs in order to complete - confirm against DaemonConnection.
      await pumpEventQueue();
      await daemonConnection.dispose();

      expect((await events).map((DaemonEventData event) => event.data).toList(), <dynamic>['1', null, 3]);
    });
  });

  // Everything passed to FakeDaemonStreams.send lands in the fake's `outputs`
  // controller, so each test inspects the first message read from it.
  group('DaemonConnection sending end', () {
    testWithoutContext('sending requests', () async {
      // No response is ever supplied in this test, so the request future is
      // intentionally left unawaited.
      unawaited(daemonConnection.sendRequest('some_method', 'param'));
      final DaemonMessage message = await daemonStreams.outputs.stream.first;
      expect(message.data['id'], isNotNull);
      expect(message.data['method'], 'some_method');
      expect(message.data['params'], 'param');
    });

    testWithoutContext('sending requests without param', () async {
      unawaited(daemonConnection.sendRequest('some_method'));
      final DaemonMessage message = await daemonStreams.outputs.stream.first;
      expect(message.data['id'], isNotNull);
      expect(message.data['method'], 'some_method');
      expect(message.data['params'], isNull);
    });

    testWithoutContext('sending response', () async {
      daemonConnection.sendResponse('1', 'some_data');
      final DaemonMessage message = await daemonStreams.outputs.stream.first;
      expect(message.data['id'], '1');
      expect(message.data['method'], isNull);
      expect(message.data['error'], isNull);
      expect(message.data['result'], 'some_data');
    });

    testWithoutContext('sending response without data', () async {
      daemonConnection.sendResponse('1');
      final DaemonMessage message = await daemonStreams.outputs.stream.first;
      expect(message.data['id'], '1');
      expect(message.data['method'], isNull);
      expect(message.data['error'], isNull);
      expect(message.data['result'], isNull);
    });

    testWithoutContext('sending error response', () async {
      daemonConnection.sendErrorResponse('1', 'error', StackTrace.fromString('stack trace'));
      final DaemonMessage message = await daemonStreams.outputs.stream.first;
      expect(message.data['id'], '1');
      expect(message.data['method'], isNull);
      expect(message.data['error'], 'error');
      expect(message.data['trace'], 'stack trace');
    });

    testWithoutContext('sending events', () async {
      daemonConnection.sendEvent('some_event', '123');
      final DaemonMessage message = await daemonStreams.outputs.stream.first;
      expect(message.data['id'], isNull);
      expect(message.data['event'], 'some_event');
      expect(message.data['params'], '123');
    });

    testWithoutContext('sending events without params', () async {
      daemonConnection.sendEvent('some_event');
      final DaemonMessage message = await daemonStreams.outputs.stream.first;
      expect(message.data['id'], isNull);
      expect(message.data['event'], 'some_event');
      expect(message.data['params'], isNull);
    });
  });

  // Round trips: the request is read from `outputs`, then a reply carrying
  // the same generated id is injected through `inputs`.
  group('DaemonConnection request and response', () {
    testWithoutContext('receiving response from requests', () async {
      final Future<dynamic> requestFuture = daemonConnection.sendRequest('some_method', 'param');
      final DaemonMessage message = await daemonStreams.outputs.stream.first;

      expect(message.data['id'], isNotNull);
      expect(message.data['method'], 'some_method');
      expect(message.data['params'], 'param');

      final String id = message.data['id']! as String;
      daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': id, 'result': '123'}));
      expect(await requestFuture, '123');
    });

    testWithoutContext('receiving response from requests without result', () async {
      final Future<dynamic> requestFuture = daemonConnection.sendRequest('some_method', 'param');
      final DaemonMessage message = await daemonStreams.outputs.stream.first;

      expect(message.data['id'], isNotNull);
      expect(message.data['method'], 'some_method');
      expect(message.data['params'], 'param');

      final String id = message.data['id']! as String;
      // A reply with only an id (no 'result' key) should complete with null.
      daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': id}));
      expect(await requestFuture, null);
    });

    testWithoutContext('receiving error response from requests without result', () async {
      final Future<dynamic> requestFuture = daemonConnection.sendRequest('some_method', 'param');
      final DaemonMessage message = await daemonStreams.outputs.stream.first;

      expect(message.data['id'], isNotNull);
      expect(message.data['method'], 'some_method');
      expect(message.data['params'], 'param');

      final String id = message.data['id']! as String;
      // A reply carrying 'error' should make the request future fail with
      // that error value.
      daemonStreams.inputs.add(DaemonMessage(<String, dynamic>{'id': id, 'error': 'some_error', 'trace': 'stack trace'}));
      expect(requestFuture, throwsA('some_error'));
    });
  });

  group('DaemonInputStreamConverter', () {
    // Builds a command map; '_binaryLength' is present only when a binary
    // size is supplied.
    Map<String, Object?> testCommand(int id, [int? binarySize]) => <String, Object?>{
      'id': id,
      'method': 'test',
      if (binarySize != null)
        '_binaryLength': binarySize,
    };
    // Encodes a command as the bytes fed to the converter: the UTF-8 encoding
    // of a JSON array wrapping the command, terminated by a newline.
    List<int> testCommandBinary(int id, [int? binarySize]) => utf8.encode('[${json.encode(testCommand(id, binarySize))}]\n');

    testWithoutContext('can parse a single message', () async {
      final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
        testCommandBinary(10),
      ]);
      final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
      final Stream<DaemonMessage> outputStream = converter.convertedStream;
      final List<DaemonMessage> outputs = await outputStream.toList();
      expect(outputs, hasLength(1));
      expect(outputs[0].data, testCommand(10));
      expect(outputs[0].binary, null);
    });

    testWithoutContext('can parse multiple messages', () async {
      final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
        testCommandBinary(10),
        testCommandBinary(20),
      ]);
      final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
      final Stream<DaemonMessage> outputStream = converter.convertedStream;
      final List<DaemonMessage> outputs = await outputStream.toList();
      expect(outputs, hasLength(2));
      expect(outputs[0].data, testCommand(10));
      expect(outputs[0].binary, null);
      expect(outputs[1].data, testCommand(20));
      expect(outputs[1].binary, null);
    });

    testWithoutContext('can parse multiple messages while ignoring non json data in between', () async {
      final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
        testCommandBinary(10),
        // A plain-text line that is not wrapped in the JSON array framing.
        utf8.encode('This is not a json data...\n'),
        testCommandBinary(20),
      ]);
      final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
      final Stream<DaemonMessage> outputStream = converter.convertedStream;
      final List<DaemonMessage> outputs = await outputStream.toList();
      expect(outputs, hasLength(2));
      expect(outputs[0].data, testCommand(10));
      expect(outputs[0].binary, null);
      expect(outputs[1].data, testCommand(20));
      expect(outputs[1].binary, null);
    });

    testWithoutContext('can parse multiple messages even when they are split in multiple packets', () async {
      final List<int> binary1 = testCommandBinary(10);
      final List<int> binary2 = testCommandBinary(20);
      // Chunk boundaries deliberately do not line up with message boundaries;
      // the third chunk holds the tail of message 1 and the head of message 2.
      final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
        binary1.sublist(0, 5),
        binary1.sublist(5, 15),
        binary1.sublist(15) + binary2.sublist(0, 13),
        binary2.sublist(13),
      ]);
      final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
      final Stream<DaemonMessage> outputStream = converter.convertedStream;
      final List<DaemonMessage> outputs = await outputStream.toList();
      expect(outputs, hasLength(2));
      expect(outputs[0].data, testCommand(10));
      expect(outputs[0].binary, null);
      expect(outputs[1].data, testCommand(20));
      expect(outputs[1].binary, null);
    });

    testWithoutContext('can parse multiple messages even when they are combined in a single packet', () async {
      final List<int> binary1 = testCommandBinary(10);
      final List<int> binary2 = testCommandBinary(20);
      final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
        binary1 + binary2,
      ]);
      final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
      final Stream<DaemonMessage> outputStream = converter.convertedStream;
      final List<DaemonMessage> outputs = await outputStream.toList();
      expect(outputs, hasLength(2));
      expect(outputs[0].data, testCommand(10));
      expect(outputs[0].binary, null);
      expect(outputs[1].data, testCommand(20));
      expect(outputs[1].binary, null);
    });

    // The remaining tests attach a binary payload: the command announces its
    // size via '_binaryLength' and the raw bytes follow the JSON line.
    testWithoutContext('can parse a single message with binary stream', () async {
      final List<int> binary = <int>[1,2,3,4,5];
      final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
        testCommandBinary(10, binary.length),
        binary,
      ]);
      final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
      final Stream<DaemonMessage> outputStream = converter.convertedStream;
      final List<_DaemonMessageAndBinary> allOutputs = await _readAllBinaries(outputStream);
      expect(allOutputs, hasLength(1));
      expect(allOutputs[0].message.data, testCommand(10, binary.length));
      expect(allOutputs[0].binary, binary);
    });

    testWithoutContext('can parse a single message with binary stream when messages are combined in a single packet', () async {
      final List<int> binary = <int>[1,2,3,4,5];
      // JSON line and payload arrive together in one chunk.
      final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
        testCommandBinary(10, binary.length) + binary,
      ]);
      final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
      final Stream<DaemonMessage> outputStream = converter.convertedStream;
      final List<_DaemonMessageAndBinary> allOutputs = await _readAllBinaries(outputStream);
      expect(allOutputs, hasLength(1));
      expect(allOutputs[0].message.data, testCommand(10, binary.length));
      expect(allOutputs[0].binary, binary);
    });

    testWithoutContext('can parse multiple messages with binary stream', () async {
      final List<int> binary1 = <int>[1,2,3,4,5];
      final List<int> binary2 = <int>[6,7,8,9,10,11,12];
      final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
        testCommandBinary(10, binary1.length),
        binary1,
        testCommandBinary(20, binary2.length),
        binary2,
      ]);
      final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
      final Stream<DaemonMessage> outputStream = converter.convertedStream;
      final List<_DaemonMessageAndBinary> allOutputs = await _readAllBinaries(outputStream);
      expect(allOutputs, hasLength(2));
      expect(allOutputs[0].message.data, testCommand(10, binary1.length));
      expect(allOutputs[0].binary, binary1);
      expect(allOutputs[1].message.data, testCommand(20, binary2.length));
      expect(allOutputs[1].binary, binary2);
    });

    testWithoutContext('can parse multiple messages with binary stream when messages are split', () async {
      final List<int> binary1 = <int>[1,2,3,4,5];
      final List<int> message1 = testCommandBinary(10, binary1.length);
      final List<int> binary2 = <int>[6,7,8,9,10,11,12];
      final List<int> message2 = testCommandBinary(20, binary2.length);
      // Chunks cut through both JSON lines and through the second payload,
      // and one chunk mixes JSON bytes with payload bytes of both messages.
      final Stream<List<int>> inputStream = Stream<List<int>>.fromIterable(<List<int>>[
        message1.sublist(0, 10),
        message1.sublist(10) + binary1 + message2.sublist(0, 5),
        message2.sublist(5) + binary2.sublist(0, 3),
        binary2.sublist(3, 5),
        binary2.sublist(5),
      ]);
      final DaemonInputStreamConverter converter = DaemonInputStreamConverter(inputStream);
      final Stream<DaemonMessage> outputStream = converter.convertedStream;
      final List<_DaemonMessageAndBinary> allOutputs = await _readAllBinaries(outputStream);
      expect(allOutputs, hasLength(2));
      expect(allOutputs[0].message.data, testCommand(10, binary1.length));
      expect(allOutputs[0].binary, binary1);
      expect(allOutputs[1].message.data, testCommand(20, binary2.length));
      expect(allOutputs[1].binary, binary2);
    });
  });

  // Exercises a DaemonStreams built from in-memory byte stream controllers
  // instead of the FakeDaemonStreams used by the groups above.
  group('DaemonStreams', () {
    final Map<String, Object?> testCommand = <String, Object?>{
      'id': 100,
      'method': 'test',
    };
    late StreamController<List<int>> inputStream;
    late StreamController<List<int>> outputStream;
    // Shadows the outer FakeDaemonStreams variable of the same name within
    // this group.
    late DaemonStreams daemonStreams;
    setUp(() {
      inputStream = StreamController<List<int>>();
      outputStream = StreamController<List<int>>();
      daemonStreams = DaemonStreams(inputStream.stream, outputStream.sink, logger: bufferLogger);
    });

    testWithoutContext('parses the message received on the stream', () async {
      inputStream.add(utf8.encode('[${jsonEncode(testCommand)}]\n'));
      final DaemonMessage command = await daemonStreams.inputStream.first;
      expect(command.data, testCommand);
      expect(command.binary, null);
    });

    testWithoutContext('sends the encoded message through the sink', () async {
      daemonStreams.send(testCommand);
      final List<int> commands = await outputStream.stream.first;
      expect(commands, utf8.encode('[${jsonEncode(testCommand)}]\n'));
    });

    testWithoutContext('dispose closes the sink', () async {
      await daemonStreams.dispose();
      expect(outputStream.isClosed, true);
    });

    testWithoutContext('handles sending to a closed sink', () async {
      // Unless the stream is listened to, the call to .close() will never
      // complete
      outputStream.stream.listen((List<int> _) {});
      await outputStream.sink.close();
      // Sending after the sink is closed must be logged as an error rather
      // than surfacing as an exception from send().
      daemonStreams.send(testCommand);
      expect(
        bufferLogger.errorText,
        contains(
          'Failed to write daemon command response: Bad state: Cannot add event after closing',
        ),
      );
    });
  });
}

/// Pairs a [DaemonMessage] with the bytes collected from its binary stream.
///
/// Produced by [_readAllBinaries] so tests can compare a message's payload
/// against a plain list of bytes.
class _DaemonMessageAndBinary {
  _DaemonMessageAndBinary(this.message, this.binary);
  /// The message as it was emitted by the stream being read.
  final DaemonMessage message;
  /// The collected payload bytes, or null when the message had no binary
  /// stream.
  final List<int>? binary;
}

/// Drains [inputStream], pairing every message with its binary payload.
///
/// For each [DaemonMessage], the chunks of its `binary` stream (if any) are
/// read to completion and concatenated before the next message is requested
/// from [inputStream]. Messages without a binary stream are paired with
/// `null`; a binary stream that closes without emitting any chunk yields an
/// empty list.
///
/// Returns the pairs in the order the messages were emitted.
Future<List<_DaemonMessageAndBinary>> _readAllBinaries(Stream<DaemonMessage> inputStream) async {
  final StreamIterator<DaemonMessage> iterator = StreamIterator<DaemonMessage>(inputStream);
  final List<_DaemonMessageAndBinary> outputs = <_DaemonMessageAndBinary>[];
  while (await iterator.moveNext()) {
    final DaemonMessage message = iterator.current;
    final Stream<List<int>>? binaryStream = message.binary;
    List<int>? binary;
    if (binaryStream != null) {
      // fold, unlike reduce, does not fail on a stream with no chunks, and
      // appending into one accumulator avoids re-copying earlier bytes.
      binary = await binaryStream.fold<List<int>>(
        <int>[],
        (List<int> bytes, List<int> chunk) => bytes..addAll(chunk),
      );
    }
    outputs.add(_DaemonMessageAndBinary(message, binary));
  }
  return outputs;
}