// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:typed_data';

import 'package:meta/meta.dart';

import 'base/common.dart';
import 'base/io.dart';
import 'base/logger.dart';
import 'base/utils.dart';
import 'convert.dart';

/// A single message passed through the [DaemonConnection].
class DaemonMessage {
  DaemonMessage(this.data, [this.binary]);

  /// Content of the JSON message in the message.
  final Map<String, Object?> data;

  /// Stream of the binary content of the message.
  ///
  /// Must be listened to if binary data is present.
  final Stream<List<int>>? binary;
}

/// Data of an event passed through the [DaemonConnection].
class DaemonEventData {
  DaemonEventData(this.eventName, this.data, [this.binary]);

  /// The name of the event.
  final String eventName;

  /// The data of the event.
  final Object? data;

  /// Stream of the binary content of the event.
  ///
  /// Must be listened to if binary data is present.
  final Stream<List<int>>? binary;
}

const String _binaryLengthKey = '_binaryLength';

enum _InputStreamParseState {
  json,
  binary,
}

/// Converts a binary stream to a stream of [DaemonMessage].
///
/// The daemon JSON-RPC protocol is defined as follows: every single line of
/// text that starts with `[{` and ends with `}]` will be parsed as a JSON
/// message. The array should contain only one single object which contains the
/// message data.
///
/// If the JSON object contains the key [_binaryLengthKey] with an integer
/// value (will be refered to as N), the following N bytes after the newline
/// character will contain the binary part of the message.
@visibleForTesting
class DaemonInputStreamConverter {
  DaemonInputStreamConverter(this.inputStream) {
    // Lazily listen to the input stream.
    _controller.onListen = () {
      final StreamSubscription<List<int>> subscription = inputStream.listen((List<int> chunk) {
        _processChunk(chunk);
      }, onError: (Object error, StackTrace stackTrace) {
        _controller.addError(error, stackTrace);
      }, onDone: () {
        unawaited(_controller.close());
      });

      _controller.onCancel = subscription.cancel;
      // We should not handle onPause or onResume. When the stream is paused, we
      // still need to read from the input stream.
    };
  }

  final Stream<List<int>> inputStream;

  final StreamController<DaemonMessage> _controller = StreamController<DaemonMessage>();
  Stream<DaemonMessage> get convertedStream => _controller.stream;

  // Internal states
  /// The current parse state, whether we are expecting JSON or binary data.
  _InputStreamParseState state = _InputStreamParseState.json;

  /// The binary stream that is being transferred.
  late StreamController<List<int>> currentBinaryStream;

  /// Remaining length in bytes that have to be sent to the binary stream.
  int remainingBinaryLength = 0;

  /// Buffer to hold the current line of input data.
  final BytesBuilder bytesBuilder = BytesBuilder(copy: false);

  // Processes a single chunk received in the input stream.
  void _processChunk(List<int> chunk) {

    int start = 0;
    while (start < chunk.length) {
      if (state == _InputStreamParseState.json) {
        start += _processChunkInJsonMode(chunk, start);
      } else if (state == _InputStreamParseState.binary) {
        final int bytesSent = _addBinaryChunk(chunk, start, remainingBinaryLength);
        start += bytesSent;
        remainingBinaryLength -= bytesSent;

        if (remainingBinaryLength <= 0) {
          assert(remainingBinaryLength == 0);

          unawaited(currentBinaryStream.close());
          state = _InputStreamParseState.json;
        }
      }
    }
  }

  /// Processes a chunk in JSON mode, and returns the number of bytes processed.
  int _processChunkInJsonMode(List<int> chunk, int start) {
    const int LF = 10; // The '\n' character

    // Search for newline character.
    final int indexOfNewLine = chunk.indexOf(LF, start);
    if (indexOfNewLine < 0) {
      bytesBuilder.add(chunk.sublist(start));
      return chunk.length - start;
    }

    bytesBuilder.add(chunk.sublist(start, indexOfNewLine + 1));

    // Process chunk here
    final Uint8List combinedChunk = bytesBuilder.takeBytes();
    String jsonString = utf8.decode(combinedChunk).trim();
    if (jsonString.startsWith('[{') && jsonString.endsWith('}]')) {
      jsonString = jsonString.substring(1, jsonString.length - 1);
      final Map<String, Object?>? value = castStringKeyedMap(json.decode(jsonString));
      if (value != null) {
        // Check if we need to consume another binary blob.
        if (value[_binaryLengthKey] != null) {
          remainingBinaryLength = value[_binaryLengthKey]! as int;
          currentBinaryStream = StreamController<List<int>>();
          state = _InputStreamParseState.binary;
          _controller.add(DaemonMessage(value, currentBinaryStream.stream));
        } else {
          _controller.add(DaemonMessage(value));
        }
      }
    }

    return indexOfNewLine + 1 - start;
  }

