Unverified Commit 12c190f7 authored by Zachary Anderson's avatar Zachary Anderson Committed by GitHub

[flutter_tool] Stream artifact downloads to files (#44360)

parent b10a5d68
...@@ -5,8 +5,10 @@ ...@@ -5,8 +5,10 @@
import 'dart:async'; import 'dart:async';
import '../base/context.dart'; import '../base/context.dart';
import '../convert.dart';
import '../globals.dart'; import '../globals.dart';
import 'common.dart'; import 'common.dart';
import 'file_system.dart';
import 'io.dart'; import 'io.dart';
import 'platform.dart'; import 'platform.dart';
...@@ -14,16 +16,39 @@ const int kNetworkProblemExitCode = 50; ...@@ -14,16 +16,39 @@ const int kNetworkProblemExitCode = 50;
typedef HttpClientFactory = HttpClient Function(); typedef HttpClientFactory = HttpClient Function();
/// Download a file from the given URL and return the bytes. /// Download a file from the given URL.
Future<List<int>> fetchUrl(Uri url, {int maxAttempts}) async { ///
/// If a destination file is not provided, returns the bytes.
///
/// If a destination file is provided, streams the bytes to that file and
/// returns an empty list.
///
/// If [maxAttempts] is exceeded, returns null.
Future<List<int>> fetchUrl(Uri url, {
int maxAttempts,
File destFile,
}) async {
int attempts = 0; int attempts = 0;
int durationSeconds = 1; int durationSeconds = 1;
while (true) { while (true) {
attempts += 1; attempts += 1;
final List<int> result = await _attempt(url); _MemoryIOSink memorySink;
if (result != null) { IOSink sink;
return result; if (destFile == null) {
memorySink = _MemoryIOSink();
sink = memorySink;
} else {
sink = destFile.openWrite();
}
final bool result = await _attempt(
url,
destSink: sink,
);
if (result) {
return memorySink?.writes?.takeBytes() ?? <int>[];
} }
if (maxAttempts != null && attempts >= maxAttempts) { if (maxAttempts != null && attempts >= maxAttempts) {
printStatus('Download failed -- retry $attempts'); printStatus('Download failed -- retry $attempts');
return null; return null;
...@@ -38,10 +63,14 @@ Future<List<int>> fetchUrl(Uri url, {int maxAttempts}) async { ...@@ -38,10 +63,14 @@ Future<List<int>> fetchUrl(Uri url, {int maxAttempts}) async {
} }
/// Check if the given URL points to a valid endpoint. /// Check if the given URL points to a valid endpoint.
Future<bool> doesRemoteFileExist(Uri url) async => Future<bool> doesRemoteFileExist(Uri url) async => await _attempt(url, onlyHeaders: true);
(await _attempt(url, onlyHeaders: true)) != null;
Future<List<int>> _attempt(Uri url, { bool onlyHeaders = false }) async { // Returns true on success and false on failure.
Future<bool> _attempt(Uri url, {
IOSink destSink,
bool onlyHeaders = false,
}) async {
assert(onlyHeaders || destSink != null);
printTrace('Downloading: $url'); printTrace('Downloading: $url');
HttpClient httpClient; HttpClient httpClient;
if (context.get<HttpClientFactory>() != null) { if (context.get<HttpClientFactory>() != null) {
...@@ -81,19 +110,19 @@ Future<List<int>> _attempt(Uri url, { bool onlyHeaders = false }) async { ...@@ -81,19 +110,19 @@ Future<List<int>> _attempt(Uri url, { bool onlyHeaders = false }) async {
); );
} on SocketException catch (error) { } on SocketException catch (error) {
printTrace('Download error: $error'); printTrace('Download error: $error');
return null; return false;
} on HttpException catch (error) { } on HttpException catch (error) {
printTrace('Download error: $error'); printTrace('Download error: $error');
return null; return false;
} }
assert(response != null); assert(response != null);
// If we're making a HEAD request, we're only checking to see if the URL is // If we're making a HEAD request, we're only checking to see if the URL is
// valid. // valid.
if (onlyHeaders) { if (onlyHeaders) {
return (response.statusCode == 200) ? <int>[] : null; return response.statusCode == HttpStatus.ok;
} }
if (response.statusCode != 200) { if (response.statusCode != HttpStatus.ok) {
if (response.statusCode > 0 && response.statusCode < 500) { if (response.statusCode > 0 && response.statusCode < 500) {
throwToolExit( throwToolExit(
'Download failed.\n' 'Download failed.\n'
...@@ -104,15 +133,79 @@ Future<List<int>> _attempt(Uri url, { bool onlyHeaders = false }) async { ...@@ -104,15 +133,79 @@ Future<List<int>> _attempt(Uri url, { bool onlyHeaders = false }) async {
} }
// 5xx errors are server errors and we can try again // 5xx errors are server errors and we can try again
printTrace('Download error: ${response.statusCode} ${response.reasonPhrase}'); printTrace('Download error: ${response.statusCode} ${response.reasonPhrase}');
return null; return false;
} }
printTrace('Received response from server, collecting bytes...'); printTrace('Received response from server, collecting bytes...');
try { try {
final BytesBuilder responseBody = BytesBuilder(copy: false); assert(destSink != null);
await response.forEach(responseBody.add); await response.forEach(destSink.add);
return responseBody.takeBytes(); return true;
} on IOException catch (error) { } on IOException catch (error) {
printTrace('Download error: $error'); printTrace('Download error: $error');
return null; return false;
} finally {
await destSink?.flush();
await destSink?.close();
}
}
/// An IOSink that collects whatever is written to it.
class _MemoryIOSink implements IOSink {
@override
Encoding encoding = utf8;
final BytesBuilder writes = BytesBuilder(copy: false);
@override
void add(List<int> data) {
writes.add(data);
}
@override
Future<void> addStream(Stream<List<int>> stream) {
final Completer<void> completer = Completer<void>();
stream.listen(add).onDone(completer.complete);
return completer.future;
} }
@override
void writeCharCode(int charCode) {
add(<int>[charCode]);
}
@override
void write(Object obj) {
add(encoding.encode('$obj'));
}
@override
void writeln([ Object obj = '' ]) {
add(encoding.encode('$obj\n'));
}
@override
void writeAll(Iterable<dynamic> objects, [ String separator = '' ]) {
bool addSeparator = false;
for (dynamic object in objects) {
if (addSeparator) {
write(separator);
}
write(object);
addSeparator = true;
}
}
@override
void addError(dynamic error, [ StackTrace stackTrace ]) {
throw UnimplementedError();
}
@override
Future<void> get done => close();
@override
Future<void> close() async { }
@override
Future<void> flush() async { }
} }
...@@ -1242,8 +1242,7 @@ String flattenNameSubdirs(Uri url) { ...@@ -1242,8 +1242,7 @@ String flattenNameSubdirs(Uri url) {
/// Download a file from the given [url] and write it to [location]. /// Download a file from the given [url] and write it to [location].
Future<void> _downloadFile(Uri url, File location) async { Future<void> _downloadFile(Uri url, File location) async {
_ensureExists(location.parent); _ensureExists(location.parent);
final List<int> fileBytes = await fetchUrl(url); await fetchUrl(url, destFile: location);
location.writeAsBytesSync(fileBytes, flush: true);
} }
Future<bool> _doesRemoteExist(String message, Uri url) async { Future<bool> _doesRemoteExist(String message, Uri url) async {
......
...@@ -3,8 +3,12 @@ ...@@ -3,8 +3,12 @@
// found in the LICENSE file. // found in the LICENSE file.
import 'dart:async'; import 'dart:async';
import 'dart:convert';
import 'dart:typed_data'; import 'dart:typed_data';
import 'package:file/file.dart';
import 'package:file/memory.dart';
import 'package:flutter_tools/src/base/file_system.dart';
import 'package:flutter_tools/src/base/io.dart' as io; import 'package:flutter_tools/src/base/io.dart' as io;
import 'package:flutter_tools/src/base/net.dart'; import 'package:flutter_tools/src/base/net.dart';
import 'package:flutter_tools/src/base/platform.dart'; import 'package:flutter_tools/src/base/platform.dart';
...@@ -14,6 +18,36 @@ import '../../src/common.dart'; ...@@ -14,6 +18,36 @@ import '../../src/common.dart';
import '../../src/context.dart'; import '../../src/context.dart';
void main() { void main() {
group('successful fetch', () {
const String responseString = 'response string';
List<int> responseData;
setUp(() {
responseData = utf8.encode(responseString);
});
testUsingContext('fetchUrl() gets the data', () async {
final List<int> data = await fetchUrl(Uri.parse('http://example.invalid/'));
expect(data, equals(responseData));
}, overrides: <Type, Generator>{
HttpClientFactory: () => () => FakeHttpClient(200, data: responseString),
});
testUsingContext('fetchUrl(destFile) writes the data to a file', () async {
final File destFile = fs.file('dest_file')..createSync();
final List<int> data = await fetchUrl(
Uri.parse('http://example.invalid/'),
destFile: destFile,
);
expect(data, equals(<int>[]));
expect(destFile.readAsStringSync(), equals(responseString));
}, overrides: <Type, Generator>{
FileSystem: () => MemoryFileSystem(),
HttpClientFactory: () => () => FakeHttpClient(200, data: responseString),
ProcessManager: () => FakeProcessManager.any(),
});
});
testUsingContext('retry from 500', () async { testUsingContext('retry from 500', () async {
String error; String error;
FakeAsync().run((FakeAsync time) { FakeAsync().run((FakeAsync time) {
...@@ -249,13 +283,14 @@ class FakeHttpClientThrowing implements io.HttpClient { ...@@ -249,13 +283,14 @@ class FakeHttpClientThrowing implements io.HttpClient {
} }
class FakeHttpClient implements io.HttpClient { class FakeHttpClient implements io.HttpClient {
FakeHttpClient(this.statusCode); FakeHttpClient(this.statusCode, { this.data });
final int statusCode; final int statusCode;
final String data;
@override @override
Future<io.HttpClientRequest> getUrl(Uri url) async { Future<io.HttpClientRequest> getUrl(Uri url) async {
return FakeHttpClientRequest(statusCode); return FakeHttpClientRequest(statusCode, data: data);
} }
@override @override
...@@ -286,13 +321,14 @@ class FakeHttpClientThrowingRequest implements io.HttpClient { ...@@ -286,13 +321,14 @@ class FakeHttpClientThrowingRequest implements io.HttpClient {
} }
class FakeHttpClientRequest implements io.HttpClientRequest { class FakeHttpClientRequest implements io.HttpClientRequest {
FakeHttpClientRequest(this.statusCode); FakeHttpClientRequest(this.statusCode, { this.data });
final int statusCode; final int statusCode;
final String data;
@override @override
Future<io.HttpClientResponse> close() async { Future<io.HttpClientResponse> close() async {
return FakeHttpClientResponse(statusCode); return FakeHttpClientResponse(statusCode, data: data);
} }
@override @override
...@@ -318,11 +354,13 @@ class FakeHttpClientRequestThrowing implements io.HttpClientRequest { ...@@ -318,11 +354,13 @@ class FakeHttpClientRequestThrowing implements io.HttpClientRequest {
} }
class FakeHttpClientResponse implements io.HttpClientResponse { class FakeHttpClientResponse implements io.HttpClientResponse {
FakeHttpClientResponse(this.statusCode); FakeHttpClientResponse(this.statusCode, { this.data });
@override @override
final int statusCode; final int statusCode;
final String data;
@override @override
String get reasonPhrase => '<reason phrase>'; String get reasonPhrase => '<reason phrase>';
...@@ -333,13 +371,24 @@ class FakeHttpClientResponse implements io.HttpClientResponse { ...@@ -333,13 +371,24 @@ class FakeHttpClientResponse implements io.HttpClientResponse {
void onDone(), void onDone(),
bool cancelOnError, bool cancelOnError,
}) { }) {
return Stream<Uint8List>.fromFuture(Future<Uint8List>.error(const io.SocketException('test'))) if (data == null) {
.listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError); return Stream<Uint8List>.fromFuture(Future<Uint8List>.error(
const io.SocketException('test'),
)).listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError);
} else {
return Stream<Uint8List>.fromFuture(Future<Uint8List>.value(
utf8.encode(data) as Uint8List,
)).listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
} }
@override @override
Future<dynamic> forEach(void Function(Uint8List element) action) { Future<dynamic> forEach(void Function(Uint8List element) action) async {
if (data == null) {
return Future<void>.error(const io.SocketException('test')); return Future<void>.error(const io.SocketException('test'));
} else {
return Future<void>.microtask(() => action(utf8.encode(data)));
}
} }
@override @override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment