- @override
This is called when this node is invoked. You can return the following types from this method:
IterableMapTableStreamSimpleTableResultAsyncTableResult
You can also return a future that resolves to one (like if the method is async) of the following types:
StreamIterableMapTable
Source
@override
onInvoke(Map<String, dynamic> params) async* {
String range = params["Timerange"];
String rollupName = params["Rollup"];
RollupFactory rollupFactory = _rollups[rollupName];
Rollup rollup = rollupFactory == null ? null : rollupFactory();
Duration interval = new Duration(
milliseconds: parseInterval(params["Interval"]));
num batchSize = params["Batch Size"];
if (batchSize == null) {
batchSize = 0;
}
int batchCount = batchSize.toInt();
TimeRange tr = parseTimeRange(range);
if (params["Real Time"] == true) {
tr = new TimeRange(tr.start, null);
}
try {
Stream<ValuePair> pairs = await calculateHistory(
tr,
interval,
rollup
);
if (params["Real Time"] == true) {
await for (ValuePair pair in pairs) {
yield [pair.toRow()];
}
} else {
int count = 0;
List<List<dynamic>> buffer = [];
await for (ValuePair row in pairs) {
count++;
buffer.add(row.toRow());
if (count != 0 && count == batchCount) {
yield buffer;
buffer = [];
count = 0;
}
}
if (buffer.isNotEmpty) {
yield buffer;
buffer.length = 0;
}
}
} catch (e) {
rethrow;
}
}