Stream<ValueUpdate> onValueChange(String path, { int cacheLevel: 1 })

Source

/// Returns a broadcast stream of [ValueUpdate]s for [path].
///
/// The underlying subscription is opened with `this[path].subscribe(...)`,
/// passing [cacheLevel] through, when the stream gets its first listener.
/// It is cancelled once the stream has no listeners left; a listener added
/// after that opens a fresh subscription.
Stream<ValueUpdate> onValueChange(String path, {int cacheLevel: 1}) {
  RespSubscribeListener listener;
  StreamController<ValueUpdate> controller;
  controller = new StreamController<ValueUpdate>.broadcast(onListen: () {
    // A broadcast controller fires onListen only when it goes from zero
    // listeners to one, so no separate listener count is needed here.
    if (listener == null) {
      listener = this[path].subscribe((ValueUpdate update) {
        controller.add(update);
      }, cacheLevel);
    }
  }, onCancel: () {
    // Fired after the last listener cancels. The null check covers the case
    // where subscribe() failed in onListen and `listener` was never set.
    if (listener != null) {
      listener.cancel();
      listener = null;
    }
  });
  return controller.stream;
}