diff options
| author | Charles E. Rolke <chug@apache.org> | 2014-12-01 13:32:48 +0000 |
|---|---|---|
| committer | Charles E. Rolke <chug@apache.org> | 2014-12-01 13:32:48 +0000 |
| commit | ccc674dda3f4975199f403fba048aa473ce366dd (patch) | |
| tree | 8638b3cdbd1c551a5e4510a5624b9d003ee7235a | |
| parent | 8e3c214a4650eb427d91ccde5ff2b28d242b1029 (diff) | |
| download | qpid-python-ccc674dda3f4975199f403fba048aa473ce366dd.tar.gz | |
QPID-6213: qpidd misses heartbeats
* Pollable queue breaks when client does not process whole batch.
* QueueCleaner must not reschedule same task multiple times.
* QueueCleaner breaks out of batch processing on wall clock time interval.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1642681 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueCleaner.cpp | 24 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/PollableQueue.h | 2 |
2 files changed, 15 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp index b252c1e8be..bf4b62c849 100644 --- a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp +++ b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/sys/Timer.h" +#include "qpid/sys/Time.h" #include <boost/function.hpp> #include <boost/bind.hpp> @@ -73,22 +74,25 @@ void QueueCleaner::setTimer(qpid::sys::Timer* timer) { void QueueCleaner::fired() { + QPID_LOG(debug, "QueueCleaner::fired: requesting purge"); queues.eachQueue(boost::bind(&PurgeSet::push, &purging, _1)); - QPID_LOG(debug, "Requested purge of queues"); + task->restart(); // Update task restart time to now()+interval + timer->add(task); } QueueCleaner::QueuePtrs::const_iterator QueueCleaner::purge(const QueueCleaner::QueuePtrs& batch) { - for (QueuePtrs::const_iterator i = batch.begin(); i != batch.end(); ++i) { - (*i)->purgeExpired(period); - } - QPID_LOG(debug, "Purged " << batch.size() << " queues"); - if (purging.empty()) { - task->restart(); - timer->add(task); - QPID_LOG(debug, "Restarted purge timer"); + const sys::AbsTime tmoTime = sys::AbsTime(sys::AbsTime::now(), 1 * sys::TIME_SEC); + int nPurged = 0; + QueuePtrs::const_iterator batchItr = batch.begin(); + for ( ; batchItr != batch.end() && sys::AbsTime::now() < tmoTime; ++batchItr) { + task->restart(); // Update task restart time to now()+interval + (*batchItr)->purgeExpired(period); + nPurged++; } - return batch.end(); + QPID_LOG(debug, "QueueCleaner::purge: purged " << nPurged << " of " << batch.size() << " queues"); + task->restart(); // Update task restart time to now()+interval + return batchItr; } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h index 03b9d0084d..5a3e281e9f 100644 --- a/qpid/cpp/src/qpid/sys/PollableQueue.h +++ b/qpid/cpp/src/qpid/sys/PollableQueue.h @@ -143,7 +143,7 @@ template <class T> void PollableQueue<T>::dispatch(PollableCondition& cond) { template <class T> void PollableQueue<T>::process() { // Called with lock held - while (!stopped && !queue.empty()) { + if (!stopped && !queue.empty()) { assert(batch.empty()); batch.swap(queue); typename Batch::const_iterator putBack; |
