diff options
| author | Gordon Sim <gsim@apache.org> | 2014-05-13 13:03:08 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2014-05-13 13:03:08 +0000 |
| commit | c6774b45a316b499e0593b54a1c088ceca65265a (patch) | |
| tree | 4376d330d715104d202c6f248d39a3be03bf69a6 /qpid/cpp/src | |
| parent | 34a94a083a7219c1672bcbca8ee58227c6c8205f (diff) | |
| download | qpid-python-c6774b45a316b499e0593b54a1c088ceca65265a.tar.gz | |
QPID-5758: Move purging of expired messages from timer thread to worker thread
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1594220 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueCleaner.cpp | 32 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueCleaner.h | 11 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 7 |
4 files changed, 31 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index cd62523fc1..da45d4bef9 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -234,7 +234,7 @@ Broker::Broker(const Broker::Options& conf) : conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), *this), - queueCleaner(queues, timer.get()), + queueCleaner(queues, poller, timer.get()), recoveryInProgress(false), expiryPolicy(new ExpiryPolicy), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp index 8d9e3f43dd..8ae733a1ff 100644 --- a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp +++ b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp @@ -48,10 +48,15 @@ namespace { fireFunction(); } } -QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {} +QueueCleaner::QueueCleaner(QueueRegistry& q, boost::shared_ptr<sys::Poller> p, sys::Timer* t) + : queues(q), timer(t), purging(boost::bind(&QueueCleaner::purge, this, _1), p) +{ + purging.start(); +} QueueCleaner::~QueueCleaner() { + purging.stop(); if (task) task->cancel(); } @@ -66,28 +71,19 @@ void QueueCleaner::setTimer(qpid::sys::Timer* timer) { this->timer = timer; } -namespace { -struct CollectQueues +void QueueCleaner::fired() { - std::vector<Queue::shared_ptr>* queues; - CollectQueues(std::vector<Queue::shared_ptr>* q) : queues(q) {} - void operator()(Queue::shared_ptr q) - { - queues->push_back(q); - } -}; + queues.eachQueue(boost::bind(&PurgeSet::push, &purging, _1)); } -void QueueCleaner::fired() +QueueCleaner::QueuePtrs::const_iterator QueueCleaner::purge(const QueueCleaner::QueuePtrs& batch) { - //collect copy of list of queues to avoid holding registry lock while we perform purge - std::vector<Queue::shared_ptr> copy; - CollectQueues collect(©); - queues.eachQueue(collect); - std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1, period)); - task->setupNextFire(); + for (QueuePtrs::const_iterator i = batch.begin(); i != batch.end(); ++i) { + (*i)->purgeExpired(period); + } + task->restart(); timer->add(task); + return batch.end(); } - }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.h b/qpid/cpp/src/qpid/broker/QueueCleaner.h index 896af1dcd5..499326460f 100644 --- a/qpid/cpp/src/qpid/broker/QueueCleaner.h +++ b/qpid/cpp/src/qpid/broker/QueueCleaner.h @@ -23,9 +23,11 @@ */ #include "qpid/broker/BrokerImportExport.h" +#include "qpid/sys/PollableQueue.h" #include "qpid/sys/Time.h" #include <boost/intrusive_ptr.hpp> +#include <boost/shared_ptr.hpp> namespace qpid { @@ -36,6 +38,7 @@ namespace sys { namespace broker { +class Queue; class QueueRegistry; /** * TimerTask to purge expired messages from queues @@ -43,18 +46,24 @@ class QueueRegistry; class QueueCleaner { public: - QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer); + QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, boost::shared_ptr<sys::Poller>, sys::Timer* timer); QPID_BROKER_EXTERN ~QueueCleaner(); QPID_BROKER_EXTERN void start(sys::Duration period); QPID_BROKER_EXTERN void setTimer(sys::Timer* timer); private: + typedef boost::shared_ptr<Queue> QueuePtr; + typedef std::deque< QueuePtr > QueuePtrs; + typedef qpid::sys::PollableQueue< QueuePtr > PurgeSet; boost::intrusive_ptr<sys::TimerTask> task; QueueRegistry& queues; sys::Timer* timer; sys::Duration period; + PurgeSet purging; void fired(); + QueuePtrs::const_iterator purge(const QueuePtrs&); + }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 0fd35a000b..c44483f8dd 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -40,6 +40,7 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/QueueSettings.h" +#include "qpid/sys/Thread.h" #include "qpid/sys/Timer.h" #include <iostream> @@ -202,18 +203,22 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) { } QPID_AUTO_TEST_CASE(testQueueCleaner) { + boost::shared_ptr<Poller> poller(new Poller); + Thread runner(poller.get()); Timer timer; QueueRegistry queues; Queue::shared_ptr queue = queues.declare("my-queue", QueueSettings()).first; addMessagesToQueue(10, *queue, 200, 400); BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u); - QueueCleaner cleaner(queues, &timer); + QueueCleaner cleaner(queues, poller, &timer); cleaner.start(100 * qpid::sys::TIME_MSEC); ::usleep(300*1000); BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u); ::usleep(300*1000); BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u); + poller->shutdown(); + runner.join(); } namespace { int getIntProperty(const Message& message, const std::string& key) |
