Source
/// Records [val] as the newest update of this subscription and notifies the
/// response.
///
/// The value actually stored is the copy returned by `val.cloneForAckQueue()`.
///
/// * While `_caching` and `_isCacheValid` are both true, the copy is appended
///   to `lastValues`. If that list then exceeds
///   `response.responder.maxCacheLength`, or holds more than one entry while
///   `cachingQueue` is false and `response._sendingAfterAck` is true, the
///   cache is marked invalid and every cached entry is merged into a single
///   fresh `lastValue` (rollup mode).
/// * Otherwise the copy is merged into the existing `lastValue`, or simply
///   becomes `lastValue` when there is none yet.
///
/// When `_qosLevel > 0`, `waitingValues` and `_storage` (if non-null) are
/// updated as well: appended to while the cache stays valid, replaced by the
/// single `lastValue` in every other case.
///
/// Finally `response.subscriptionChanged(this)` is invoked when `_permitted`
/// is true and `sid > -1`.
void addValue(ValueUpdate val) {
  val = val.cloneForAckQueue();

  // Stays true on every path that ends with exactly one rolled-up value, in
  // which case the waiting queue must be collapsed down to that value.
  bool collapseQueue = true;

  if (!_caching || !_isCacheValid) {
    // No usable cache: fold the new value into whatever is currently held.
    lastValue =
        (lastValue == null) ? val : new ValueUpdate.merge(lastValue, val);
  } else {
    lastValues.add(val);

    final bool overflow =
        lastValues.length > response.responder.maxCacheLength ||
            (!cachingQueue &&
                response._sendingAfterAck &&
                lastValues.length > 1);

    if (overflow) {
      // cache is no longer valid, fallback to rollup mode
      _isCacheValid = false;
      lastValue = new ValueUpdate(null, ts: '');
      for (ValueUpdate cached in lastValues) {
        lastValue.mergeAdd(cached);
      }
      lastValues.length = 0;
    } else {
      // Cache still valid: keep each value individually.
      collapseQueue = false;
      lastValue = val;
      if (_qosLevel > 0) {
        waitingValues.add(lastValue);
        if (_storage != null) {
          _storage.addValue(lastValue);
        }
      }
    }
  }

  if (collapseQueue && _qosLevel > 0) {
    // Storage is handed the queue before it is cleared, as in the original
    // statement order.
    if (_storage != null) {
      _storage.setValue(waitingValues, lastValue);
    }
    waitingValues
      ..clear()
      ..add(lastValue);
  }

  // TODO: don't allow this to be called from the same controller more often
  // than every 100ms; the first response can still happen ASAP.
  if (_permitted && sid > -1) {
    response.subscriptionChanged(this);
  }
}