// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:math';
import 'dart:typed_data';

import 'package:crypto/crypto.dart';
import 'package:meta/meta.dart';

import '../base/file_system.dart';
import '../build_system/hash.dart';
import '../convert.dart';

/// Adler-32 and MD5 hashes of blocks in files.
class BlockHashes {
  BlockHashes({
    required this.blockSize,
    required this.totalSize,
    required this.adler32,
    required this.md5,
    required this.fileMd5,
  });

  /// The block size used to generate the hashes.
  final int blockSize;

  /// Total size of the file.
  final int totalSize;

  /// List of adler32 hashes of each block in the file.
  final List<int> adler32;

  /// List of MD5 hashes of each block in the file.
  final List<String> md5;

  /// MD5 hash of the whole file.
  final String fileMd5;

  /// Serializes the hashes into a JSON-compatible map.
  ///
  /// The adler32 values are packed into a [Uint32List] and base64-encoded to
  /// keep the payload compact. Note this uses host byte order; both ends are
  /// assumed to agree — TODO confirm against the remote protocol.
  Map<String, Object> toJson() => <String, Object>{
    'blockSize': blockSize,
    'totalSize': totalSize,
    'adler32': base64.encode(Uint8List.view(Uint32List.fromList(adler32).buffer)),
    'md5': md5,
    'fileMd5': fileMd5,
  };

  /// Deserializes the output of [toJson].
  static BlockHashes fromJson(Map<String, Object?> obj) {
    return BlockHashes(
      blockSize: obj['blockSize']! as int,
      totalSize: obj['totalSize']! as int,
      adler32: Uint32List.view(base64.decode(obj['adler32']! as String).buffer),
      // JSON decoding produces List<dynamic>, which is not a List<Object>, so
      // cast through List<Object?> before narrowing the elements to String.
      md5: (obj['md5']! as List<Object?>).cast<String>(),
      fileMd5: obj['fileMd5']! as String,
    );
  }
}

/// Converts a stream of bytes into a stream of bytes of fixed chunk size.
@visibleForTesting
Stream<Uint8List> convertToChunks(Stream<Uint8List> source, int chunkSize) {
  // Accumulates leftover bytes until a full chunk is available.
  final BytesBuilder bytesBuilder = BytesBuilder(copy: false);
  final StreamController<Uint8List> controller = StreamController<Uint8List>();

  final StreamSubscription<Uint8List> subscription = source.listen((Uint8List chunk) {
    int start = 0;
    while (start < chunk.length) {
      final int sizeToTake = min(chunkSize - bytesBuilder.length, chunk.length - start);
      assert(sizeToTake > 0);
      assert(sizeToTake <= chunkSize);

      final Uint8List sublist = chunk.sublist(start, start + sizeToTake);
      start += sizeToTake;

      if (bytesBuilder.isEmpty && sizeToTake == chunkSize) {
        // Fast path: a whole chunk is available without going through the
        // builder, so emit it directly.
        controller.add(sublist);
      } else {
        bytesBuilder.add(sublist);
        assert(bytesBuilder.length <= chunkSize);
        if (bytesBuilder.length == chunkSize) {
          controller.add(bytesBuilder.takeBytes());
        }
      }
    }
  }, onDone: () {
    if (controller.hasListener && !controller.isClosed) {
      // Flush the final, possibly short, chunk before closing.
      if (bytesBuilder.isNotEmpty) {
        controller.add(bytesBuilder.takeBytes());
      }
      controller.close();
    }
  }, onError: (Object error, StackTrace stackTrace) {
    controller.addError(error, stackTrace);
  });

  // Forward flow control on the output stream to the source subscription.
  controller.onCancel = subscription.cancel;
  controller.onPause = subscription.pause;
  controller.onResume = subscription.resume;

  return controller.stream;
}

// Largest prime smaller than 2^16, as used by the Adler-32 checksum (RFC 1950).
const int _adler32Prime = 65521;

/// Helper function to calculate Adler32 hash of a binary.
@visibleForTesting
int adler32Hash(List<int> binary) {
  // The maximum integer that can be stored in the `int` data type.
  const int maxInt = 0x1fffffffffffff;
  // maxChunkSize is the maximum number of bytes we can sum without
  // performing the modulus operation, without overflow.
  // n * (n + 1) / 2 * 255 < maxInt
  // n < sqrt(maxInt / 255) - 1
  final int maxChunkSize = sqrt(maxInt / 255).floor() - 1;

  int a = 1;
  int b = 0;

  final int length = binary.length;
  for (int i = 0; i < length; i += maxChunkSize) {
    final int end = i + maxChunkSize < length ? i + maxChunkSize : length;
    for (final int c in binary.getRange(i, end)) {
      a += c;
      b += a;
    }
    // Deferring the modulus to once per chunk keeps the inner loop cheap.
    a %= _adler32Prime;
    b %= _adler32Prime;
  }

  return ((b & 0xffff) << 16) | (a & 0xffff);
}

