consolidate_response.dart 4.6 KB
Newer Older
1 2 3 4 5
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
6
import 'dart:convert';
7 8 9
import 'dart:io';
import 'dart:typed_data';

10 11
/// Signature for getting notified when chunks of bytes are received while
/// consolidating the bytes of an [HttpClientResponse] into a [Uint8List].
12
///
13 14 15 16 17 18
/// The `cumulative` parameter will contain the total number of bytes received
/// thus far. If the response has been gzipped, this number will be the number
/// of compressed bytes that have been received _across the wire_.
///
/// The `total` parameter will contain the _expected_ total number of bytes to
/// be received across the wire (extracted from the value of the
19
/// `Content-Length` HTTP response header), or null if the size of the response
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
/// body is not known in advance (this is common for HTTP chunked transfer
/// encoding, which itself is common when a large amount of data is being
/// returned to the client and the total size of the response may not be known
/// until the request has been fully processed).
///
/// This is used in [consolidateHttpClientResponseBytes].
typedef BytesReceivedCallback = void Function(int cumulative, int total);

/// Efficiently converts the response body of an [HttpClientResponse] into a
/// [Uint8List].
///
/// The future returned will forward any error emitted by `response`.
///
/// The `onBytesReceived` callback, if specified, will be invoked for every
/// chunk of bytes that is received while consolidating the response bytes.
/// If the callback throws an error, processing of the response will halt, and
/// the returned future will complete with the error that was thrown by the
/// callback. For more information on how to interpret the parameters to the
/// callback, see the documentation on [BytesReceivedCallback].
///
/// If the `response` is gzipped and the `autoUncompress` parameter is true,
/// this will automatically un-compress the bytes in the returned list if it
/// hasn't already been done via [HttpClient.autoUncompress]. To get compressed
/// bytes from this method (assuming the response is sending compressed bytes),
/// set both [HttpClient.autoUncompress] to false and the `autoUncompress`
/// parameter to false.
Future<Uint8List> consolidateHttpClientResponseBytes(
  HttpClientResponse response, {
  bool autoUncompress = true,
  BytesReceivedCallback onBytesReceived,
}) {
  assert(autoUncompress != null);
52
  final Completer<Uint8List> completer = Completer<Uint8List>.sync();
53 54 55 56

  final _OutputBuffer output = _OutputBuffer();
  ByteConversionSink sink = output;
  int expectedContentLength = response.contentLength;
57 58
  if (expectedContentLength == -1)
    expectedContentLength = null;
59 60 61 62 63 64 65 66
  switch (response.compressionState) {
    case HttpClientResponseCompressionState.compressed:
      if (autoUncompress) {
        // We need to un-compress the bytes as they come in.
        sink = gzip.decoder.startChunkedConversion(output);
      }
      break;
    case HttpClientResponseCompressionState.decompressed:
67 68
      // response.contentLength will not match our bytes stream, so we declare
      // that we don't know the expected content length.
69
      expectedContentLength = null;
70 71 72 73
      break;
    case HttpClientResponseCompressionState.notCompressed:
      // Fall-through.
      break;
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
  }

  int bytesReceived = 0;
  StreamSubscription<List<int>> subscription;
  subscription = response.listen((List<int> chunk) {
    sink.add(chunk);
    if (onBytesReceived != null) {
      bytesReceived += chunk.length;
      try {
        onBytesReceived(bytesReceived, expectedContentLength);
      } catch (error, stackTrace) {
        completer.completeError(error, stackTrace);
        subscription.cancel();
        return;
      }
    }
90
  }, onDone: () {
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
    sink.close();
    completer.complete(output.bytes);
  }, onError: completer.completeError, cancelOnError: true);

  return completer.future;
}

class _OutputBuffer extends ByteConversionSinkBase {
  List<List<int>> _chunks = <List<int>>[];
  int _contentLength = 0;
  Uint8List _bytes;

  @override
  void add(List<int> chunk) {
    assert(_bytes == null);
    _chunks.add(chunk);
    _contentLength += chunk.length;
  }

  @override
  void close() {
    if (_bytes != null) {
      // We've already been closed; this is a no-op
      return;
    }
    _bytes = Uint8List(_contentLength);
117
    int offset = 0;
118 119
    for (List<int> chunk in _chunks) {
      _bytes.setRange(offset, offset + chunk.length, chunk);
120
      offset += chunk.length;
121
    }
122 123
    _chunks = null;
  }
124

125 126 127 128
  Uint8List get bytes {
    assert(_bytes != null);
    return _bytes;
  }
129
}