  int _addBinaryChunk(List<int> chunk, int start, int maximumSizeToRead) {
    if (start == 0 && chunk.length <= remainingBinaryLength) {
      currentBinaryStream.add(chunk);
      return chunk.length;
    } else {
      final int chunkRemainingLength = chunk.length - start;
      final int sizeToRead = chunkRemainingLength < remainingBinaryLength ? chunkRemainingLength : remainingBinaryLength;
      currentBinaryStream.add(chunk.sublist(start, start + sizeToRead));
      return sizeToRead;
    }
  }
}

/// A stream that a [DaemonConnection] uses to communicate with each other.
class DaemonStreams {
  DaemonStreams(
    Stream<List<int>> rawInputStream,
    StreamSink<List<int>> outputSink, {
    required Logger logger,
  }) :
    _outputSink = outputSink,
    inputStream = DaemonInputStreamConverter(rawInputStream).convertedStream,
    _logger = logger;

  /// Creates a [DaemonStreams] that uses stdin and stdout as the underlying streams.
  DaemonStreams.fromStdio(Stdio stdio, { required Logger logger })
    : this(stdio.stdin, stdio.stdout, logger: logger);

  /// Creates a [DaemonStreams] that uses [Socket] as the underlying streams.
  DaemonStreams.fromSocket(Socket socket, { required Logger logger })
    : this(socket, socket, logger: logger);

  /// Connects to a server and creates a [DaemonStreams] from the connection as the underlying streams.
  factory DaemonStreams.connect(String host, int port, { required Logger logger }) {
    final Future<Socket> socketFuture = Socket.connect(host, port);
    final StreamController<List<int>> inputStreamController = StreamController<List<int>>();
    final StreamController<List<int>> outputStreamController = StreamController<List<int>>();
    socketFuture.then((Socket socket) {
      inputStreamController.addStream(socket);
      socket.addStream(outputStreamController.stream);
    }).onError((Object error, StackTrace stackTrace) {
      logger.printError('Socket error: $error');
      logger.printTrace('$stackTrace');
      // Propagate the error to the streams.
      inputStreamController.addError(error, stackTrace);
      unawaited(outputStreamController.close());
    });
    return DaemonStreams(inputStreamController.stream, outputStreamController.sink, logger: logger);
  }

  final StreamSink<List<int>> _outputSink;
  final Logger _logger;

  /// Stream that contains input to the [DaemonConnection].
  final Stream<DaemonMessage> inputStream;

  /// Outputs a message through the connection.
  void send(Map<String, Object?> message, [ List<int>? binary ]) {
    try {
      if (binary != null) {
        message[_binaryLengthKey] = binary.length;
      }
      _outputSink.add(utf8.encode('[${json.encode(message)}]\n'));
      if (binary != null) {
        _outputSink.add(binary);
      }
    } on StateError catch (error) {
      _logger.printError('Failed to write daemon command response: $error');
      // Failed to send, close the connection
      _outputSink.close();
    } on IOException catch (error) {
      _logger.printError('Failed to write daemon command response: $error');
      // Failed to send, close the connection
      _outputSink.close();
    }
  }

  /// Cleans up any resources used.
  Future<void> dispose() async {
    unawaited(_outputSink.close());
  }
}

/// Connection between a flutter daemon and a client.
class DaemonConnection {
  DaemonConnection({
    required DaemonStreams daemonStreams,
    required Logger logger,
  }): _logger = logger,
      _daemonStreams = daemonStreams {
    _commandSubscription = daemonStreams.inputStream.listen(
      _handleMessage,
      onError: (Object error, StackTrace stackTrace) {
        // We have to listen for on error otherwise the error on the socket
        // will end up in the Zone error handler.
        // Do nothing here and let the stream close handlers handle shutting
        // down the daemon.
      }
    );
  }

