summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-05-13 13:03:08 +0000
committerGordon Sim <gsim@apache.org>2014-05-13 13:03:08 +0000
commitc6774b45a316b499e0593b54a1c088ceca65265a (patch)
tree4376d330d715104d202c6f248d39a3be03bf69a6 /qpid/cpp/src
parent34a94a083a7219c1672bcbca8ee58227c6c8205f (diff)
downloadqpid-python-c6774b45a316b499e0593b54a1c088ceca65265a.tar.gz
QPID-5758: Move purging of expired messages from timer thread to worker thread
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1594220 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/QueueCleaner.cpp32
-rw-r--r--qpid/cpp/src/qpid/broker/QueueCleaner.h11
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp7
4 files changed, 31 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index cd62523fc1..da45d4bef9 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -234,7 +234,7 @@ Broker::Broker(const Broker::Options& conf) :
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
- queueCleaner(queues, timer.get()),
+ queueCleaner(queues, poller, timer.get()),
recoveryInProgress(false),
expiryPolicy(new ExpiryPolicy),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
index 8d9e3f43dd..8ae733a1ff 100644
--- a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
@@ -48,10 +48,15 @@ namespace {
fireFunction();
}
}
-QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, boost::shared_ptr<sys::Poller> p, sys::Timer* t)
+ : queues(q), timer(t), purging(boost::bind(&QueueCleaner::purge, this, _1), p)
+{
+ purging.start();
+}
QueueCleaner::~QueueCleaner()
{
+ purging.stop();
if (task) task->cancel();
}
@@ -66,28 +71,19 @@ void QueueCleaner::setTimer(qpid::sys::Timer* timer) {
this->timer = timer;
}
-namespace {
-struct CollectQueues
+void QueueCleaner::fired()
{
- std::vector<Queue::shared_ptr>* queues;
- CollectQueues(std::vector<Queue::shared_ptr>* q) : queues(q) {}
- void operator()(Queue::shared_ptr q)
- {
- queues->push_back(q);
- }
-};
+ queues.eachQueue(boost::bind(&PurgeSet::push, &purging, _1));
}
-void QueueCleaner::fired()
+QueueCleaner::QueuePtrs::const_iterator QueueCleaner::purge(const QueueCleaner::QueuePtrs& batch)
{
- //collect copy of list of queues to avoid holding registry lock while we perform purge
- std::vector<Queue::shared_ptr> copy;
- CollectQueues collect(&copy);
- queues.eachQueue(collect);
- std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1, period));
- task->setupNextFire();
+ for (QueuePtrs::const_iterator i = batch.begin(); i != batch.end(); ++i) {
+ (*i)->purgeExpired(period);
+ }
+ task->restart();
timer->add(task);
+ return batch.end();
}
-
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.h b/qpid/cpp/src/qpid/broker/QueueCleaner.h
index 896af1dcd5..499326460f 100644
--- a/qpid/cpp/src/qpid/broker/QueueCleaner.h
+++ b/qpid/cpp/src/qpid/broker/QueueCleaner.h
@@ -23,9 +23,11 @@
*/
#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Time.h"
#include <boost/intrusive_ptr.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
@@ -36,6 +38,7 @@ namespace sys {
namespace broker {
+class Queue;
class QueueRegistry;
/**
* TimerTask to purge expired messages from queues
@@ -43,18 +46,24 @@ class QueueRegistry;
class QueueCleaner
{
public:
- QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer);
+ QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, boost::shared_ptr<sys::Poller>, sys::Timer* timer);
QPID_BROKER_EXTERN ~QueueCleaner();
QPID_BROKER_EXTERN void start(sys::Duration period);
QPID_BROKER_EXTERN void setTimer(sys::Timer* timer);
private:
+ typedef boost::shared_ptr<Queue> QueuePtr;
+ typedef std::deque< QueuePtr > QueuePtrs;
+ typedef qpid::sys::PollableQueue< QueuePtr > PurgeSet;
boost::intrusive_ptr<sys::TimerTask> task;
QueueRegistry& queues;
sys::Timer* timer;
sys::Duration period;
+ PurgeSet purging;
void fired();
+ QueuePtrs::const_iterator purge(const QueuePtrs&);
+
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 0fd35a000b..c44483f8dd 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -40,6 +40,7 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/QueueSettings.h"
+#include "qpid/sys/Thread.h"
#include "qpid/sys/Timer.h"
#include <iostream>
@@ -202,18 +203,22 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
}
QPID_AUTO_TEST_CASE(testQueueCleaner) {
+ boost::shared_ptr<Poller> poller(new Poller);
+ Thread runner(poller.get());
Timer timer;
QueueRegistry queues;
Queue::shared_ptr queue = queues.declare("my-queue", QueueSettings()).first;
addMessagesToQueue(10, *queue, 200, 400);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
- QueueCleaner cleaner(queues, &timer);
+ QueueCleaner cleaner(queues, poller, &timer);
cleaner.start(100 * qpid::sys::TIME_MSEC);
::usleep(300*1000);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
::usleep(300*1000);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
+ poller->shutdown();
+ runner.join();
}
namespace {
int getIntProperty(const Message& message, const std::string& key)