summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorCharles E. Rolke <chug@apache.org>2014-12-01 13:32:48 +0000
committerCharles E. Rolke <chug@apache.org>2014-12-01 13:32:48 +0000
commitccc674dda3f4975199f403fba048aa473ce366dd (patch)
tree8638b3cdbd1c551a5e4510a5624b9d003ee7235a /qpid/cpp
parent8e3c214a4650eb427d91ccde5ff2b28d242b1029 (diff)
downloadqpid-python-ccc674dda3f4975199f403fba048aa473ce366dd.tar.gz
QPID-6213: qpidd misses heartbeats
* Pollable queue breaks when client does not process whole batch. * QueueCleaner must not reschedule same task multiple times. * QueueCleaner breaks out of batch processing on wall clock time interval. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1642681 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/QueueCleaner.cpp24
-rw-r--r--qpid/cpp/src/qpid/sys/PollableQueue.h2
2 files changed, 15 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
index b252c1e8be..bf4b62c849 100644
--- a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
@@ -23,6 +23,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/sys/Timer.h"
+#include "qpid/sys/Time.h"
#include <boost/function.hpp>
#include <boost/bind.hpp>
@@ -73,22 +74,25 @@ void QueueCleaner::setTimer(qpid::sys::Timer* timer) {
void QueueCleaner::fired()
{
+ QPID_LOG(debug, "QueueCleaner::fired: requesting purge");
queues.eachQueue(boost::bind(&PurgeSet::push, &purging, _1));
- QPID_LOG(debug, "Requested purge of queues");
+ task->restart(); // Update task restart time to now()+interval
+ timer->add(task);
}
QueueCleaner::QueuePtrs::const_iterator QueueCleaner::purge(const QueueCleaner::QueuePtrs& batch)
{
- for (QueuePtrs::const_iterator i = batch.begin(); i != batch.end(); ++i) {
- (*i)->purgeExpired(period);
- }
- QPID_LOG(debug, "Purged " << batch.size() << " queues");
- if (purging.empty()) {
- task->restart();
- timer->add(task);
- QPID_LOG(debug, "Restarted purge timer");
+ const sys::AbsTime tmoTime = sys::AbsTime(sys::AbsTime::now(), 1 * sys::TIME_SEC);
+ int nPurged = 0;
+ QueuePtrs::const_iterator batchItr = batch.begin();
+ for ( ; batchItr != batch.end() && sys::AbsTime::now() < tmoTime; ++batchItr) {
+ task->restart(); // Update task restart time to now()+interval
+ (*batchItr)->purgeExpired(period);
+ nPurged++;
}
- return batch.end();
+ QPID_LOG(debug, "QueueCleaner::purge: purged " << nPurged << " of " << batch.size() << " queues");
+ task->restart(); // Update task restart time to now()+interval
+ return batchItr;
}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h
index 03b9d0084d..5a3e281e9f 100644
--- a/qpid/cpp/src/qpid/sys/PollableQueue.h
+++ b/qpid/cpp/src/qpid/sys/PollableQueue.h
@@ -143,7 +143,7 @@ template <class T> void PollableQueue<T>::dispatch(PollableCondition& cond) {
template <class T> void PollableQueue<T>::process() {
// Called with lock held
- while (!stopped && !queue.empty()) {
+ if (!stopped && !queue.empty()) {
assert(batch.empty());
batch.swap(queue);
typename Batch::const_iterator putBack;