1. @override
Future onInvoke(Map<String, dynamic> params)

This is called when this node is invoked. You can return the following types from this method:

  • Iterable
  • Map
  • Table
  • Stream
  • SimpleTableResult
  • AsyncTableResult

You can also return a Future (for example, when the method is async) that resolves to one of the following types:

  • Stream
  • Iterable
  • Map
  • Table

Source

/// Streams history rows in response to an invocation of this node.
///
/// Reads the following entries from [params]:
///
/// * `"Timerange"`: handed to `parseTimeRange` to obtain the query range.
/// * `"Rollup"`: name looked up in `_rollups`; when no factory is registered
///   under that name, `null` is passed on as the rollup.
/// * `"Interval"`: handed to `parseInterval`, whose result is used as a
///   number of milliseconds.
/// * `"Batch Size"`: number of rows per emitted batch (truncated to an
///   integer). When missing or not positive, all rows are emitted together
///   in one batch once the source stream completes.
/// * `"Real Time"`: when exactly `true`, the end of the range is replaced
///   with `null` and every value is emitted on its own, as a one-row batch,
///   as soon as it arrives.
///
/// Every event produced is a list of rows, each row being the result of
/// `ValuePair.toRow()`. Nothing is emitted when the history is empty.
/// Errors raised while parsing the parameters or reading the history are
/// not handled here and surface to the stream's listener.
@override
onInvoke(Map<String, dynamic> params) async* {
  String range = params["Timerange"];
  String rollupName = params["Rollup"];
  RollupFactory rollupFactory = _rollups[rollupName];
  Rollup rollup = rollupFactory == null ? null : rollupFactory();
  Duration interval = new Duration(
    milliseconds: parseInterval(params["Interval"]));
  num batchSize = params["Batch Size"];

  // A missing batch size becomes 0, which never matches a buffer length
  // (the buffer is only checked after a row was added), so batching is
  // effectively disabled.
  int batchCount = batchSize == null ? 0 : batchSize.toInt();
  bool realTime = params["Real Time"] == true;

  TimeRange tr = parseTimeRange(range);
  if (realTime) {
    // NOTE(review): a null end presumably marks the range as open-ended so
    // that live values keep flowing; confirm against TimeRange.
    tr = new TimeRange(tr.start, null);
  }

  Stream<ValuePair> pairs = await calculateHistory(tr, interval, rollup);

  if (realTime) {
    await for (ValuePair pair in pairs) {
      yield [pair.toRow()];
    }
    return;
  }

  List<List<dynamic>> buffer = [];

  await for (ValuePair pair in pairs) {
    buffer.add(pair.toRow());
    if (buffer.length == batchCount) {
      yield buffer;
      // Start a fresh list instead of clearing the old one: the listener
      // now owns the batch that was just emitted.
      buffer = [];
    }
  }

  // Flush the trailing partial batch. The list is deliberately left
  // untouched afterwards so the listener never observes it being emptied.
  if (buffer.isNotEmpty) {
    yield buffer;
  }
}