/// Helper to calculate rolling Adler32 hash of a file.
@visibleForTesting
class RollingAdler32 {
  RollingAdler32(this.blockSize): _buffer = Uint8List(blockSize);

  /// Block size of the rolling hash calculation.
  final int blockSize;

  /// Number of bytes pushed since construction or the last [reset].
  int processedBytes = 0;

  // Circular buffer holding the bytes of the current window.
  final Uint8List _buffer;

  // Index into [_buffer] where the next byte will be written; once the window
  // is full this is also the position of the oldest byte.
  int _cur = 0;

  // Adler-32 running sums.
  int _a = 1;
  int _b = 0;

  /// The current rolling hash value.
  int get hash => ((_b & 0xffff) << 16) | (_a & 0xffff);

  /// Push a new character into the rolling chunk window, and returns the
  /// current hash value.
  int push(int char) {
    processedBytes++;

    if (processedBytes > blockSize) {
      // Evict the oldest byte from the rolling sums. Its contribution to `b`
      // is `prev * blockSize`, plus 1 for the seed of `a` carried by the
      // shrinking window.
      final int prev = _buffer[_cur];
      _b -= prev * blockSize + 1;
      _a -= prev;
    }

    _a += char;
    _b += _a;

    _buffer[_cur] = char;
    _cur++;
    if (_cur == blockSize) {
      _cur = 0;
    }

    // Dart's % yields a non-negative result for a positive modulus, so the
    // subtractions above cannot leave negative values here.
    _a %= _adler32Prime;
    _b %= _adler32Prime;

    return hash;
  }

  /// Returns a [Uint8List] of size [blockSize] that was used to calculate the
  /// current Adler32 hash.
  Uint8List currentBlock() {
    if (processedBytes < blockSize) {
      // The window is not yet full; all bytes live at the start of the buffer.
      return Uint8List.sublistView(_buffer, 0, processedBytes);
    } else if (_cur == 0) {
      return _buffer;
    } else {
      // Stitch the two halves of the circular buffer back into stream order:
      // oldest bytes start at _cur, newest end just before it.
      final BytesBuilder builder = BytesBuilder(copy:false)
        ..add(Uint8List.sublistView(_buffer, _cur))
        ..add(Uint8List.sublistView(_buffer, 0, _cur));
      return builder.takeBytes();
    }
  }

  /// Resets the hasher so it can be reused for a fresh stream of bytes.
  void reset() {
    _a = 1;
    _b = 0;
    processedBytes = 0;
    // Also rewind the write position: currentBlock() reads from index 0 while
    // fewer than [blockSize] bytes have been processed, so leaving _cur at a
    // non-zero position would return stale bytes after a reset.
    _cur = 0;
  }
}

/// Helper for rsync-like file transfer.
///
/// The algorithm works as follows.
///
/// First, in the destination device, calculate hashes of the every block of
/// the same size. Two hashes are used, Adler-32 for the rolling hash, and MD5
/// as a hash with a lower chance of collision.
///
/// The block size is chosen to balance between the amount of data required in
/// the initial transmission, and the amount of data needed for rebuilding the
/// file.
///
/// Next, on the machine that contains the source file, we calculate the
/// rolling hash of the source file, for every possible position. If the hash
/// is found on the block hashes, we then compare the MD5 of the block. If both
/// the Adler-32 and MD5 hash match, we consider that the block is identical.
///
/// For each block that can be found, we will generate the instruction asking
/// the destination machine to read block from the destination block. For
/// blocks that can't be found, we will transfer the content of the blocks.
///
/// On the receiving end, it will build a copy of the source file from the
/// given instructions.
class FileTransfer {
  /// Calculate hashes of blocks in the file.
  Future<BlockHashes> calculateBlockHashesOfFile(File file, { int? blockSize }) async {
    final int totalSize = await file.length();
    // Scale the block size with the square root of the file size (with a
    // 2560-byte floor) to balance hash-list size against transfer granularity.
    blockSize ??= max(sqrt(totalSize).ceil(), 2560);

    final Stream<Uint8List> fileContentStream =
        file.openRead().map((List<int> chunk) => Uint8List.fromList(chunk));

    final List<int> adler32Results = <int>[];
    final List<String> md5Results = <String>[];

    await for (final Uint8List chunk in convertToChunks(fileContentStream, blockSize)) {
      adler32Results.add(adler32Hash(chunk));
      md5Results.add(base64.encode(md5.convert(chunk).bytes));
    }

    // Handle whole file md5 separately. Md5Hash requires the chunk size to be a multiple of 64.
    final String fileMd5 = await _md5OfFile(file);

    return BlockHashes(
      blockSize: blockSize,
      totalSize: totalSize,
      adler32: adler32Results,
      md5: md5Results,
      fileMd5: fileMd5,
    );
  }

