Unverified Commit c1caa24a authored by Lau Ching Jun's avatar Lau Ching Jun Committed by GitHub

Optimize file transfer when using proxied devices. (#139968)

List of changes:
1. Optimizations in FileTransfer. a. Use `stream.forEach` instead of `await for`. b. Type cast `List<int>` to `Uint8List` instead of using `Uint8List.fromList` results in (presumably) fewer copy and faster execution. c. Iterate through `Uint8List` with regular for loop instead of for-in loop.
2. Precache the block hashes of a file, and reuse it on subsequent runs.
parent a9c40a2a
......@@ -156,6 +156,7 @@ class Daemon {
this.connection, {
this.notifyingLogger,
this.logToStdout = false,
FileTransfer fileTransfer = const FileTransfer(),
}) {
// Set up domains.
registerDomain(daemonDomain = DaemonDomain(this));
......@@ -163,7 +164,7 @@ class Daemon {
registerDomain(deviceDomain = DeviceDomain(this));
registerDomain(emulatorDomain = EmulatorDomain(this));
registerDomain(devToolsDomain = DevToolsDomain(this));
registerDomain(proxyDomain = ProxyDomain(this));
registerDomain(proxyDomain = ProxyDomain(this, fileTransfer: fileTransfer));
// Start listening.
_commandSubscription = connection.incomingCommands.listen(
......@@ -1412,7 +1413,10 @@ class EmulatorDomain extends Domain {
}
class ProxyDomain extends Domain {
ProxyDomain(Daemon daemon) : super(daemon, 'proxy') {
ProxyDomain(Daemon daemon, {
required FileTransfer fileTransfer,
}) : _fileTransfer = fileTransfer,
super(daemon, 'proxy') {
registerHandlerWithBinary('writeTempFile', writeTempFile);
registerHandler('calculateFileHashes', calculateFileHashes);
registerHandlerWithBinary('updateFile', updateFile);
......@@ -1421,6 +1425,8 @@ class ProxyDomain extends Domain {
registerHandlerWithBinary('write', write);
}
final FileTransfer _fileTransfer;
final Map<String, Socket> _forwardedConnections = <String, Socket>{};
int _id = 0;
......@@ -1435,12 +1441,26 @@ class ProxyDomain extends Domain {
/// Calculate rolling hashes for a file in the local temporary directory.
Future<Map<String, Object?>?> calculateFileHashes(Map<String, Object?> args) async {
final String path = _getStringArg(args, 'path', required: true)!;
final bool cacheResult = _getBoolArg(args, 'cacheResult') ?? false;
final File file = tempDirectory.childFile(path);
if (!await file.exists()) {
return null;
}
final BlockHashes result = await FileTransfer().calculateBlockHashesOfFile(file);
return result.toJson();
final File hashFile = file.parent.childFile('${file.basename}.hashes');
if (hashFile.existsSync() && hashFile.statSync().modified.isAfter(file.statSync().modified)) {
// If the cached hash file is newer than the file, assume that the cached
// is up to date. Return the cached result directly.
final String cachedJson = await hashFile.readAsString();
return json.decode(cachedJson) as Map<String, Object?>;
}
final BlockHashes result = await _fileTransfer.calculateBlockHashesOfFile(file);
final Map<String, Object?> resultObject = result.toJson();
if (cacheResult) {
await hashFile.writeAsString(json.encode(resultObject));
}
return resultObject;
}
Future<bool?> updateFile(Map<String, Object?> args, Stream<List<int>>? binary) async {
......@@ -1451,7 +1471,7 @@ class ProxyDomain extends Domain {
}
final List<Map<String, Object?>> deltaJson = (args['delta']! as List<Object?>).cast<Map<String, Object?>>();
final List<FileDeltaBlock> delta = FileDeltaBlock.fromJsonList(deltaJson);
final bool result = await FileTransfer().rebuildFile(file, delta, binary!);
final bool result = await _fileTransfer.rebuildFile(file, delta, binary!);
return result;
}
......
......@@ -41,9 +41,11 @@ class ProxiedDevices extends PollingDeviceDiscovery {
bool deltaFileTransfer = true,
bool enableDdsProxy = false,
required Logger logger,
FileTransfer fileTransfer = const FileTransfer(),
}) : _deltaFileTransfer = deltaFileTransfer,
_enableDdsProxy = enableDdsProxy,
_logger = logger,
_fileTransfer = fileTransfer,
super('Proxied devices');
/// [DaemonConnection] used to communicate with the daemon.
......@@ -55,6 +57,8 @@ class ProxiedDevices extends PollingDeviceDiscovery {
final bool _enableDdsProxy;
final FileTransfer _fileTransfer;
@override
bool get supportsPlatform => true;
......@@ -117,6 +121,7 @@ class ProxiedDevices extends PollingDeviceDiscovery {
supportsFastStart: _cast<bool>(capabilities['fastStart']),
supportsHardwareRendering: _cast<bool>(capabilities['hardwareRendering']),
logger: _logger,
fileTransfer: _fileTransfer,
);
}
}
......@@ -149,6 +154,7 @@ class ProxiedDevice extends Device {
required this.supportsFastStart,
required bool supportsHardwareRendering,
required Logger logger,
FileTransfer fileTransfer = const FileTransfer(),
}): _deltaFileTransfer = deltaFileTransfer,
_enableDdsProxy = enableDdsProxy,
_isLocalEmulator = isLocalEmulator,
......@@ -157,6 +163,7 @@ class ProxiedDevice extends Device {
_supportsHardwareRendering = supportsHardwareRendering,
_targetPlatform = targetPlatform,
_logger = logger,
_fileTransfer = fileTransfer,
super(id,
category: category,
platformType: platformType,
......@@ -171,6 +178,8 @@ class ProxiedDevice extends Device {
final bool _enableDdsProxy;
final FileTransfer _fileTransfer;
@override
final String name;
......@@ -359,7 +368,7 @@ class ProxiedDevice extends Device {
Map<String, Object?>? rollingHashResultJson;
if (_deltaFileTransfer) {
rollingHashResultJson = _cast<Map<String, Object?>?>(await connection.sendRequest('proxy.calculateFileHashes', args));
rollingHashResultJson = _cast<Map<String, Object?>?>(await connection.sendRequest('proxy.calculateFileHashes', args));
}
if (rollingHashResultJson == null) {
......@@ -371,12 +380,12 @@ class ProxiedDevice extends Device {
await connection.sendRequest('proxy.writeTempFile', args, await binary.readAsBytes());
} else {
final BlockHashes rollingHashResult = BlockHashes.fromJson(rollingHashResultJson);
final List<FileDeltaBlock> delta = await FileTransfer().computeDelta(binary, rollingHashResult);
final List<FileDeltaBlock> delta = await _fileTransfer.computeDelta(binary, rollingHashResult);
// Delta is empty if the file does not need to be updated
if (delta.isNotEmpty) {
final List<Map<String, Object>> deltaJson = delta.map((FileDeltaBlock block) => block.toJson()).toList();
final Uint8List buffer = await FileTransfer().binaryForRebuilding(binary, delta);
final Uint8List buffer = await _fileTransfer.binaryForRebuilding(binary, delta);
await connection.sendRequest('proxy.updateFile', <String, Object>{
'path': fileName,
......@@ -385,6 +394,19 @@ class ProxiedDevice extends Device {
}
}
if (_deltaFileTransfer) {
// Ask the daemon to precache the hash content for subsequent runs.
// Wait for several seconds for the app to be launched, to not interfere
// with whatever the daemon is doing.
unawaited(() async {
await Future<void>.delayed(const Duration(seconds: 60));
await connection.sendRequest('proxy.calculateFileHashes', <String, Object>{
'path': fileName,
'cacheResult': true,
});
}());
}
final String id = _cast<String>(await connection.sendRequest('device.uploadApplicationPackage', <String, Object>{
'targetPlatform': getNameForTargetPlatform(_targetPlatform),
'applicationBinary': fileName,
......
......@@ -104,7 +104,7 @@ const int _adler32Prime = 65521;
/// Helper function to calculate Adler32 hash of a binary.
@visibleForTesting
int adler32Hash(List<int> binary) {
int adler32Hash(Uint8List binary) {
// The maximum integer that can be stored in the `int` data type.
const int maxInt = 0x1fffffffffffff;
// maxChunkSize is the maximum number of bytes we can sum without
......@@ -119,8 +119,8 @@ int adler32Hash(List<int> binary) {
final int length = binary.length;
for (int i = 0; i < length; i += maxChunkSize) {
final int end = i + maxChunkSize < length ? i + maxChunkSize : length;
for (final int c in binary.getRange(i, end)) {
a += c;
for (int j = i; j < end; j++) {
a += binary[j];
b += a;
}
a %= _adler32Prime;
......@@ -220,19 +220,22 @@ class RollingAdler32 {
/// On the receiving end, it will build a copy of the source file from the
/// given instructions.
class FileTransfer {
const FileTransfer();
/// Calculate hashes of blocks in the file.
Future<BlockHashes> calculateBlockHashesOfFile(File file, { int? blockSize }) async {
final int totalSize = await file.length();
blockSize ??= max(sqrt(totalSize).ceil(), 2560);
final Stream<Uint8List> fileContentStream = file.openRead().map((List<int> chunk) => Uint8List.fromList(chunk));
final Stream<Uint8List> fileContentStream = file.openRead().map((List<int> chunk) => chunk is Uint8List ? chunk : Uint8List.fromList(chunk));
final List<int> adler32Results = <int>[];
final List<String> md5Results = <String>[];
await for (final Uint8List chunk in convertToChunks(fileContentStream, blockSize)) {
await convertToChunks(fileContentStream, blockSize).forEach((Uint8List chunk) {
adler32Results.add(adler32Hash(chunk));
md5Results.add(base64.encode(md5.convert(chunk).bytes));
}
});
// Handle whole file md5 separately. Md5Hash requires the chunk size to be a multiple of 64.
final String fileMd5 = await _md5OfFile(file);
......@@ -276,8 +279,9 @@ class FileTransfer {
final List<FileDeltaBlock> blocks = <FileDeltaBlock>[];
await for (final List<int> chunk in fileContentStream) {
for (final int c in chunk) {
await fileContentStream.forEach((List<int> chunk) {
for (int i = 0; i < chunk.length; i++) {
final int c = chunk[i];
final int hash = adler32.push(c);
size++;
......@@ -326,7 +330,7 @@ class FileTransfer {
break;
}
}
}
});
// For the remaining content that is not matched, copy from the source.
if (start < size) {
......@@ -401,7 +405,7 @@ class FileTransfer {
Future<String> _md5OfFile(File file) async {
final Md5Hash fileMd5Hash = Md5Hash();
await file.openRead().forEach((List<int> chunk) => fileMd5Hash.addChunk(Uint8List.fromList(chunk)));
await file.openRead().forEach((List<int> chunk) => fileMd5Hash.addChunk(chunk is Uint8List ? chunk : Uint8List.fromList(chunk)));
return base64.encode(fileMd5Hash.finalize().buffer.asUint8List());
}
}
......
......@@ -63,7 +63,7 @@ void main() {
group('adler32Hash', () {
test('works correctly', () {
final int hash = adler32Hash(utf8.encode('abcdefg'));
final int hash = adler32Hash(Uint8List.fromList(utf8.encode('abcdefg')));
expect(hash, 0x0adb02bd);
});
});
......@@ -72,19 +72,19 @@ void main() {
test('works correctly without rolling', () {
final RollingAdler32 adler32 = RollingAdler32(7);
utf8.encode('abcdefg').forEach(adler32.push);
expect(adler32.hash, adler32Hash(utf8.encode('abcdefg')));
expect(adler32.hash, adler32Hash(Uint8List.fromList(utf8.encode('abcdefg'))));
});
test('works correctly after rolling once', () {
final RollingAdler32 adler32 = RollingAdler32(7);
utf8.encode('12abcdefg').forEach(adler32.push);
expect(adler32.hash, adler32Hash(utf8.encode('abcdefg')));
expect(adler32.hash, adler32Hash(Uint8List.fromList(utf8.encode('abcdefg'))));
});
test('works correctly after rolling multiple cycles', () {
final RollingAdler32 adler32 = RollingAdler32(7);
utf8.encode('1234567890123456789abcdefg').forEach(adler32.push);
expect(adler32.hash, adler32Hash(utf8.encode('abcdefg')));
expect(adler32.hash, adler32Hash(Uint8List.fromList(utf8.encode('abcdefg'))));
});
test('works correctly after reset', () {
......@@ -92,7 +92,7 @@ void main() {
utf8.encode('1234567890123456789abcdefg').forEach(adler32.push);
adler32.reset();
utf8.encode('abcdefg').forEach(adler32.push);
expect(adler32.hash, adler32Hash(utf8.encode('abcdefg')));
expect(adler32.hash, adler32Hash(Uint8List.fromList(utf8.encode('abcdefg'))));
});
test('currentBlock returns the correct entry when read less than one block', () {
......@@ -133,7 +133,7 @@ void main() {
test('calculateBlockHashesOfFile works normally', () async {
final File file = fileSystem.file('test')..writeAsStringSync(content1);
final BlockHashes hashes = await FileTransfer().calculateBlockHashesOfFile(file, blockSize: 4);
final BlockHashes hashes = await const FileTransfer().calculateBlockHashesOfFile(file, blockSize: 4);
expect(hashes.blockSize, 4);
expect(hashes.totalSize, content1.length);
expect(hashes.adler32, hasLength(5));
......@@ -159,8 +159,8 @@ void main() {
final File file1 = fileSystem.file('file1')..writeAsStringSync(content1);
final File file2 = fileSystem.file('file1')..writeAsStringSync(content1);
final BlockHashes hashes = await FileTransfer().calculateBlockHashesOfFile(file1, blockSize: 4);
final List<FileDeltaBlock> delta = await FileTransfer().computeDelta(file2, hashes);
final BlockHashes hashes = await const FileTransfer().calculateBlockHashesOfFile(file1, blockSize: 4);
final List<FileDeltaBlock> delta = await const FileTransfer().computeDelta(file2, hashes);
expect(delta, isEmpty);
});
......@@ -169,21 +169,21 @@ void main() {
final File file1 = fileSystem.file('file1')..writeAsStringSync(content1);
final File file2 = fileSystem.file('file2')..writeAsStringSync(content2);
final BlockHashes hashes = await FileTransfer().calculateBlockHashesOfFile(file1, blockSize: 4);
final List<FileDeltaBlock> delta = await FileTransfer().computeDelta(file2, hashes);
final BlockHashes hashes = await const FileTransfer().calculateBlockHashesOfFile(file1, blockSize: 4);
final List<FileDeltaBlock> delta = await const FileTransfer().computeDelta(file2, hashes);
expect(delta, expectedDelta);
});
test('binaryForRebuilding returns the correct binary', () async {
final File file = fileSystem.file('file')..writeAsStringSync(content2);
final List<int> binaryForRebuilding = await FileTransfer().binaryForRebuilding(file, expectedDelta);
final List<int> binaryForRebuilding = await const FileTransfer().binaryForRebuilding(file, expectedDelta);
expect(binaryForRebuilding, utf8.encode(expectedBinaryForRebuilding));
});
test('rebuildFile can rebuild the correct file', () async {
final File file = fileSystem.file('file')..writeAsStringSync(content1);
await FileTransfer().rebuildFile(file, expectedDelta, Stream<List<int>>.fromIterable(<List<int>>[utf8.encode(expectedBinaryForRebuilding)]));
await const FileTransfer().rebuildFile(file, expectedDelta, Stream<List<int>>.fromIterable(<List<int>>[utf8.encode(expectedBinaryForRebuilding)]));
expect(file.readAsStringSync(), content2);
});
});
......
......@@ -6,12 +6,16 @@ import 'dart:async';
import 'dart:io';
import 'dart:typed_data';
import 'package:file/memory.dart';
import 'package:flutter_tools/src/application_package.dart';
import 'package:flutter_tools/src/base/dds.dart';
import 'package:flutter_tools/src/base/file_system.dart';
import 'package:flutter_tools/src/base/logger.dart';
import 'package:flutter_tools/src/base/utils.dart';
import 'package:flutter_tools/src/daemon.dart';
import 'package:flutter_tools/src/device.dart';
import 'package:flutter_tools/src/proxied_devices/devices.dart';
import 'package:flutter_tools/src/proxied_devices/file_transfer.dart';
import 'package:test/fake.dart';
import '../../src/common.dart';
......@@ -328,6 +332,167 @@ void main() {
expect(message.data['method'], 'device.stopApp');
expect(message.data['params'], <String, Object?>{'deviceId': 'device-id', 'userIdentifier': 'user-id'});
});
group('when launching an app with PrebuiltApplicationPackage', () {
late MemoryFileSystem fileSystem;
late FakePrebuiltApplicationPackage applicationPackage;
const List<int> fileContent = <int>[100, 120, 140];
setUp(() {
fileSystem = MemoryFileSystem.test()
..directory('dir').createSync()
..file('dir/foo').writeAsBytesSync(fileContent);
applicationPackage = FakePrebuiltApplicationPackage(fileSystem.file('dir/foo'));
});
testWithoutContext('transfers file to the daemon', () async {
bufferLogger = BufferLogger.test();
final ProxiedDevices proxiedDevices = ProxiedDevices(
clientDaemonConnection,
logger: bufferLogger,
deltaFileTransfer: false,
);
final ProxiedDevice device = proxiedDevices.deviceFromDaemonResult(fakeDevice);
final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands.asBroadcastStream();
final Future<String> resultFuture = device.applicationPackageId(applicationPackage);
// Send proxy.writeTempFile.
final DaemonMessage writeTempFileMessage = await broadcastOutput.first;
expect(writeTempFileMessage.data['id'], isNotNull);
expect(writeTempFileMessage.data['method'], 'proxy.writeTempFile');
expect(writeTempFileMessage.data['params'], <String, Object?>{
'path': 'foo',
});
expect(await writeTempFileMessage.binary?.first, fileContent);
serverDaemonConnection.sendResponse(writeTempFileMessage.data['id']!);
// Send device.uploadApplicationPackage.
final DaemonMessage uploadApplicationPackageMessage = await broadcastOutput.first;
expect(uploadApplicationPackageMessage.data['id'], isNotNull);
expect(uploadApplicationPackageMessage.data['method'], 'device.uploadApplicationPackage');
expect(uploadApplicationPackageMessage.data['params'], <String, Object?>{
'targetPlatform': 'android-arm',
'applicationBinary': 'foo',
});
serverDaemonConnection.sendResponse(uploadApplicationPackageMessage.data['id']!, 'test_id');
expect(await resultFuture, 'test_id');
});
testWithoutContext('transfers file to the daemon with delta turned on, file not exist on remote', () async {
bufferLogger = BufferLogger.test();
final FakeFileTransfer fileTransfer = FakeFileTransfer();
final ProxiedDevices proxiedDevices = ProxiedDevices(
clientDaemonConnection,
logger: bufferLogger,
fileTransfer: fileTransfer,
);
final ProxiedDevice device = proxiedDevices.deviceFromDaemonResult(fakeDevice);
final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands.asBroadcastStream();
final Future<String> resultFuture = device.applicationPackageId(applicationPackage);
// Send proxy.calculateFileHashes.
final DaemonMessage calculateFileHashesMessage = await broadcastOutput.first;
expect(calculateFileHashesMessage.data['id'], isNotNull);
expect(calculateFileHashesMessage.data['method'], 'proxy.calculateFileHashes');
expect(calculateFileHashesMessage.data['params'], <String, Object?>{
'path': 'foo',
});
serverDaemonConnection.sendResponse(calculateFileHashesMessage.data['id']!);
// Send proxy.writeTempFile.
final DaemonMessage writeTempFileMessage = await broadcastOutput.first;
expect(writeTempFileMessage.data['id'], isNotNull);
expect(writeTempFileMessage.data['method'], 'proxy.writeTempFile');
expect(writeTempFileMessage.data['params'], <String, Object?>{
'path': 'foo',
});
expect(await writeTempFileMessage.binary?.first, fileContent);
serverDaemonConnection.sendResponse(writeTempFileMessage.data['id']!);
// Send device.uploadApplicationPackage.
final DaemonMessage uploadApplicationPackageMessage = await broadcastOutput.first;
expect(uploadApplicationPackageMessage.data['id'], isNotNull);
expect(uploadApplicationPackageMessage.data['method'], 'device.uploadApplicationPackage');
expect(uploadApplicationPackageMessage.data['params'], <String, Object?>{
'targetPlatform': 'android-arm',
'applicationBinary': 'foo',
});
serverDaemonConnection.sendResponse(uploadApplicationPackageMessage.data['id']!, 'test_id');
expect(await resultFuture, 'test_id');
});
testWithoutContext('transfers file to the daemon with delta turned on, file exists on remote', () async {
bufferLogger = BufferLogger.test();
final FakeFileTransfer fileTransfer = FakeFileTransfer();
final BlockHashes blockHashes = BlockHashes(
blockSize: 10,
totalSize: 30,
adler32: <int>[1, 2, 3],
md5: <String>['a', 'b', 'c'],
fileMd5: 'abc',
);
const List<FileDeltaBlock> deltaBlocks = <FileDeltaBlock>[
FileDeltaBlock.fromSource(start: 10, size: 10),
FileDeltaBlock.fromDestination(start: 30, size: 40),
];
fileTransfer.binary = Uint8List.fromList(<int>[11, 12, 13]);
fileTransfer.delta = deltaBlocks;
final ProxiedDevices proxiedDevices = ProxiedDevices(
clientDaemonConnection,
logger: bufferLogger,
fileTransfer: fileTransfer,
);
final ProxiedDevice device = proxiedDevices.deviceFromDaemonResult(fakeDevice);
final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands.asBroadcastStream();
final Future<String> resultFuture = device.applicationPackageId(applicationPackage);
// Send proxy.calculateFileHashes.
final DaemonMessage calculateFileHashesMessage = await broadcastOutput.first;
expect(calculateFileHashesMessage.data['id'], isNotNull);
expect(calculateFileHashesMessage.data['method'], 'proxy.calculateFileHashes');
expect(calculateFileHashesMessage.data['params'], <String, Object?>{
'path': 'foo',
});
serverDaemonConnection.sendResponse(calculateFileHashesMessage.data['id']!, blockHashes.toJson());
// Send proxy.updateFile.
final DaemonMessage updateFileMessage = await broadcastOutput.first;
expect(updateFileMessage.data['id'], isNotNull);
expect(updateFileMessage.data['method'], 'proxy.updateFile');
expect(updateFileMessage.data['params'], <String, Object?>{
'path': 'foo',
'delta': <Map<String, Object>>[
<String, Object>{'size': 10},
<String, Object>{'start': 30, 'size': 40},
],
});
expect(await updateFileMessage.binary?.first, <int>[11, 12, 13]);
serverDaemonConnection.sendResponse(updateFileMessage.data['id']!);
// Send device.uploadApplicationPackage.
final DaemonMessage uploadApplicationPackageMessage = await broadcastOutput.first;
expect(uploadApplicationPackageMessage.data['id'], isNotNull);
expect(uploadApplicationPackageMessage.data['method'], 'device.uploadApplicationPackage');
expect(uploadApplicationPackageMessage.data['params'], <String, Object?>{
'targetPlatform': 'android-arm',
'applicationBinary': 'foo',
});
serverDaemonConnection.sendResponse(uploadApplicationPackageMessage.data['id']!, 'test_id');
expect(await resultFuture, 'test_id');
});
});
});
group('ProxiedDevices', () {
......@@ -714,3 +879,19 @@ class FakeDartDevelopmentService extends Fake implements DartDevelopmentService
@override
Future<void> shutdown() async => shutdownCalled = true;
}
class FakePrebuiltApplicationPackage extends Fake implements PrebuiltApplicationPackage {
FakePrebuiltApplicationPackage(this.applicationPackage);
@override
final FileSystemEntity applicationPackage;
}
class FakeFileTransfer extends Fake implements FileTransfer {
List<FileDeltaBlock>? delta;
Uint8List? binary;
@override
Future<List<FileDeltaBlock>> computeDelta(File file, BlockHashes hashes) async => delta!;
@override
Future<Uint8List> binaryForRebuilding(File file, List<FileDeltaBlock> delta) async => binary!;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment