void startSendingData(int currentTime, int waitingAckId)

Source

void startSendingData(int currentTime, int waitingAckId) {
  _pendingSending = false;

  if (waitingAckId != -1) {
    _waitingAckCount++;
    _lastWatingAckId = waitingAckId;
  }

  if (requester.connection == null) {
    return;
  }
  List toAdd = [];

  HashSet<String> processingPaths = _changedPaths;
  _changedPaths = new HashSet<String>();
  for (String path in processingPaths) {
    if (subscriptions.containsKey(path)) {
      ReqSubscribeController sub = subscriptions[path];
      Map m = {'path': path, 'sid': sub.sid};
      if (sub.currentQos > 0) {
        m['qos'] = sub.currentQos;
      }
      toAdd.add(m);
    }
  }
  if (!toAdd.isEmpty) {
    requester._sendRequest({'method': 'subscribe', 'paths': toAdd}, null);
  }
  if (!toRemove.isEmpty) {
    List removeSids = [];
    toRemove.forEach((int sid, ReqSubscribeController sub) {
      if (sub.callbacks.isEmpty) {
        removeSids.add(sid);
        subscriptions.remove(sub.node.remotePath);
        subscriptionIds.remove(sub.sid);
        sub._destroy();
      }
    });
    requester._sendRequest(
        {'method': 'unsubscribe', 'sids': removeSids}, null);
    toRemove.clear();
  }
}