Commit 487f28fd authored by John McCutchan's avatar John McCutchan Committed by GitHub

Speedup devFS writing and improve status information (#5287)

parent e3f6d153
...@@ -8,6 +8,7 @@ import 'dart:io'; ...@@ -8,6 +8,7 @@ import 'dart:io';
import 'package:path/path.dart' as path; import 'package:path/path.dart' as path;
import 'base/logger.dart';
import 'dart/package_map.dart'; import 'dart/package_map.dart';
import 'asset.dart'; import 'asset.dart';
import 'globals.dart'; import 'globals.dart';
...@@ -29,8 +30,8 @@ class DevFSEntry { ...@@ -29,8 +30,8 @@ class DevFSEntry {
final File file; final File file;
FileStat _fileStat; FileStat _fileStat;
// When we updated the DevFS, did we see this entry? // When we scanned for files, did this still exist?
bool _wasSeen = false; bool _exists = false;
DateTime get lastModified => _fileStat?.modified; DateTime get lastModified => _fileStat?.modified;
bool get stillExists { bool get stillExists {
if (_isSourceEntry) if (_isSourceEntry)
...@@ -75,6 +76,18 @@ class DevFSEntry { ...@@ -75,6 +76,18 @@ class DevFSEntry {
return bundleEntry.contentsAsBytes(); return bundleEntry.contentsAsBytes();
return file.readAsBytes(); return file.readAsBytes();
} }
Stream<List<int>> contentsAsStream() {
if (_isSourceEntry) {
return new Stream<List<int>>.fromIterable(
<List<int>>[bundleEntry.contentsAsBytes()]);
}
return file.openRead();
}
Stream<List<int>> contentsAsCompressedStream() {
return contentsAsStream().transform(GZIP.encoder);
}
} }
...@@ -92,7 +105,7 @@ abstract class DevFSOperations { ...@@ -92,7 +105,7 @@ abstract class DevFSOperations {
/// An implementation of [DevFSOperations] that speaks to the /// An implementation of [DevFSOperations] that speaks to the
/// service protocol. /// service protocol.
class ServiceProtocolDevFSOperations implements DevFSOperations { class ServiceProtocolDevFSOperations implements DevFSOperations {
final Observatory serviceProtocol; final Observatory serviceProtocol;
ServiceProtocolDevFSOperations(this.serviceProtocol); ServiceProtocolDevFSOperations(this.serviceProtocol);
...@@ -148,22 +161,94 @@ class ServiceProtocolDevFSOperations implements DevFSOperations { ...@@ -148,22 +161,94 @@ class ServiceProtocolDevFSOperations implements DevFSOperations {
} }
} }
class _DevFSHttpWriter {
_DevFSHttpWriter(this.fsName, Observatory serviceProtocol)
: httpAddress = serviceProtocol.httpAddress;
final String fsName;
final Uri httpAddress;
static const int kMaxInFlight = 6;
int _inFlight = 0;
List<DevFSEntry> _outstanding;
Completer<Null> _completer;
HttpClient _client;
int _done;
int _max;
Future<Null> write(Set<DevFSEntry> entries,
{DevFSProgressReporter progressReporter}) async {
_client = new HttpClient();
_client.maxConnectionsPerHost = kMaxInFlight;
_completer = new Completer<Null>();
_outstanding = entries.toList();
_done = 0;
_max = _outstanding.length;
_scheduleWrites(progressReporter);
await _completer.future;
_client.close();
}
void _scheduleWrites(DevFSProgressReporter progressReporter) {
while (_inFlight < kMaxInFlight) {
if (_outstanding.length == 0) {
// Finished.
break;
}
DevFSEntry entry = _outstanding.removeLast();
_scheduleWrite(entry, progressReporter);
_inFlight++;
}
}
Future<Null> _scheduleWrite(DevFSEntry entry,
DevFSProgressReporter progressReporter) async {
HttpClientRequest request = await _client.putUrl(httpAddress);
request.headers.removeAll(HttpHeaders.ACCEPT_ENCODING);
request.headers.add('dev_fs_name', fsName);
request.headers.add('dev_fs_path', entry.devicePath);
Stream<List<int>> contents = entry.contentsAsCompressedStream();
await request.addStream(contents);
HttpClientResponse response = await request.close();
await response.drain();
if (progressReporter != null) {
_done++;
progressReporter(_done, _max);
}
_inFlight--;
if ((_outstanding.length == 0) && (_inFlight == 0)) {
_completer.complete(null);
} else {
_scheduleWrites(progressReporter);
}
}
}
class DevFS { class DevFS {
/// Create a [DevFS] named [fsName] for the local files in [directory]. /// Create a [DevFS] named [fsName] for the local files in [directory].
DevFS(Observatory serviceProtocol, DevFS(Observatory serviceProtocol,
this.fsName, String fsName,
this.rootDirectory) this.rootDirectory)
: _operations = new ServiceProtocolDevFSOperations(serviceProtocol); : _operations = new ServiceProtocolDevFSOperations(serviceProtocol),
_httpWriter = new _DevFSHttpWriter(fsName, serviceProtocol),
fsName = fsName;
DevFS.operations(this._operations, DevFS.operations(this._operations,
this.fsName, this.fsName,
this.rootDirectory); this.rootDirectory)
: _httpWriter = null;
final DevFSOperations _operations; final DevFSOperations _operations;
final _DevFSHttpWriter _httpWriter;
final String fsName; final String fsName;
final Directory rootDirectory; final Directory rootDirectory;
final Map<String, DevFSEntry> _entries = <String, DevFSEntry>{}; final Map<String, DevFSEntry> _entries = <String, DevFSEntry>{};
final List<Future<Response>> _pendingWrites = new List<Future<Response>>(); final Set<DevFSEntry> _dirtyEntries = new Set<DevFSEntry>();
final Set<DevFSEntry> _deletedEntries = new Set<DevFSEntry>();
final List<Future<Response>> _pendingOperations = new List<Future<Response>>();
int _bytes = 0; int _bytes = 0;
int get bytes => _bytes; int get bytes => _bytes;
Uri _baseUri; Uri _baseUri;
...@@ -180,22 +265,33 @@ class DevFS { ...@@ -180,22 +265,33 @@ class DevFS {
return await _operations.destroy(fsName); return await _operations.destroy(fsName);
} }
Future<dynamic> update({ DevFSProgressReporter progressReporter, void _reset() {
AssetBundle bundle, // Reset the dirty byte count.
bool bundleDirty: false }) async {
_bytes = 0; _bytes = 0;
// Mark all entries as not seen. // Mark all entries as possibly deleted.
_entries.forEach((String path, DevFSEntry entry) { _entries.forEach((String path, DevFSEntry entry) {
entry._wasSeen = false; entry._exists = false;
}); });
// Clear the dirt entries list.
_dirtyEntries.clear();
// Clear the deleted entries list.
_deletedEntries.clear();
}
Future<dynamic> update({ DevFSProgressReporter progressReporter,
AssetBundle bundle,
bool bundleDirty: false }) async {
_reset();
printTrace('DevFS: Starting sync from $rootDirectory'); printTrace('DevFS: Starting sync from $rootDirectory');
// Send the root and lib directories. Status status;
status = logger.startProgress('Scanning project files...');
Directory directory = rootDirectory; Directory directory = rootDirectory;
await _syncDirectory(directory, recursive: true); await _scanDirectory(directory, recursive: true);
printTrace('DevFS: Syncing of $rootDirectory finished'); status.stop(showElapsedTime: true);
status = logger.startProgress('Scanning package files...');
String packagesFilePath = path.join(rootDirectory.path, kPackagesFileName); String packagesFilePath = path.join(rootDirectory.path, kPackagesFileName);
StringBuffer sb; StringBuffer sb;
// Send the packages.
if (FileSystemEntity.isFileSync(packagesFilePath)) { if (FileSystemEntity.isFileSync(packagesFilePath)) {
PackageMap packageMap = new PackageMap(kPackagesFileName); PackageMap packageMap = new PackageMap(kPackagesFileName);
...@@ -205,107 +301,129 @@ class DevFS { ...@@ -205,107 +301,129 @@ class DevFS {
if (uri.toString() == 'lib/') if (uri.toString() == 'lib/')
continue; continue;
Directory directory = new Directory.fromUri(uri); Directory directory = new Directory.fromUri(uri);
printTrace('DevFS: Syncing package $packageName started'); bool packageExists =
bool packageWritten = await _scanDirectory(directory,
await _syncDirectory(directory,
directoryName: 'packages/$packageName', directoryName: 'packages/$packageName',
recursive: true); recursive: true);
if (packageWritten) { if (packageExists) {
sb ??= new StringBuffer(); sb ??= new StringBuffer();
sb.writeln('$packageName:packages/$packageName'); sb.writeln('$packageName:packages/$packageName');
} }
printTrace('DevFS: Syncing package $packageName finished');
} }
} }
status.stop(showElapsedTime: true);
if (bundle != null) { if (bundle != null) {
status = logger.startProgress('Scanning asset files...');
// Synchronize asset bundle. // Synchronize asset bundle.
for (AssetBundleEntry entry in bundle.entries) { for (AssetBundleEntry entry in bundle.entries) {
// We write the assets into 'build/flx' so that they are in the // We write the assets into 'build/flx' so that they are in the
// same location in DevFS and the iOS simulator. // same location in DevFS and the iOS simulator.
final String devicePath = path.join('build/flx', entry.archivePath); final String devicePath = path.join('build/flx', entry.archivePath);
if (!bundleDirty && entry.isStringEntry) { _scanBundleEntry(devicePath, entry, bundleDirty);
// When the bundle isn't dirty, we do not need to sync string
// entries.
continue;
}
_syncBundleEntry(devicePath, entry);
} }
status.stop(showElapsedTime: true);
} }
// Handle deletions. // Handle deletions.
status = logger.startProgress('Scanning for deleted files...');
final List<String> toRemove = new List<String>(); final List<String> toRemove = new List<String>();
_entries.forEach((String path, DevFSEntry entry) { _entries.forEach((String path, DevFSEntry entry) {
if (!entry._wasSeen) { if (!entry._exists) {
_deleteEntry(path, entry); _deletedEntries.add(entry);
toRemove.add(path); toRemove.add(path);
} }
}); });
for (int i = 0; i < toRemove.length; i++) { for (int i = 0; i < toRemove.length; i++) {
_entries.remove(toRemove[i]); _entries.remove(toRemove[i]);
} }
// Send the assets. status.stop(showElapsedTime: true);
printTrace('DevFS: Waiting for sync of ${_pendingWrites.length} files '
'to finish'); if (_deletedEntries.length > 0) {
status = logger.startProgress('Removing deleted files...');
for (DevFSEntry entry in _deletedEntries) {
Future<Response> operation = _operations.deleteFile(fsName, entry);
if (operation != null)
_pendingOperations.add(operation);
}
await Future.wait(_pendingOperations);
_pendingOperations.clear();
_deletedEntries.clear();
status.stop(showElapsedTime: true);
} else {
printStatus("No files to remove.");
}
if (progressReporter != null) { if (_dirtyEntries.length > 0) {
final int max = _pendingWrites.length; status = logger.startProgress('Updating files...');
int complete = 0; if (_httpWriter != null) {
_pendingWrites.forEach((Future<dynamic> f) => f.then((dynamic v) { await _httpWriter.write(_dirtyEntries,
complete += 1; progressReporter: progressReporter);
progressReporter(complete, max); } else {
})); // Make service protocol requests for each.
for (DevFSEntry entry in _dirtyEntries) {
Future<Response> operation = _operations.writeFile(fsName, entry);
if (operation != null)
_pendingOperations.add(operation);
}
if (progressReporter != null) {
final int max = _pendingOperations.length;
int complete = 0;
_pendingOperations.forEach((Future<dynamic> f) => f.then((dynamic v) {
complete += 1;
progressReporter(complete, max);
}));
}
await Future.wait(_pendingOperations, eagerError: true);
_pendingOperations.clear();
}
_dirtyEntries.clear();
status.stop(showElapsedTime: true);
} else {
printStatus("No files to update.");
} }
await Future.wait(_pendingWrites, eagerError: true);
_pendingWrites.clear();
if (sb != null) if (sb != null)
await _operations.writeSource(fsName, '.packages', sb.toString()); await _operations.writeSource(fsName, '.packages', sb.toString());
printTrace('DevFS: Sync finished'); printTrace('DevFS: Sync finished');
// NB: You must call flush after a printTrace if you want to be printed // NB: You must call flush after a printTrace if you want to be printed
// immediately. // immediately.
logger.flush(); logger.flush();
} }
void _deleteEntry(String path, DevFSEntry entry) { void _scanFile(String devicePath, File file) {
_pendingWrites.add(_operations.deleteFile(fsName, entry));
}
void _syncFile(String devicePath, File file) {
DevFSEntry entry = _entries[devicePath]; DevFSEntry entry = _entries[devicePath];
if (entry == null) { if (entry == null) {
// New file. // New file.
entry = new DevFSEntry(devicePath, file); entry = new DevFSEntry(devicePath, file);
_entries[devicePath] = entry; _entries[devicePath] = entry;
} }
entry._wasSeen = true; entry._exists = true;
bool needsWrite = entry.isModified; bool needsWrite = entry.isModified;
if (needsWrite) { if (needsWrite) {
_bytes += entry.size; if (_dirtyEntries.add(entry))
Future<dynamic> pendingWrite = _operations.writeFile(fsName, entry); _bytes += entry.size;
if (pendingWrite != null) {
_pendingWrites.add(pendingWrite);
} else {
printTrace('DevFS: Failed to sync "$devicePath"');
}
} }
} }
void _syncBundleEntry(String devicePath, AssetBundleEntry assetBundleEntry) { void _scanBundleEntry(String devicePath,
AssetBundleEntry assetBundleEntry,
bool bundleDirty) {
DevFSEntry entry = _entries[devicePath]; DevFSEntry entry = _entries[devicePath];
if (entry == null) { if (entry == null) {
// New file. // New file.
entry = new DevFSEntry.bundle(devicePath, assetBundleEntry); entry = new DevFSEntry.bundle(devicePath, assetBundleEntry);
_entries[devicePath] = entry; _entries[devicePath] = entry;
} }
entry._wasSeen = true; entry._exists = true;
if (!bundleDirty && assetBundleEntry.isStringEntry) {
// String bundle entries are synthetic files that only change if the
// bundle itself changes. Skip them if the bundle is not dirty.
return;
}
bool needsWrite = entry.isModified; bool needsWrite = entry.isModified;
if (needsWrite) { if (needsWrite) {
_bytes += entry.size; if (_dirtyEntries.add(entry))
Future<dynamic> pendingWrite = _operations.writeFile(fsName, entry); _bytes += entry.size;
if (pendingWrite != null) {
_pendingWrites.add(pendingWrite);
} else {
printTrace('DevFS: Failed to sync "$devicePath"');
}
} }
} }
...@@ -321,7 +439,7 @@ class DevFS { ...@@ -321,7 +439,7 @@ class DevFS {
return false; return false;
} }
Future<bool> _syncDirectory(Directory directory, Future<bool> _scanDirectory(Directory directory,
{String directoryName, {String directoryName,
bool recursive: false, bool recursive: false,
bool ignoreDotFiles: true}) async { bool ignoreDotFiles: true}) async {
...@@ -346,7 +464,7 @@ class DevFS { ...@@ -346,7 +464,7 @@ class DevFS {
final String devicePath = final String devicePath =
path.join(prefix, path.relative(file.path, from: directory.path)); path.join(prefix, path.relative(file.path, from: directory.path));
if (!_shouldIgnore(devicePath)) if (!_shouldIgnore(devicePath))
_syncFile(devicePath, file); _scanFile(devicePath, file);
} }
} catch (e) { } catch (e) {
// Ignore directory and error. // Ignore directory and error.
......
...@@ -363,7 +363,7 @@ class IOSSimulator extends Device { ...@@ -363,7 +363,7 @@ class IOSSimulator extends Device {
bool get supportsHotMode => true; bool get supportsHotMode => true;
@override @override
bool get needsDevFS => false; bool get needsDevFS => true;
_IOSSimulatorLogReader _logReader; _IOSSimulatorLogReader _logReader;
_IOSSimulatorDevicePortForwarder _portForwarder; _IOSSimulatorDevicePortForwarder _portForwarder;
......
...@@ -13,7 +13,7 @@ import 'globals.dart'; ...@@ -13,7 +13,7 @@ import 'globals.dart';
// TODO(johnmccutchan): Rename this class to ServiceProtocol or VmService. // TODO(johnmccutchan): Rename this class to ServiceProtocol or VmService.
class Observatory { class Observatory {
Observatory._(this.peer, this.port) { Observatory._(this.peer, this.port, this.httpAddress) {
peer.registerMethod('streamNotify', (rpc.Parameters event) { peer.registerMethod('streamNotify', (rpc.Parameters event) {
_handleStreamNotify(event.asMap); _handleStreamNotify(event.asMap);
}); });
...@@ -33,9 +33,10 @@ class Observatory { ...@@ -33,9 +33,10 @@ class Observatory {
WebSocket ws = await WebSocket.connect(uri.toString()); WebSocket ws = await WebSocket.connect(uri.toString());
rpc.Peer peer = new rpc.Peer(new IOWebSocketChannel(ws)); rpc.Peer peer = new rpc.Peer(new IOWebSocketChannel(ws));
peer.listen(); peer.listen();
return new Observatory._(peer, port); Uri httpAddress = new Uri(scheme: 'http', host: '127.0.0.1', port: port);
return new Observatory._(peer, port, httpAddress);
} }
final Uri httpAddress;
final rpc.Peer peer; final rpc.Peer peer;
final int port; final int port;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment