diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/LossyQueue.cpp | 15 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 29 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 17 |
3 files changed, 43 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/broker/LossyQueue.cpp b/qpid/cpp/src/qpid/broker/LossyQueue.cpp index ba7dfd11a1..ee13d7733a 100644 --- a/qpid/cpp/src/qpid/broker/LossyQueue.cpp +++ b/qpid/cpp/src/qpid/broker/LossyQueue.cpp @@ -51,8 +51,19 @@ bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message) while (settings.maxDepth && (settings.maxDepth - current < increment)) { QPID_LOG(debug, "purging " << name << ": current depth is [" << current << "], max depth is [" << settings.maxDepth << "], new message has size " << increment.getSize()); qpid::sys::Mutex::ScopedUnlock u(messageLock); - //TODO: arguably we should try and purge expired messages first but that is potentially expensive - if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), boost::bind(&reroute, alternateExchange, _1), PURGE, false)) { + //TODO: arguably we should try and purge expired messages first but that + //is potentially expensive + + // Note: in the case of a priority queue we are only comparing the new mesage + // with single lowest-priority message, hence the final parameter maxTests + // is 1 in this case, so we only test one message for removal. + if (remove(1, + settings.priorities ? + boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : + MessagePredicate(), boost::bind(&reroute, alternateExchange, _1), + PURGE, false, + settings.priorities ? 1 : 0)) + { if (mgmtObject) { mgmtObject->inc_discardsRing(1); if (brokerMgmtObject) diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 29e7c06e90..19b18e1b0e 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -706,30 +706,29 @@ namespace { } } // end namespace -uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type, bool triggerAutoDelete) +uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, + SubscriptionType type, bool triggerAutoDelete, uint32_t maxTests) { ScopedAutoDelete autodelete(*this); std::deque<Message> removed; { QueueCursor c(type); - uint32_t count(0); + uint32_t count(0), tests(0); Mutex::ScopedLock locker(messageLock); Message* m = messages->next(c); while (m){ + if (maxTests && tests++ >= maxTests) break; if (!p || p(*m)) { - if (!maxCount || count++ < maxCount) { - if (m->getState() == AVAILABLE) { - //don't actually acquire, just act as if we did - observeAcquire(*m, locker); - } - observeDequeue(*m, locker, triggerAutoDelete ? &autodelete : 0); - removed.push_back(*m);//takes a copy of the message - if (!messages->deleted(c)) { - QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!"); - assert(false); - } - } else { - break; + if (maxCount && count++ >= maxCount) break; + if (m->getState() == AVAILABLE) { + //don't actually acquire, just act as if we did + observeAcquire(*m, locker); + } + observeDequeue(*m, locker, triggerAutoDelete ? &autodelete : 0); + removed.push_back(*m);//takes a copy of the message + if (!messages->deleted(c)) { + QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!"); + assert(false); } } m = messages->next(c); diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index a7eb71c6bb..3622b06dbd 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -255,7 +255,22 @@ class Queue : public boost::enable_shared_from_this<Queue>, void abandoned(const Message& message); bool checkNotDeleted(const Consumer::shared_ptr&); void notifyDeleted(); - uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType, bool triggerAutoDelete); + + /** Remove messages from the queue: + *@param maxCount Maximum number of messages to remove, 0 means unlimited. + *@param p Only remove messages for which p(msg) is true. + *@param f Call f on each message that is removed. + *@param st Use a cursor of this SubscriptionType to iterate messages to remove. + *@param triggerAutoDelete If true removing messages may trigger aut-delete. + *@param maxTests Max number of messages to test for removal, 0 means unlimited. + *@return Number of messages removed. + */ + uint32_t remove(uint32_t maxCount, + MessagePredicate p, MessageFunctor f, + SubscriptionType st, + bool triggerAutoDelete, + uint32_t maxTests=0); + virtual bool checkDepth(const QueueDepth& increment, const Message&); void tryAutoDelete(); public: |
