diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 11 |
1 files changed, 3 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index 21c7a2a737..3494288f7b 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -144,11 +144,6 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) } } - /** @todo KAG: - REMOVE ONCE STABLE */ - if (index.find(msg.payload) != index.end()) { - QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position); - } - if (flowStopped || !index.empty()) { // ignore flow control if we are populating the queue due to cluster replication: if (broker && broker->isClusterUpdatee()) { @@ -156,7 +151,7 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) return; } QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position); - msg.payload->getIngressCompletion()->startCompleter(); // don't complete until flow resumes + msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes index.insert(msg.payload); } } @@ -196,14 +191,14 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg) // flow enabled - release all pending msgs while (!index.empty()) { std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin(); - (*itr)->getIngressCompletion()->finishCompleter(); + (*itr)->getIngressCompletion().finishCompleter(); index.erase(itr); } } else { // even if flow controlled, we must release this msg as it is being dequeued std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload); if (itr != index.end()) { // this msg is flow controlled, release it: - (*itr)->getIngressCompletion()->finishCompleter(); + (*itr)->getIngressCompletion().finishCompleter(); index.erase(itr); } } |