// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:math';
import 'dart:typed_data';

import 'package:crypto/crypto.dart';
import 'package:meta/meta.dart';

import '../base/file_system.dart';
import '../build_system/hash.dart';
import '../convert.dart';

/// Adler-32 and MD5 hashes of blocks in files.
class BlockHashes {
  BlockHashes({
    required this.blockSize,
    required this.totalSize,
    required this.adler32,
    required this.md5,
    required this.fileMd5,
  });

  /// The block size used to generate the hashes.
  final int blockSize;

  /// Total size of the file.
  final int totalSize;

  /// List of adler32 hashes of each block in the file.
  final List<int> adler32;

  /// List of MD5 hashes of each block in the file.
  final List<String> md5;

  /// MD5 hash of the whole file.
  final String fileMd5;

  Map<String, Object> toJson() => <String, Object>{
    'blockSize': blockSize,
    'totalSize': totalSize,
    'adler32': base64.encode(Uint8List.view(Uint32List.fromList(adler32).buffer)),
    'md5': md5,
    'fileMd5': fileMd5,
  };

  static BlockHashes fromJson(Map<String, Object?> obj) {
    return BlockHashes(
      blockSize: obj['blockSize']! as int,
      totalSize: obj['totalSize']! as int,
      adler32: Uint32List.view(base64.decode(obj['adler32']! as String).buffer),
      md5: (obj['md5']! as List<Object>).cast<String>(),
      fileMd5: obj['fileMd5']! as String,
    );
  }
}

/// Converts a stream of bytes, into a stream of bytes of fixed chunk size.
@visibleForTesting
Stream<Uint8List> convertToChunks(Stream<Uint8List> source, int chunkSize) {
  final BytesBuilder bytesBuilder = BytesBuilder(copy: false);
  final StreamController<Uint8List> controller = StreamController<Uint8List>();
  final StreamSubscription<Uint8List> subscription = source.listen((Uint8List chunk) {
    int start = 0;
    while (start < chunk.length) {
      final int sizeToTake = min(chunkSize - bytesBuilder.length, chunk.length - start);
      assert(sizeToTake > 0);
      assert(sizeToTake <= chunkSize);

      final Uint8List sublist = chunk.sublist(start, start + sizeToTake);
      start += sizeToTake;

      if (bytesBuilder.isEmpty && sizeToTake == chunkSize) {
        controller.add(sublist);
      } else {
        bytesBuilder.add(sublist);
        assert(bytesBuilder.length <= chunkSize);
        if (bytesBuilder.length == chunkSize) {
          controller.add(bytesBuilder.takeBytes());
        }
      }
    }
  }, onDone: () {
    if (controller.hasListener && !controller.isClosed) {
      if (bytesBuilder.isNotEmpty) {
        controller.add(bytesBuilder.takeBytes());
      }
      controller.close();
    }
  }, onError: (Object error, StackTrace stackTrace) {
    controller.addError(error, stackTrace);
  });

  controller.onCancel = subscription.cancel;
  controller.onPause = subscription.pause;
  controller.onResume = subscription.resume;

  return controller.stream;
}

const int _adler32Prime = 65521;

/// Helper function to calculate Adler32 hash of a binary.
@visibleForTesting
int adler32Hash(List<int> binary) {
  // The maximum integer that can be stored in the `int` data type.
  const int maxInt = 0x1fffffffffffff;
  // maxChunkSize is the maximum number of bytes we can sum without
  // performing the modulus operation, without overflow.
  // n * (n + 1) / 2 * 255 < maxInt
  // n < sqrt(maxInt / 255) - 1
  final int maxChunkSize = sqrt(maxInt / 255).floor() - 1;

  int a = 1;
  int b = 0;

  final int length = binary.length;
  for (int i = 0; i < length; i += maxChunkSize) {
    final int end = i + maxChunkSize < length ? i + maxChunkSize : length;
    for (final int c in binary.getRange(i, end)) {
      a += c;
      b += a;
    }
    a %= _adler32Prime;
    b %= _adler32Prime;
  }

  return ((b & 0xffff) << 16) | (a & 0xffff);
}

/// Helper to calculate rolling Adler32 hash of a file.
@visibleForTesting
class RollingAdler32 {
  RollingAdler32(this.blockSize): _buffer = Uint8List(blockSize);

  /// Block size of the rolling hash calculation.
  final int blockSize;

  int processedBytes = 0;

  final Uint8List _buffer;
  int _cur = 0;
  int _a = 1;
  int _b = 0;

  /// The current rolling hash value.
  int get hash => ((_b & 0xffff) << 16) | (_a & 0xffff);

  /// Push a new character into the rolling chunk window, and returns the
  /// current hash value.
  int push(int char) {
    processedBytes++;

    if (processedBytes > blockSize) {
      final int prev = _buffer[_cur];
      _b -= prev * blockSize + 1;
      _a -= prev;
    }

    _a += char;
    _b += _a;

    _buffer[_cur] = char;
    _cur++;
    if (_cur == blockSize) {
      _cur = 0;
    }

    _a %= _adler32Prime;
    _b %= _adler32Prime;

    return hash;
  }

