From a1eaf3a3abf8fc22a235b4ca1ce902be2834b3d9 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 11 Aug 2010 10:06:24 +0000 Subject: Revert commits r981517 and r981435 that moved periodic purging of queues onto cluster's timer. If the timer fires during an update it causes errors; it also puts a potentially time consuming task on the clusters dispatch thread. Instead don't purge LVQs to avoid cluster inconsistencies (and more directly the assertion that aims to prevent these). git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@984357 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Broker.cpp | 3 +-- cpp/src/qpid/broker/Queue.cpp | 26 ++++++++++++++++++++++---- cpp/src/qpid/broker/QueueCleaner.cpp | 17 +++++------------ cpp/src/qpid/broker/QueueCleaner.h | 5 ++--- cpp/src/tests/QueueTest.cpp | 2 +- 5 files changed, 31 insertions(+), 22 deletions(-) (limited to 'cpp') diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 5e0ca90e69..1a8bed1be0 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -156,7 +156,7 @@ Broker::Broker(const Broker::Options& conf) : conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), *this), - queueCleaner(queues, &timer), + queueCleaner(queues, timer), queueEvents(poller,!conf.asyncQueueEvents), recovery(true), clusterUpdatee(false), @@ -504,7 +504,6 @@ bool Broker::deferDeliveryImpl(const std::string& , void Broker::setClusterTimer(std::auto_ptr t) { clusterTimer = t; - queueCleaner.setTimer(clusterTimer.get()); } const std::string Broker::TCP_TRANSPORT("tcp"); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 40ef6052a0..e59857462c 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -494,16 +494,34 @@ void Queue::purgeExpired() { //As expired messages are discarded during dequeue also, only //bother explicitly expiring if the rate of dequeues since last - //attempt is less than one per second. - if (dequeueTracker.sampleRatePerSecond() < 1) { + //attempt is less than one per second. + + //Note: This method is currently called periodically on the timer + //thread. In a clustered broker this means that the purging does + //not occur on the cluster event dispatch thread and consequently + //that is not totally ordered w.r.t other events (including + //publication of messages). However the cluster does ensure that + //the actual expiration of messages (as distinct from the removing + //of those expired messages from the queue) *is* consistently + //ordered w.r.t. cluster events. This means that delivery of + //messages is in general consistent across the cluster inspite of + //any non-determinism in the triggering of a purge. However at + //present purging a last value queue could potentially cause + //inconsistencies in the cluster (as the order w.r.t publications + //can affect the order in which messages appear in the + //queue). Consequently periodic purging of an LVQ is not enabled + //(expired messages will be removed on delivery and consolidated + //by key as part of normal LVQ operation). + + if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) { Messages expired; { Mutex::ScopedLock locker(messageLock); for (Messages::iterator i = messages.begin(); i != messages.end();) { - if (lastValueQueue) checkLvqReplace(*i); + //Re-introduce management of LVQ-specific state here + //if purging is renabled for that case (see note above) if (i->payload->hasExpired()) { expired.push_back(*i); - clearLVQIndex(*i); i = messages.erase(i); } else { ++i; diff --git a/cpp/src/qpid/broker/QueueCleaner.cpp b/cpp/src/qpid/broker/QueueCleaner.cpp index a3d06cc4f7..ed98468490 100644 --- a/cpp/src/qpid/broker/QueueCleaner.cpp +++ b/cpp/src/qpid/broker/QueueCleaner.cpp @@ -26,27 +26,20 @@ namespace qpid { namespace broker { -QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {} +QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {} QueueCleaner::~QueueCleaner() { if (task) task->cancel(); } -void QueueCleaner::setTimer(sys::Timer* t) -{ - timer = t; -} - void QueueCleaner::start(qpid::sys::Duration p) { - if (timer) { - task = new Task(*this, p); - timer->add(task); - } + task = new Task(*this, p); + timer.add(task); } -QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d, "QueueCleaner::fired"), parent(p) {} +QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d), parent(p) {} void QueueCleaner::Task::fire() { @@ -73,7 +66,7 @@ void QueueCleaner::fired() queues.eachQueue(collect); std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1)); task->setupNextFire(); - if (timer) timer->add(task); + timer.add(task); } diff --git a/cpp/src/qpid/broker/QueueCleaner.h b/cpp/src/qpid/broker/QueueCleaner.h index 8eae0afaaf..11c2d180ac 100644 --- a/cpp/src/qpid/broker/QueueCleaner.h +++ b/cpp/src/qpid/broker/QueueCleaner.h @@ -35,9 +35,8 @@ class QueueRegistry; class QueueCleaner { public: - QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer); + QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer); QPID_BROKER_EXTERN ~QueueCleaner(); - QPID_BROKER_EXTERN void setTimer(sys::Timer* timer); QPID_BROKER_EXTERN void start(qpid::sys::Duration period); private: class Task : public sys::TimerTask @@ -51,7 +50,7 @@ class QueueCleaner boost::intrusive_ptr task; QueueRegistry& queues; - sys::Timer* timer; + sys::Timer& timer; void fired(); }; diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index adc5c0bee7..80c69ac386 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -687,7 +687,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) { addMessagesToQueue(10, *queue, 200, 400); BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u); - QueueCleaner cleaner(queues, &timer); + QueueCleaner cleaner(queues, timer); cleaner.start(100 * qpid::sys::TIME_MSEC); ::usleep(300*1000); BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u); -- cgit v1.2.1