// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';

/// Signature for getting notified when chunks of bytes are received while
/// consolidating the bytes of an [HttpClientResponse] into a [Uint8List].
///
/// The `cumulative` parameter will contain the total number of bytes received
/// thus far. If the response has been gzipped, this number will be the number
/// of compressed bytes that have been received _across the wire_.
///
/// The `total` parameter will contain the _expected_ total number of bytes to
/// be received across the wire (extracted from the value of the
/// `Content-Length` HTTP response header), or null if the size of the response
/// body is not known in advance (this is common for HTTP chunked transfer
/// encoding, which itself is common when a large amount of data is being
/// returned to the client and the total size of the response may not be known
/// until the request has been fully processed).
///
/// This is used in [consolidateHttpClientResponseBytes].
typedef BytesReceivedCallback = void Function(int cumulative, int? total);

/// Efficiently converts the response body of an [HttpClientResponse] into a
/// [Uint8List].
///
/// The future returned will forward any error emitted by `response`. It will
/// also complete with any error thrown while feeding the received bytes into
/// the output sink (for example, an error thrown by the gzip decoder when
/// `autoUncompress` is in effect), in which case processing of the response
/// halts.
///
/// The `onBytesReceived` callback, if specified, will be invoked for every
/// chunk of bytes that is received while consolidating the response bytes.
/// If the callback throws an error, processing of the response will halt, and
/// the returned future will complete with the error that was thrown by the
/// callback. For more information on how to interpret the parameters to the
/// callback, see the documentation on [BytesReceivedCallback].
///
/// If the `response` is gzipped and the `autoUncompress` parameter is true,
/// this will automatically un-compress the bytes in the returned list if it
/// hasn't already been done via [HttpClient.autoUncompress]. To get compressed
/// bytes from this method (assuming the response is sending compressed bytes),
/// set both [HttpClient.autoUncompress] to false and the `autoUncompress`
/// parameter to false.
Future<Uint8List> consolidateHttpClientResponseBytes(
  HttpClientResponse response, {
  bool autoUncompress = true,
  BytesReceivedCallback? onBytesReceived,
}) {
  final Completer<Uint8List> completer = Completer<Uint8List>.sync();

  final _OutputBuffer output = _OutputBuffer();
  ByteConversionSink sink = output;
  // A reported content length of -1 is treated as "not known".
  int? expectedContentLength = response.contentLength;
  if (expectedContentLength == -1) {
    expectedContentLength = null;
  }
  switch (response.compressionState) {
    case HttpClientResponseCompressionState.compressed:
      if (autoUncompress) {
        // We need to un-compress the bytes as they come in.
        sink = gzip.decoder.startChunkedConversion(output);
      }
      break;
    case HttpClientResponseCompressionState.decompressed:
      // response.contentLength will not match our bytes stream, so we declare
      // that we don't know the expected content length.
      expectedContentLength = null;
      break;
    case HttpClientResponseCompressionState.notCompressed:
      // Nothing to adjust.
      break;
  }

  int bytesReceived = 0;
  late final StreamSubscription<List<int>> subscription;
  subscription = response.listen((List<int> chunk) {
    try {
      // Adding to the sink can throw when it is a decoder; that error must
      // reach the returned future rather than escape as an uncaught error
      // that leaves the future pending.
      sink.add(chunk);
      if (onBytesReceived != null) {
        bytesReceived += chunk.length;
        onBytesReceived(bytesReceived, expectedContentLength);
      }
    } catch (error, stackTrace) {
      completer.completeError(error, stackTrace);
      subscription.cancel();
    }
  }, onDone: () {
    try {
      // Closing the sink finalizes the output buffer; with a decoder in
      // place this may throw, so report it through the future as well.
      sink.close();
    } catch (error, stackTrace) {
      completer.completeError(error, stackTrace);
      return;
    }
    completer.complete(output.bytes);
  }, onError: completer.completeError, cancelOnError: true);

  return completer.future;
}

/// A byte sink that collects every chunk passed to [add] and, when closed,
/// joins them into a single [Uint8List] available through [bytes].
class _OutputBuffer extends ByteConversionSinkBase {
  /// Chunks collected so far; set to null once [close] has built [_bytes].
  List<List<int>>? _chunks = <List<int>>[];

  /// Sum of the lengths of all chunks passed to [add].
  int _contentLength = 0;

  /// The joined output; null until [close] has run.
  Uint8List? _bytes;

  /// Records `chunk` for later concatenation.
  ///
  /// Must not be called after [close].
  @override
  void add(List<int> chunk) {
    assert(_bytes == null);
    _chunks!.add(chunk);
    _contentLength += chunk.length;
  }

  /// Copies all collected chunks, in order, into one [Uint8List].
  ///
  /// Calling this again after the first time does nothing.
  @override
  void close() {
    if (_bytes == null) {
      // Publish the buffer first, then fill it in chunk by chunk.
      final Uint8List combined = _bytes = Uint8List(_contentLength);
      int writeIndex = 0;
      for (final List<int> piece in _chunks!) {
        final int end = writeIndex + piece.length;
        combined.setRange(writeIndex, end, piece);
        writeIndex = end;
      }
      // The individual chunks are no longer needed.
      _chunks = null;
    }
  }

  /// The concatenated bytes; only valid once [close] has been called.
  Uint8List get bytes {
    final Uint8List? result = _bytes;
    assert(result != null);
    return result!;
  }
}