  /// Returns a [Uint8List] of size [blockSize] that was used to calculate the
  /// current Adler32 hash.
  Uint8List currentBlock() {
    if (processedBytes < blockSize) {
      return Uint8List.sublistView(_buffer, 0, processedBytes);
    } else if (_cur == 0) {
      return _buffer;
    } else {
      final BytesBuilder builder = BytesBuilder(copy:false)
        ..add(Uint8List.sublistView(_buffer, _cur))
        ..add(Uint8List.sublistView(_buffer, 0, _cur));
      return builder.takeBytes();
    }
  }

  void reset() {
    _a = 1;
    _b = 0;
    processedBytes = 0;
  }
}

/// Helper for rsync-like file transfer.
///
/// The algorithm works as follows.
///
/// First, in the destination device, calculate hashes of the every block of
/// the same size. Two hashes are used, Adler-32 for the rolling hash, and MD5
/// as a hash with a lower chance of collision.
///
/// The block size is chosen to balance between the amount of data required in
/// the initial transmission, and the amount of data needed for rebuilding the
/// file.
///
/// Next, on the machine that contains the source file, we calculate the
/// rolling hash of the source file, for every possible position. If the hash
/// is found on the block hashes, we then compare the MD5 of the block. If both
/// the Adler-32 and MD5 hash match, we consider that the block is identical.
///
/// For each block that can be found, we will generate the instruction asking
/// the destination machine to read block from the destination block. For
/// blocks that can't be found, we will transfer the content of the blocks.
///
/// On the receiving end, it will build a copy of the source file from the
/// given instructions.
class FileTransfer {
  /// Calculate hashes of blocks in the file.
  Future<BlockHashes> calculateBlockHashesOfFile(File file, { int? blockSize }) async {
    final int totalSize = await file.length();
    blockSize ??= max(sqrt(totalSize).ceil(), 2560);

    final Stream<Uint8List> fileContentStream = file.openRead().map((List<int> chunk) => Uint8List.fromList(chunk));

    final List<int> adler32Results = <int>[];
    final List<String> md5Results = <String>[];
    await for (final Uint8List chunk in convertToChunks(fileContentStream, blockSize)) {
      adler32Results.add(adler32Hash(chunk));
      md5Results.add(base64.encode(md5.convert(chunk).bytes));
    }

    // Handle whole file md5 separately. Md5Hash requires the chunk size to be a multiple of 64.
    final String fileMd5 = await _md5OfFile(file);

    return BlockHashes(
      blockSize: blockSize,
      totalSize: totalSize,
      adler32: adler32Results,
      md5: md5Results,
      fileMd5: fileMd5,
    );
  }

  /// Compute the instructions to rebuild the source [file] with the block
  /// hashes of the destination file.
  ///
  /// Returns an empty list if the destination file is exactly the same as the
  /// source file.
  Future<List<FileDeltaBlock>> computeDelta(File file, BlockHashes hashes) async {
    // Skip computing delta if the destination file matches the source file.
    if (await file.length() == hashes.totalSize && await _md5OfFile(file) == hashes.fileMd5) {
      return <FileDeltaBlock>[];
    }

    final Stream<List<int>> fileContentStream = file.openRead();
    final int blockSize = hashes.blockSize;

    // Generate a lookup for adler32 hash to block index.
    final Map<int, List<int>> adler32ToBlockIndex = <int, List<int>>{};
    for (int i = 0; i < hashes.adler32.length; i++) {
      (adler32ToBlockIndex[hashes.adler32[i]] ??= <int>[]).add(i);
    }

    final RollingAdler32 adler32 = RollingAdler32(blockSize);

    // Number of bytes read.
    int size = 0;

    // Offset of the beginning of the current block.
    int start = 0;

    final List<FileDeltaBlock> blocks = <FileDeltaBlock>[];

    await for (final List<int> chunk in fileContentStream) {
      for (final int c in chunk) {
        final int hash = adler32.push(c);
        size++;

        if (size - start < blockSize) {
          // Ignore if we have not processed enough bytes.
          continue;
        }

        if (!adler32ToBlockIndex.containsKey(hash)) {
          // Adler32 hash of the current block does not match the destination file.
          continue;
        }

        // The indices of possible matching blocks.
        final List<int> blockIndices = adler32ToBlockIndex[hash]!;
        final String md5Hash = base64.encode(md5.convert(adler32.currentBlock()).bytes);

        // Verify if any of our findings actually matches the destination block by comparing its MD5.
        for (final int blockIndex in blockIndices) {
          if (hashes.md5[blockIndex] != md5Hash) {
            // Adler-32 hash collision. This is not an actual match.
            continue;
          }

          // Found matching entry, generate instruction for reconstructing the file.

          // Copy the previously unmatched data from the source file.
          if (size - start > blockSize) {
            blocks.add(FileDeltaBlock.fromSource(start: start, size: size - start - blockSize));
          }

          start = size;

          // Try to extend the previous entry.
          if (blocks.isNotEmpty && blocks.last.copyFromDestination) {
            final int lastBlockIndex = (blocks.last.start + blocks.last.size) ~/ blockSize;
            if (hashes.md5[lastBlockIndex] == md5Hash) {
              // We can extend the previous entry.
              final FileDeltaBlock last = blocks.removeLast();
              blocks.add(FileDeltaBlock.fromDestination(start: last.start, size: last.size + blockSize));
              break;
            }
          }

          blocks.add(FileDeltaBlock.fromDestination(start: blockIndex * blockSize, size: blockSize));
          break;
        }
      }
    }

    // For the remaining content that is not matched, copy from the source.
    if (start < size) {
      blocks.add(FileDeltaBlock.fromSource(start: start, size: size - start));
    }

    return blocks;
  }