  /// Compute the instructions to rebuild the source [file] with the block
  /// hashes of the destination file.
  ///
  /// Returns an empty list if the destination file is exactly the same as the
  /// source file.
  Future<List<FileDeltaBlock>> computeDelta(File file, BlockHashes hashes) async {
    // Skip computing delta if the destination file matches the source file.
    if (await file.length() == hashes.totalSize && await _md5OfFile(file) == hashes.fileMd5) {
      return <FileDeltaBlock>[];
    }

    final Stream<List<int>> fileContentStream = file.openRead();
    final int blockSize = hashes.blockSize;

    // Generate a lookup for adler32 hash to block index.
    final Map<int, List<int>> adler32ToBlockIndex = <int, List<int>>{};
    for (int i = 0; i < hashes.adler32.length; i++) {
      (adler32ToBlockIndex[hashes.adler32[i]] ??= <int>[]).add(i);
    }

    final RollingAdler32 adler32 = RollingAdler32(blockSize);

    // Number of bytes read.
    int size = 0;
    // Offset of the beginning of the current block.
    int start = 0;
    final List<FileDeltaBlock> blocks = <FileDeltaBlock>[];

    await for (final List<int> chunk in fileContentStream) {
      for (final int c in chunk) {
        final int hash = adler32.push(c);
        size++;
        if (size - start < blockSize) {
          // Ignore if we have not processed enough bytes.
          continue;
        }

        if (!adler32ToBlockIndex.containsKey(hash)) {
          // Adler32 hash of the current block does not match the destination file.
          continue;
        }

        // The indices of possible matching blocks.
        final List<int> blockIndices = adler32ToBlockIndex[hash]!;
        final String md5Hash = base64.encode(md5.convert(adler32.currentBlock()).bytes);

        // Verify if any of our findings actually matches the destination block by comparing its MD5.
        for (final int blockIndex in blockIndices) {
          if (hashes.md5[blockIndex] != md5Hash) {
            // Adler-32 hash collision. This is not an actual match.
            continue;
          }

          // Found matching entry, generate instruction for reconstructing the file.

          // Copy the previously unmatched data from the source file.
          if (size - start > blockSize) {
            blocks.add(FileDeltaBlock.fromSource(start: start, size: size - start - blockSize));
          }

          start = size;

          // Try to extend the previous entry.
          if (blocks.isNotEmpty && blocks.last.copyFromDestination) {
            final int lastBlockIndex = (blocks.last.start + blocks.last.size) ~/ blockSize;
            // Guard the index: the previous destination block may end exactly
            // at the last hashed block, in which case there is nothing to
            // extend into and indexing would throw a RangeError.
            if (lastBlockIndex < hashes.md5.length && hashes.md5[lastBlockIndex] == md5Hash) {
              // We can extend the previous entry.
              final FileDeltaBlock last = blocks.removeLast();
              blocks.add(FileDeltaBlock.fromDestination(start: last.start, size: last.size + blockSize));
              break;
            }
          }

          blocks.add(FileDeltaBlock.fromDestination(start: blockIndex * blockSize, size: blockSize));
          break;
        }
      }
    }

    // For the remaining content that is not matched, copy from the source.
    if (start < size) {
      blocks.add(FileDeltaBlock.fromSource(start: start, size: size - start));
    }

    return blocks;
  }

  /// Generates the binary blocks that need to be transferred to the remote
  /// end to regenerate the file.
  Future<Uint8List> binaryForRebuilding(File file, List<FileDeltaBlock> delta) async {
    final RandomAccessFile binaryView = await file.open();
    try {
      final Iterable<FileDeltaBlock> toTransfer =
          delta.where((FileDeltaBlock block) => !block.copyFromDestination);
      // `fold` rather than `reduce`, so an empty `toTransfer` yields 0 instead
      // of throwing a StateError when every block already exists on the
      // destination.
      final int totalSize = toTransfer.fold<int>(0, (int sum, FileDeltaBlock block) => sum + block.size);
      final Uint8List buffer = Uint8List(totalSize);

      int start = 0;
      for (final FileDeltaBlock current in toTransfer) {
        await binaryView.setPosition(current.start);
        await binaryView.readInto(buffer, start, start + current.size);
        start += current.size;
      }

      assert(start == buffer.length);
      return buffer;
    } finally {
      // Always release the file handle, even if a read fails.
      await binaryView.close();
    }
  }

