diff options
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueCleaner.cpp | 24 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/PollableQueue.h | 2 |
2 files changed, 15 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp index b252c1e8be..bf4b62c849 100644 --- a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp +++ b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/sys/Timer.h" +#include "qpid/sys/Time.h" #include <boost/function.hpp> #include <boost/bind.hpp> @@ -73,22 +74,25 @@ void QueueCleaner::setTimer(qpid::sys::Timer* timer) { void QueueCleaner::fired() { + QPID_LOG(debug, "QueueCleaner::fired: requesting purge"); queues.eachQueue(boost::bind(&PurgeSet::push, &purging, _1)); - QPID_LOG(debug, "Requested purge of queues"); + task->restart(); // Update task restart time to now()+interval + timer->add(task); } QueueCleaner::QueuePtrs::const_iterator QueueCleaner::purge(const QueueCleaner::QueuePtrs& batch) { - for (QueuePtrs::const_iterator i = batch.begin(); i != batch.end(); ++i) { - (*i)->purgeExpired(period); - } - QPID_LOG(debug, "Purged " << batch.size() << " queues"); - if (purging.empty()) { - task->restart(); - timer->add(task); - QPID_LOG(debug, "Restarted purge timer"); + const sys::AbsTime tmoTime = sys::AbsTime(sys::AbsTime::now(), 1 * sys::TIME_SEC); + int nPurged = 0; + QueuePtrs::const_iterator batchItr = batch.begin(); + for ( ; batchItr != batch.end() && sys::AbsTime::now() < tmoTime; ++batchItr) { + task->restart(); // Update task restart time to now()+interval + (*batchItr)->purgeExpired(period); + nPurged++; } - return batch.end(); + QPID_LOG(debug, "QueueCleaner::purge: purged " << nPurged << " of " << batch.size() << " queues"); + task->restart(); // Update task restart time to now()+interval + return batchItr; } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h index 03b9d0084d..5a3e281e9f 100644 --- a/qpid/cpp/src/qpid/sys/PollableQueue.h +++ b/qpid/cpp/src/qpid/sys/PollableQueue.h @@ -143,7 +143,7 @@ template <class T> void PollableQueue<T>::dispatch(PollableCondition& cond) { template <class T> void PollableQueue<T>::process() { // Called with lock held - while (!stopped && !queue.empty()) { + if (!stopped && !queue.empty()) { assert(batch.empty()); batch.swap(queue); typename Batch::const_iterator putBack; |
