summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp26
1 files changed, 22 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 40ef6052a0..e59857462c 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -494,16 +494,34 @@ void Queue::purgeExpired()
{
//As expired messages are discarded during dequeue also, only
//bother explicitly expiring if the rate of dequeues since last
- //attempt is less than one per second.
- if (dequeueTracker.sampleRatePerSecond() < 1) {
+ //attempt is less than one per second.
+
+ //Note: This method is currently called periodically on the timer
+ //thread. In a clustered broker this means that the purging does
+ //not occur on the cluster event dispatch thread and consequently
+ //that is not totally ordered w.r.t other events (including
+ //publication of messages). However the cluster does ensure that
+ //the actual expiration of messages (as distinct from the removing
+ //of those expired messages from the queue) *is* consistently
+ //ordered w.r.t. cluster events. This means that delivery of
+ //messages is in general consistent across the cluster inspite of
+ //any non-determinism in the triggering of a purge. However at
+ //present purging a last value queue could potentially cause
+ //inconsistencies in the cluster (as the order w.r.t publications
+ //can affect the order in which messages appear in the
+ //queue). Consequently periodic purging of an LVQ is not enabled
+ //(expired messages will be removed on delivery and consolidated
+ //by key as part of normal LVQ operation).
+
+ if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) {
Messages expired;
{
Mutex::ScopedLock locker(messageLock);
for (Messages::iterator i = messages.begin(); i != messages.end();) {
- if (lastValueQueue) checkLvqReplace(*i);
+ //Re-introduce management of LVQ-specific state here
+ //if purging is renabled for that case (see note above)
if (i->payload->hasExpired()) {
expired.push_back(*i);
- clearLVQIndex(*i);
i = messages.erase(i);
} else {
++i;