Creates a new stream with each data event of this stream asynchronously mapped to a new event.
This acts like map
, except that convert
may return a Future
,
and in that case, the stream waits for that future to complete before
continuing with its result.
The returned stream is a broadcast stream if this stream is.
Source
Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) { StreamController<E> controller; StreamSubscription<T> subscription; void onListen() { final add = controller.add; assert(controller is _StreamController || controller is _BroadcastStreamController); final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; final addError = eventSink._addError; subscription = this.listen((T event) { FutureOr<E> newValue; try { newValue = convert(event); } catch (e, s) { controller.addError(e, s); return; } if (newValue is Future<E>) { subscription.pause(); newValue .then(add, onError: addError) .whenComplete(subscription.resume); } else { controller.add(newValue as Object/*=E*/); } }, onError: addError, onDone: controller.close); } if (this.isBroadcast) { controller = new StreamController<E>.broadcast( onListen: onListen, onCancel: () { subscription.cancel(); }, sync: true); } else { controller = new StreamController<E>( onListen: onListen, onPause: () { subscription.pause(); }, onResume: () { subscription.resume(); }, onCancel: () => subscription.cancel(), sync: true); } return controller.stream; }