  final DaemonStreams _daemonStreams;

  final Logger _logger;

  late final StreamSubscription<DaemonMessage> _commandSubscription;

  int _outgoingRequestId = 0;
  final Map<String, Completer<Object?>> _outgoingRequestCompleters = <String, Completer<Object?>>{};

  final StreamController<DaemonEventData> _events = StreamController<DaemonEventData>.broadcast();
  final StreamController<DaemonMessage> _incomingCommands = StreamController<DaemonMessage>();

  /// A stream that contains all the incoming requests.
  Stream<DaemonMessage> get incomingCommands => _incomingCommands.stream;

  /// Listens to the event with the event name [eventToListen].
  Stream<DaemonEventData> listenToEvent(String eventToListen) {
    return _events.stream
      .where((DaemonEventData event) => event.eventName == eventToListen);
  }

  /// Sends a request to the other end of the connection.
  ///
  /// Returns a [Future] that resolves with the content.
  Future<Object?> sendRequest(String method, [Object? params, List<int>? binary]) async {
    final String id = '${++_outgoingRequestId}';
    final Completer<Object?> completer = Completer<Object?>();
    _outgoingRequestCompleters[id] = completer;
    final Map<String, Object?> data = <String, Object?>{
      'id': id,
      'method': method,
      if (params != null) 'params': params,
    };
    _logger.printTrace('-> Sending to daemon, id = $id, method = $method');
    _daemonStreams.send(data, binary);
    return completer.future;
  }

  /// Sends a response to the other end of the connection.
  void sendResponse(Object id, [Object? result]) {
    _daemonStreams.send(<String, Object?>{
      'id': id,
      if (result != null) 'result': result,
    });
  }

  /// Sends an error response to the other end of the connection.
  void sendErrorResponse(Object id, Object? error, StackTrace trace) {
    _daemonStreams.send(<String, Object?>{
      'id': id,
      'error': error,
      'trace': '$trace',
    });
  }

  /// Sends an event to the client.
  void sendEvent(String name, [ Object? params, List<int>? binary ]) {
    _daemonStreams.send(<String, Object?>{
      'event': name,
      if (params != null) 'params': params,
    }, binary);
  }

  /// Handles the input from the stream.
  ///
  /// There are three kinds of data: Request, Response, Event.
  ///
  /// Request:
  /// {"id": <Object>. "method": <String>, "params": <optional, Object?>}
  ///
  /// Response:
  /// {"id": <Object>. "result": <optional, Object?>} for a successful response.
  /// {"id": <Object>. "error": <Object>, "stackTrace": <String>} for an error response.
  ///
  /// Event:
  /// {"event": <String>. "params": <optional, Object?>}
  void _handleMessage(DaemonMessage message) {
    final Map<String, Object?> data = message.data;
    if (data['id'] != null) {
      if (data['method'] == null) {
        // This is a response to previously sent request.
        final String id = data['id']! as String;
        if (data['error'] != null) {
          // This is an error response.
          _logger.printTrace('<- Error response received from daemon, id = $id');
          final Object error = data['error']!;
          final String stackTrace = data['stackTrace'] as String? ?? '';
          _outgoingRequestCompleters.remove(id)?.completeError(error, StackTrace.fromString(stackTrace));
        } else {
          _logger.printTrace('<- Response received from daemon, id = $id');
          final Object? result = data['result'];
          _outgoingRequestCompleters.remove(id)?.complete(result);
        }
      } else {
        _incomingCommands.add(message);
      }
    } else if (data['event'] != null) {
      // This is an event
      _logger.printTrace('<- Event received: ${data['event']}');
      final Object? eventName = data['event'];
      if (eventName is String) {
        _events.add(DaemonEventData(
          eventName,
          data['params'],
          message.binary,
        ));
      } else {
        throwToolExit('event name received is not string!');
      }
    } else {
      _logger.printError('Unknown data received from daemon');
    }
  }

  /// Cleans up any resources used in the connection.
  Future<void> dispose() async {
    await _commandSubscription.cancel();
    await _daemonStreams.dispose();
    unawaited(_events.close());
    unawaited(_incomingCommands.close());
  }
}