  /// Generate the new destination file from the source file, with the
  /// [delta] and [binary] stream given.
  Future<bool> rebuildFile(File file, List<FileDeltaBlock> delta, Stream<List<int>> binary) async {
    final RandomAccessFile fileView = await file.open();

    // Buffer used to hold the file content in memory.
    final BytesBuilder buffer = BytesBuilder(copy: false);
    final StreamIterator<List<int>> iterator = StreamIterator<List<int>>(binary);
    // Read offset into the current element of [iterator]; -1 until the first
    // element has been fetched.
    int currentIteratorStart = -1;

    bool iteratorMoveNextReturnValue = true;

    try {
      for (final FileDeltaBlock current in delta) {
        if (current.copyFromDestination) {
          // Reuse bytes already present in the destination file.
          await fileView.setPosition(current.start);
          buffer.add(await fileView.read(current.size));
        } else {
          // Consume `current.size` bytes from the transferred binary stream.
          // NOTE(review): assumes [binary] carries at least the total size of
          // all source-copy blocks; a short stream would loop on moveNext().
          int toRead = current.size;
          while (toRead > 0) {
            if (currentIteratorStart >= 0 && currentIteratorStart < iterator.current.length) {
              final int size = iterator.current.length - currentIteratorStart;
              final int sizeToRead = min(toRead, size);
              buffer.add(iterator.current.sublist(currentIteratorStart, currentIteratorStart + sizeToRead));
              currentIteratorStart += sizeToRead;
              toRead -= sizeToRead;
            } else {
              currentIteratorStart = 0;
              iteratorMoveNextReturnValue = await iterator.moveNext();
            }
          }
        }
      }
    } finally {
      // Close the read handle before rewriting the file in place.
      await fileView.close();
    }

    await file.writeAsBytes(buffer.takeBytes(), flush: true);

    // Drain the stream iterator if needed.
    while (iteratorMoveNextReturnValue) {
      iteratorMoveNextReturnValue = await iterator.moveNext();
    }

    return true;
  }

  /// Computes the base64-encoded MD5 digest of the whole [file].
  Future<String> _md5OfFile(File file) async {
    final Md5Hash fileMd5Hash = Md5Hash();
    await file.openRead().forEach((List<int> chunk) => fileMd5Hash.addChunk(Uint8List.fromList(chunk)));
    return base64.encode(fileMd5Hash.finalize().buffer.asUint8List());
  }
}

/// Represents a single line of instruction on how to generate the target file.
@immutable
class FileDeltaBlock {
  /// A block whose content must be transferred from the source machine.
  const FileDeltaBlock.fromSource({required this.start, required this.size}): copyFromDestination = false;

  /// A block whose content can be copied from the existing destination file.
  const FileDeltaBlock.fromDestination({required this.start, required this.size}): copyFromDestination = true;

  /// If true, this block should be read from the destination file.
  final bool copyFromDestination;

  /// The size of the current block.
  final int size;

  /// Byte offset in the destination file from which the block should be read.
  final int start;

  Map<String, Object> toJson() => <String, Object> {
    // `start` is only meaningful for blocks copied from the destination file;
    // its absence is how fromJsonList distinguishes the two kinds.
    if (copyFromDestination) 'start': start,
    'size': size,
  };

  /// Deserializes a list produced by mapping [toJson] over a delta.
  static List<FileDeltaBlock> fromJsonList(List<Map<String, Object?>> jsonList) {
    return jsonList.map((Map<String, Object?> json) {
      if (json.containsKey('start')) {
        return FileDeltaBlock.fromDestination(start: json['start']! as int, size: json['size']! as int);
      } else {
        // The start position does not matter on the destination machine.
        return FileDeltaBlock.fromSource(start: 0, size: json['size']! as int);
      }
    }).toList();
  }

  @override
  bool operator ==(Object other) {
    if (other is! FileDeltaBlock) {
      return false;
    }
    return other.copyFromDestination == copyFromDestination && other.size == size && other.start == start;
  }

  @override
  int get hashCode => Object.hash(copyFromDestination, size, start);
}