debounce_data_stream.dart 2.02 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:typed_data';

/// Merges the values in a stream that are sent less than [duration] apart.
///
/// To minimize latency, the merged stream will always emit the first value that
/// is sent after a pause of at least [duration] long. After the first message,
/// all values that are sent within [duration] will be merged into one.
Stream<Uint8List> debounceDataStream(Stream<Uint8List> stream, [Duration duration = const Duration(milliseconds: 100)]) {
  final StreamController<Uint8List> controller = StreamController<Uint8List>();
  final BytesBuilder buffer = BytesBuilder(copy: false);

  bool isDone = false;
  Timer? timer;

  // Called when timer triggers, sends out the buffered messages.
  void onTimer() {
    if (buffer.isNotEmpty) {
      controller.add(buffer.toBytes());
      buffer.clear();
      if (isDone) {
        controller.close();
      } else {
        // Start another timer even if we have nothing to send right now, so
        // that outgoing messages are at least [duration] apart.
        timer = Timer(duration, onTimer);
      }
    } else {
      timer = null;
    }
  }

  controller.onListen = () {
    final StreamSubscription<Uint8List> subscription = stream.listen((Uint8List data) {
      if (timer == null) {
        controller.add(data);
        // Start the timer to make sure that the next message is at least [duration] apart.
        timer = Timer(duration, onTimer);
      } else {
        buffer.add(data);
      }
    }, onError: (Object error, StackTrace stackTrace) {
      // Forward the error.
      controller.addError(error, stackTrace);
    }, onDone: () {
      isDone = true;
      // Delay closing the channel if we still have buffered data.
      if (timer == null) {
        controller.close();
      }
    });

    controller.onCancel = () {
      subscription.cancel();
    };
  };

  return controller.stream;
}