Unverified Commit 8eb7b729 authored by Todd Volkert's avatar Todd Volkert Committed by GitHub

Add onBytesReceived callback to consolidateHttpClientResponseBytes() (#32853)

This will allow us to plumb the chunks in a chunked response
up to the higher levels of the framework to notify interested
parties of network loading progress.

https://github.com/flutter/flutter/issues/32374
parent 2b15b244
...@@ -3,31 +3,121 @@ ...@@ -3,31 +3,121 @@
// found in the LICENSE file. // found in the LICENSE file.
import 'dart:async'; import 'dart:async';
import 'dart:convert';
import 'dart:io'; import 'dart:io';
import 'dart:typed_data'; import 'dart:typed_data';
/// Efficiently converts the response body of an [HttpClientResponse] into a [Uint8List]. /// Signature for getting notified when chunks of bytes are received while
/// consolidating the bytes of an [HttpClientResponse] into a [Uint8List].
/// ///
/// The future returned will forward all errors emitted by [response]. /// The `cumulative` parameter will contain the total number of bytes received
Future<Uint8List> consolidateHttpClientResponseBytes(HttpClientResponse response) { /// thus far. If the response has been gzipped, this number will be the number
// response.contentLength is not trustworthy when GZIP is involved /// of compressed bytes that have been received _across the wire_.
// or other cases where an intermediate transformer has been applied ///
// to the stream. /// The `total` parameter will contain the _expected_ total number of bytes to
/// be received across the wire (extracted from the value of the
/// `Content-Length` HTTP response header), or -1 if the size of the response
/// body is not known in advance (this is common for HTTP chunked transfer
/// encoding, which itself is common when a large amount of data is being
/// returned to the client and the total size of the response may not be known
/// until the request has been fully processed).
///
/// This is used in [consolidateHttpClientResponseBytes].
typedef BytesReceivedCallback = void Function(int cumulative, int total);
/// Efficiently converts the response body of an [HttpClientResponse] into a
/// [Uint8List].
///
/// The future returned will forward any error emitted by `response`.
///
/// The `onBytesReceived` callback, if specified, will be invoked for every
/// chunk of bytes that is received while consolidating the response bytes.
/// If the callback throws an error, processing of the response will halt, and
/// the returned future will complete with the error that was thrown by the
/// callback. For more information on how to interpret the parameters to the
/// callback, see the documentation on [BytesReceivedCallback].
///
/// If the `response` is gzipped and the `autoUncompress` parameter is true,
/// this will automatically un-compress the bytes in the returned list if it
/// hasn't already been done via [HttpClient.autoUncompress]. To get compressed
/// bytes from this method (assuming the response is sending compressed bytes),
/// set both [HttpClient.autoUncompress] to false and the `autoUncompress`
/// parameter to false.
// TODO(tvolkert): Remove the [client] param once https://github.com/dart-lang/sdk/issues/36971 is fixed.
Future<Uint8List> consolidateHttpClientResponseBytes(
HttpClientResponse response, {
HttpClient client,
bool autoUncompress = true,
BytesReceivedCallback onBytesReceived,
}) {
assert(autoUncompress != null);
final Completer<Uint8List> completer = Completer<Uint8List>.sync(); final Completer<Uint8List> completer = Completer<Uint8List>.sync();
final List<List<int>> chunks = <List<int>>[];
int contentLength = 0; final _OutputBuffer output = _OutputBuffer();
response.listen((List<int> chunk) { ByteConversionSink sink = output;
chunks.add(chunk); int expectedContentLength = response.contentLength;
contentLength += chunk.length; if (response.headers?.value(HttpHeaders.contentEncodingHeader) == 'gzip') {
if (client?.autoUncompress ?? true) {
// response.contentLength will not match our bytes stream, so we declare
// that we don't know the expected content length.
expectedContentLength = -1;
} else if (autoUncompress) {
// We need to un-compress the bytes as they come in.
sink = gzip.decoder.startChunkedConversion(output);
}
}
int bytesReceived = 0;
StreamSubscription<List<int>> subscription;
subscription = response.listen((List<int> chunk) {
sink.add(chunk);
if (onBytesReceived != null) {
bytesReceived += chunk.length;
try {
onBytesReceived(bytesReceived, expectedContentLength);
} catch (error, stackTrace) {
completer.completeError(error, stackTrace);
subscription.cancel();
return;
}
}
}, onDone: () { }, onDone: () {
final Uint8List bytes = Uint8List(contentLength); sink.close();
completer.complete(output.bytes);
}, onError: completer.completeError, cancelOnError: true);
return completer.future;
}
class _OutputBuffer extends ByteConversionSinkBase {
List<List<int>> _chunks = <List<int>>[];
int _contentLength = 0;
Uint8List _bytes;
@override
void add(List<int> chunk) {
assert(_bytes == null);
_chunks.add(chunk);
_contentLength += chunk.length;
}
@override
void close() {
if (_bytes != null) {
// We've already been closed; this is a no-op
return;
}
_bytes = Uint8List(_contentLength);
int offset = 0; int offset = 0;
for (List<int> chunk in chunks) { for (List<int> chunk in _chunks) {
bytes.setRange(offset, offset + chunk.length, chunk); _bytes.setRange(offset, offset + chunk.length, chunk);
offset += chunk.length; offset += chunk.length;
} }
completer.complete(bytes); _chunks = null;
}, onError: completer.completeError, cancelOnError: true); }
return completer.future; Uint8List get bytes {
assert(_bytes != null);
return _bytes;
}
} }
...@@ -14,10 +14,17 @@ void main() { ...@@ -14,10 +14,17 @@ void main() {
group(consolidateHttpClientResponseBytes, () { group(consolidateHttpClientResponseBytes, () {
final List<int> chunkOne = <int>[0, 1, 2, 3, 4, 5]; final List<int> chunkOne = <int>[0, 1, 2, 3, 4, 5];
final List<int> chunkTwo = <int>[6, 7, 8, 9, 10]; final List<int> chunkTwo = <int>[6, 7, 8, 9, 10];
MockHttpClient client;
MockHttpClientResponse response; MockHttpClientResponse response;
MockHttpHeaders headers;
setUp(() { setUp(() {
client = MockHttpClient();
response = MockHttpClientResponse(); response = MockHttpClientResponse();
headers = MockHttpHeaders();
when(client.autoUncompress).thenReturn(true);
when(response.headers).thenReturn(headers);
when(headers.value(HttpHeaders.contentEncodingHeader)).thenReturn(null);
when(response.listen( when(response.listen(
any, any,
onDone: anyNamed('onDone'), onDone: anyNamed('onDone'),
...@@ -43,7 +50,7 @@ void main() { ...@@ -43,7 +50,7 @@ void main() {
when(response.contentLength) when(response.contentLength)
.thenReturn(chunkOne.length + chunkTwo.length); .thenReturn(chunkOne.length + chunkTwo.length);
final List<int> bytes = final List<int> bytes =
await consolidateHttpClientResponseBytes(response); await consolidateHttpClientResponseBytes(response, client: client);
expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
}); });
...@@ -51,7 +58,7 @@ void main() { ...@@ -51,7 +58,7 @@ void main() {
test('Converts a compressed HttpClientResponse with contentLength to bytes', () async { test('Converts a compressed HttpClientResponse with contentLength to bytes', () async {
when(response.contentLength).thenReturn(chunkOne.length); when(response.contentLength).thenReturn(chunkOne.length);
final List<int> bytes = final List<int> bytes =
await consolidateHttpClientResponseBytes(response); await consolidateHttpClientResponseBytes(response, client: client);
expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
}); });
...@@ -59,11 +66,31 @@ void main() { ...@@ -59,11 +66,31 @@ void main() {
test('Converts an HttpClientResponse without contentLength to bytes', () async { test('Converts an HttpClientResponse without contentLength to bytes', () async {
when(response.contentLength).thenReturn(-1); when(response.contentLength).thenReturn(-1);
final List<int> bytes = final List<int> bytes =
await consolidateHttpClientResponseBytes(response); await consolidateHttpClientResponseBytes(response, client: client);
expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
}); });
test('Notifies onBytesReceived for every chunk of bytes', () async {
final int syntheticTotal = (chunkOne.length + chunkTwo.length) * 2;
when(response.contentLength).thenReturn(syntheticTotal);
final List<int> records = <int>[];
await consolidateHttpClientResponseBytes(
response,
client: client,
onBytesReceived: (int cumulative, int total) {
records.addAll(<int>[cumulative, total]);
},
);
expect(records, <int>[
chunkOne.length,
syntheticTotal,
chunkOne.length + chunkTwo.length,
syntheticTotal,
]);
});
test('forwards errors from HttpClientResponse', () async { test('forwards errors from HttpClientResponse', () async {
when(response.listen( when(response.listen(
any, any,
...@@ -87,10 +114,108 @@ void main() { ...@@ -87,10 +114,108 @@ void main() {
}); });
when(response.contentLength).thenReturn(-1); when(response.contentLength).thenReturn(-1);
expect(consolidateHttpClientResponseBytes(response), expect(consolidateHttpClientResponseBytes(response, client: client),
throwsA(isInstanceOf<Exception>())); throwsA(isInstanceOf<Exception>()));
}); });
test('Propagates error to Future return value if onBytesReceived throws', () async {
when(response.contentLength).thenReturn(-1);
final Future<List<int>> result = consolidateHttpClientResponseBytes(
response,
client: client,
onBytesReceived: (int cumulative, int total) {
throw 'misbehaving callback';
},
);
expect(result, throwsA(equals('misbehaving callback')));
});
group('when gzipped', () {
final List<int> gzipped = gzip.encode(chunkOne.followedBy(chunkTwo).toList());
final List<int> gzippedChunkOne = gzipped.sublist(0, gzipped.length ~/ 2);
final List<int> gzippedChunkTwo = gzipped.sublist(gzipped.length ~/ 2);
setUp(() {
when(headers.value(HttpHeaders.contentEncodingHeader)).thenReturn('gzip');
when(response.listen(
any,
onDone: anyNamed('onDone'),
onError: anyNamed('onError'),
cancelOnError: anyNamed('cancelOnError'),
)).thenAnswer((Invocation invocation) {
final void Function(List<int>) onData = invocation.positionalArguments[0];
final void Function(Object) onError = invocation.namedArguments[#onError];
final void Function() onDone = invocation.namedArguments[#onDone];
final bool cancelOnError = invocation.namedArguments[#cancelOnError];
return Stream<List<int>>.fromIterable(
<List<int>>[gzippedChunkOne, gzippedChunkTwo]).listen(
onData,
onDone: onDone,
onError: onError,
cancelOnError: cancelOnError,
);
});
});
test('Uncompresses GZIP bytes if autoUncompress is true and response.autoUncompress is false', () async {
when(client.autoUncompress).thenReturn(false);
when(response.contentLength).thenReturn(gzipped.length);
final List<int> bytes = await consolidateHttpClientResponseBytes(response, client: client);
expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});
test('returns gzipped bytes if autoUncompress is false and response.autoUncompress is false', () async {
when(client.autoUncompress).thenReturn(false);
when(response.contentLength).thenReturn(gzipped.length);
final List<int> bytes = await consolidateHttpClientResponseBytes(response, client: client, autoUncompress: false);
expect(bytes, gzipped);
});
test('Notifies onBytesReceived with gzipped numbers', () async {
when(client.autoUncompress).thenReturn(false);
when(response.contentLength).thenReturn(gzipped.length);
final List<int> records = <int>[];
await consolidateHttpClientResponseBytes(
response,
client: client,
onBytesReceived: (int cumulative, int total) {
records.addAll(<int>[cumulative, total]);
},
);
expect(records, <int>[
gzippedChunkOne.length,
gzipped.length,
gzipped.length,
gzipped.length,
]);
});
test('Notifies onBytesReceived with expectedContentLength of -1 if response.autoUncompress is true', () async {
final int syntheticTotal = (chunkOne.length + chunkTwo.length) * 2;
when(response.contentLength).thenReturn(syntheticTotal);
final List<int> records = <int>[];
await consolidateHttpClientResponseBytes(
response,
client: client,
onBytesReceived: (int cumulative, int total) {
records.addAll(<int>[cumulative, total]);
},
);
expect(records, <int>[
gzippedChunkOne.length,
-1,
gzipped.length,
-1,
]);
});
});
}); });
} }
class MockHttpClient extends Mock implements HttpClient {}
class MockHttpClientResponse extends Mock implements HttpClientResponse {} class MockHttpClientResponse extends Mock implements HttpClientResponse {}
class MockHttpHeaders extends Mock implements HttpHeaders {}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment