From ccc674dda3f4975199f403fba048aa473ce366dd Mon Sep 17 00:00:00 2001 From: "Charles E. Rolke" Date: Mon, 1 Dec 2014 13:32:48 +0000 Subject: QPID-6213: qpidd misses heartbeats * Pollable queue breaks when client does not process whole batch. * QueueCleaner must not reschedule same task multiple times. * QueueCleaner breaks out of batch processing on wall clock time interval. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1642681 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/QueueCleaner.cpp | 24 ++++++++++++++---------- qpid/cpp/src/qpid/sys/PollableQueue.h | 2 +- 2 files changed, 15 insertions(+), 11 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp index b252c1e8be..bf4b62c849 100644 --- a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp +++ b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/sys/Timer.h" +#include "qpid/sys/Time.h" #include #include @@ -73,22 +74,25 @@ void QueueCleaner::setTimer(qpid::sys::Timer* timer) { void QueueCleaner::fired() { + QPID_LOG(debug, "QueueCleaner::fired: requesting purge"); queues.eachQueue(boost::bind(&PurgeSet::push, &purging, _1)); - QPID_LOG(debug, "Requested purge of queues"); + task->restart(); // Update task restart time to now()+interval + timer->add(task); } QueueCleaner::QueuePtrs::const_iterator QueueCleaner::purge(const QueueCleaner::QueuePtrs& batch) { - for (QueuePtrs::const_iterator i = batch.begin(); i != batch.end(); ++i) { - (*i)->purgeExpired(period); - } - QPID_LOG(debug, "Purged " << batch.size() << " queues"); - if (purging.empty()) { - task->restart(); - timer->add(task); - QPID_LOG(debug, "Restarted purge timer"); + const sys::AbsTime tmoTime = sys::AbsTime(sys::AbsTime::now(), 1 * sys::TIME_SEC); + int nPurged = 0; + QueuePtrs::const_iterator batchItr = batch.begin(); + for ( ; batchItr != batch.end() && sys::AbsTime::now() < tmoTime; ++batchItr) { + task->restart(); // Update task restart time to now()+interval + (*batchItr)->purgeExpired(period); + nPurged++; } - return batch.end(); + QPID_LOG(debug, "QueueCleaner::purge: purged " << nPurged << " of " << batch.size() << " queues"); + task->restart(); // Update task restart time to now()+interval + return batchItr; } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h index 03b9d0084d..5a3e281e9f 100644 --- a/qpid/cpp/src/qpid/sys/PollableQueue.h +++ b/qpid/cpp/src/qpid/sys/PollableQueue.h @@ -143,7 +143,7 @@ template void PollableQueue::dispatch(PollableCondition& cond) { template void PollableQueue::process() { // Called with lock held - while (!stopped && !queue.empty()) { + if (!stopped && !queue.empty()) { assert(batch.empty()); batch.swap(queue); typename Batch::const_iterator putBack; -- cgit v1.2.1