  /// Generates the binary blocks that need to be transferred to the remote
  /// end to regenerate the file.
  Future<Uint8List> binaryForRebuilding(File file, List<FileDeltaBlock> delta) async {
    final RandomAccessFile binaryView = await file.open();
    final Iterable<FileDeltaBlock> toTransfer = delta.where((FileDeltaBlock block) => !block.copyFromDestination);
    final int totalSize = toTransfer.map((FileDeltaBlock i) => i.size).reduce((int a, int b) => a + b);
    final Uint8List buffer = Uint8List(totalSize);
    int start = 0;
    for (final FileDeltaBlock current in toTransfer) {
      await binaryView.setPosition(current.start);
      await binaryView.readInto(buffer, start, start + current.size);
      start += current.size;
    }

    assert(start == buffer.length);

    return buffer;
  }

  /// Generate the new destination file from the source file, with the
  /// [blocks] and [binary] stream given.
  Future<bool> rebuildFile(File file, List<FileDeltaBlock> delta, Stream<List<int>> binary) async {
    final RandomAccessFile fileView = await file.open();

    // Buffer used to hold the file content in memory.
    final BytesBuilder buffer = BytesBuilder(copy: false);

    final StreamIterator<List<int>> iterator = StreamIterator<List<int>>(binary);
    int currentIteratorStart = -1;

    bool iteratorMoveNextReturnValue = true;

    for (final FileDeltaBlock current in delta) {
      if (current.copyFromDestination) {
        await fileView.setPosition(current.start);
        buffer.add(await fileView.read(current.size));
      } else {
        int toRead = current.size;
        while (toRead > 0) {
          if (currentIteratorStart >= 0 && currentIteratorStart < iterator.current.length) {
            final int size = iterator.current.length - currentIteratorStart;
            final int sizeToRead = min(toRead, size);
            buffer.add(iterator.current.sublist(currentIteratorStart, currentIteratorStart + sizeToRead));
            currentIteratorStart += sizeToRead;
            toRead -= sizeToRead;
          } else {
            currentIteratorStart = 0;
            iteratorMoveNextReturnValue = await iterator.moveNext();
          }
        }
      }
    }

    await file.writeAsBytes(buffer.takeBytes(), flush: true);

    // Drain the stream iterator if needed.
    while (iteratorMoveNextReturnValue) {
      iteratorMoveNextReturnValue = await iterator.moveNext();
    }

    return true;
  }

  Future<String> _md5OfFile(File file) async {
    final Md5Hash fileMd5Hash = Md5Hash();
    await file.openRead().forEach((List<int> chunk) => fileMd5Hash.addChunk(Uint8List.fromList(chunk)));
    return base64.encode(fileMd5Hash.finalize().buffer.asUint8List());
  }
}

/// Represents a single line of instruction on how to generate the target file.
@immutable
class FileDeltaBlock {
  const FileDeltaBlock.fromSource({required this.start, required this.size}): copyFromDestination = false;
  const FileDeltaBlock.fromDestination({required this.start, required this.size}): copyFromDestination = true;

  /// If true, this block should be read from the destination file.
  final bool copyFromDestination;

  /// The size of the current block.
  final int size;

  /// Byte offset in the destination file from which the block should be read.
  final int start;

  Map<String, Object> toJson() => <String, Object> {
    if (copyFromDestination)
      'start': start,
    'size': size,
  };

  static List<FileDeltaBlock> fromJsonList(List<Map<String, Object?>> jsonList) {
    return jsonList.map((Map<String, Object?> json) {
      if (json.containsKey('start')) {
        return FileDeltaBlock.fromDestination(start: json['start']! as int, size: json['size']! as int);
      } else {
        // The start position does not matter on the destination machine.
        return FileDeltaBlock.fromSource(start: 0, size: json['size']! as int);
      }
    }).toList();
  }

  @override
  bool operator ==(Object other) {
    if (other is! FileDeltaBlock) {
      return false;
    }
    return other.copyFromDestination == copyFromDestination && other.size == size && other.start == start;
  }

  @override
  int get hashCode => Object.hash(copyFromDestination, size, start);
}