summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/broker/QueueCleaner.cpp24
-rw-r--r--qpid/cpp/src/qpid/sys/PollableQueue.h2
2 files changed, 15 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
index b252c1e8be..bf4b62c849 100644
--- a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
@@ -23,6 +23,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/sys/Timer.h"
+#include "qpid/sys/Time.h"
#include <boost/function.hpp>
#include <boost/bind.hpp>
@@ -73,22 +74,25 @@ void QueueCleaner::setTimer(qpid::sys::Timer* timer) {
void QueueCleaner::fired()
{
+ QPID_LOG(debug, "QueueCleaner::fired: requesting purge");
queues.eachQueue(boost::bind(&PurgeSet::push, &purging, _1));
- QPID_LOG(debug, "Requested purge of queues");
+ task->restart(); // Update task restart time to now()+interval
+ timer->add(task);
}
QueueCleaner::QueuePtrs::const_iterator QueueCleaner::purge(const QueueCleaner::QueuePtrs& batch)
{
- for (QueuePtrs::const_iterator i = batch.begin(); i != batch.end(); ++i) {
- (*i)->purgeExpired(period);
- }
- QPID_LOG(debug, "Purged " << batch.size() << " queues");
- if (purging.empty()) {
- task->restart();
- timer->add(task);
- QPID_LOG(debug, "Restarted purge timer");
+ const sys::AbsTime tmoTime = sys::AbsTime(sys::AbsTime::now(), 1 * sys::TIME_SEC);
+ int nPurged = 0;
+ QueuePtrs::const_iterator batchItr = batch.begin();
+ for ( ; batchItr != batch.end() && sys::AbsTime::now() < tmoTime; ++batchItr) {
+ task->restart(); // Update task restart time to now()+interval
+ (*batchItr)->purgeExpired(period);
+ nPurged++;
}
- return batch.end();
+ QPID_LOG(debug, "QueueCleaner::purge: purged " << nPurged << " of " << batch.size() << " queues");
+ task->restart(); // Update task restart time to now()+interval
+ return batchItr;
}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h
index 03b9d0084d..5a3e281e9f 100644
--- a/qpid/cpp/src/qpid/sys/PollableQueue.h
+++ b/qpid/cpp/src/qpid/sys/PollableQueue.h
@@ -143,7 +143,7 @@ template <class T> void PollableQueue<T>::dispatch(PollableCondition& cond) {
template <class T> void PollableQueue<T>::process() {
// Called with lock held
- while (!stopped && !queue.empty()) {
+ if (!stopped && !queue.empty()) {
assert(batch.empty());
batch.swap(queue);
typename Batch::const_iterator putBack;