diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 26 |
1 files changed, 22 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 40ef6052a0..e59857462c 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -494,16 +494,34 @@ void Queue::purgeExpired() { //As expired messages are discarded during dequeue also, only //bother explicitly expiring if the rate of dequeues since last - //attempt is less than one per second. - if (dequeueTracker.sampleRatePerSecond() < 1) { + //attempt is less than one per second. + + //Note: This method is currently called periodically on the timer + //thread. In a clustered broker this means that the purging does + //not occur on the cluster event dispatch thread and consequently + //that is not totally ordered w.r.t other events (including + //publication of messages). However the cluster does ensure that + //the actual expiration of messages (as distinct from the removing + //of those expired messages from the queue) *is* consistently + //ordered w.r.t. cluster events. This means that delivery of + //messages is in general consistent across the cluster inspite of + //any non-determinism in the triggering of a purge. However at + //present purging a last value queue could potentially cause + //inconsistencies in the cluster (as the order w.r.t publications + //can affect the order in which messages appear in the + //queue). Consequently periodic purging of an LVQ is not enabled + //(expired messages will be removed on delivery and consolidated + //by key as part of normal LVQ operation). + + if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) { Messages expired; { Mutex::ScopedLock locker(messageLock); for (Messages::iterator i = messages.begin(); i != messages.end();) { - if (lastValueQueue) checkLvqReplace(*i); + //Re-introduce management of LVQ-specific state here + //if purging is renabled for that case (see note above) if (i->payload->hasExpired()) { expired.push_back(*i); - clearLVQIndex(*i); i = messages.erase(i); } else { ++i; |