summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp26
1 files changed, 16 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 355f822b57..1f508a1cc7 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -405,19 +405,24 @@ QueuedMessage Queue::get(){
void Queue::purgeExpired()
{
- Messages expired;
- {
- Mutex::ScopedLock locker(messageLock);
- for (Messages::iterator i = messages.begin(); i != messages.end();) {
- if (i->payload->hasExpired()) {
- expired.push_back(*i);
- i = messages.erase(i);
- } else {
- ++i;
+ //As expired messages are discarded during dequeue also, only
+ //bother explicitly expiring if the rate of dequeues since last
+ //attempt is less than one per second.
+ if (dequeueTracker.sampleRatePerSecond() < 1) {
+ Messages expired;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ for (Messages::iterator i = messages.begin(); i != messages.end();) {
+ if (i->payload->hasExpired()) {
+ expired.push_back(*i);
+ i = messages.erase(i);
+ } else {
+ ++i;
+ }
}
}
+ for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
- for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
/**
@@ -465,6 +470,7 @@ void Queue::popMsg(QueuedMessage& qmsg)
lvq.erase(key);
}
messages.pop_front();
+ ++dequeueTracker;
}
void Queue::push(boost::intrusive_ptr<Message>& msg){