From a1eaf3a3abf8fc22a235b4ca1ce902be2834b3d9 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 11 Aug 2010 10:06:24 +0000 Subject: Revert commits r981517 and r981435 that moved periodic purging of queues onto cluster's timer. If the timer fires during an update it causes errors; it also puts a potentially time consuming task on the clusters dispatch thread. Instead don't purge LVQs to avoid cluster inconsistencies (and more directly the assertion that aims to prevent these). git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@984357 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Queue.cpp | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) (limited to 'cpp/src/qpid/broker/Queue.cpp') diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 40ef6052a0..e59857462c 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -494,16 +494,34 @@ void Queue::purgeExpired() { //As expired messages are discarded during dequeue also, only //bother explicitly expiring if the rate of dequeues since last - //attempt is less than one per second. - if (dequeueTracker.sampleRatePerSecond() < 1) { + //attempt is less than one per second. + + //Note: This method is currently called periodically on the timer + //thread. In a clustered broker this means that the purging does + //not occur on the cluster event dispatch thread and consequently + //that is not totally ordered w.r.t other events (including + //publication of messages). However the cluster does ensure that + //the actual expiration of messages (as distinct from the removing + //of those expired messages from the queue) *is* consistently + //ordered w.r.t. cluster events. This means that delivery of + //messages is in general consistent across the cluster inspite of + //any non-determinism in the triggering of a purge. However at + //present purging a last value queue could potentially cause + //inconsistencies in the cluster (as the order w.r.t publications + //can affect the order in which messages appear in the + //queue). Consequently periodic purging of an LVQ is not enabled + //(expired messages will be removed on delivery and consolidated + //by key as part of normal LVQ operation). + + if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) { Messages expired; { Mutex::ScopedLock locker(messageLock); for (Messages::iterator i = messages.begin(); i != messages.end();) { - if (lastValueQueue) checkLvqReplace(*i); + //Re-introduce management of LVQ-specific state here + //if purging is renabled for that case (see note above) if (i->payload->hasExpired()) { expired.push_back(*i); - clearLVQIndex(*i); i = messages.erase(i); } else { ++i; -- cgit v1.2.1