Stream<E> asyncMap<E>(dynamic convert(T event))

Creates a new stream with each data event of this stream asynchronously mapped to a new event.

This acts like map, except that convert may return a Future, and in that case, the stream waits for that future to complete before continuing with its result.

The returned stream is a broadcast stream if this stream is.

Source

Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) {
  StreamController<E> controller;
  StreamSubscription<T> subscription;

  void onListen() {
    final add = controller.add;
    assert(controller is _StreamController ||
        controller is _BroadcastStreamController);
    final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/;
    final addError = eventSink._addError;
    subscription = this.listen((T event) {
      FutureOr<E> newValue;
      try {
        newValue = convert(event);
      } catch (e, s) {
        controller.addError(e, s);
        return;
      }
      if (newValue is Future<E>) {
        subscription.pause();
        newValue
            .then(add, onError: addError)
            .whenComplete(subscription.resume);
      } else {
        controller.add(newValue as Object/*=E*/);
      }
    }, onError: addError, onDone: controller.close);
  }

  if (this.isBroadcast) {
    controller = new StreamController<E>.broadcast(
        onListen: onListen,
        onCancel: () {
          subscription.cancel();
        },
        sync: true);
  } else {
    controller = new StreamController<E>(
        onListen: onListen,
        onPause: () {
          subscription.pause();
        },
        onResume: () {
          subscription.resume();
        },
        onCancel: () => subscription.cancel(),
        sync: true);
  }
  return controller.stream;
}