summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/LossyQueue.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp29
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h17
3 files changed, 43 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/broker/LossyQueue.cpp b/qpid/cpp/src/qpid/broker/LossyQueue.cpp
index ba7dfd11a1..ee13d7733a 100644
--- a/qpid/cpp/src/qpid/broker/LossyQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/LossyQueue.cpp
@@ -51,8 +51,19 @@ bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message)
while (settings.maxDepth && (settings.maxDepth - current < increment)) {
QPID_LOG(debug, "purging " << name << ": current depth is [" << current << "], max depth is [" << settings.maxDepth << "], new message has size " << increment.getSize());
qpid::sys::Mutex::ScopedUnlock u(messageLock);
- //TODO: arguably we should try and purge expired messages first but that is potentially expensive
- if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), boost::bind(&reroute, alternateExchange, _1), PURGE, false)) {
+ //TODO: arguably we should try and purge expired messages first but that
+ //is potentially expensive
+
+ // Note: in the case of a priority queue we are only comparing the new mesage
+ // with single lowest-priority message, hence the final parameter maxTests
+ // is 1 in this case, so we only test one message for removal.
+ if (remove(1,
+ settings.priorities ?
+ boost::bind(&isLowerPriorityThan, message.getPriority(), _1) :
+ MessagePredicate(), boost::bind(&reroute, alternateExchange, _1),
+ PURGE, false,
+ settings.priorities ? 1 : 0))
+ {
if (mgmtObject) {
mgmtObject->inc_discardsRing(1);
if (brokerMgmtObject)
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 29e7c06e90..19b18e1b0e 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -706,30 +706,29 @@ namespace {
}
} // end namespace
-uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type, bool triggerAutoDelete)
+uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f,
+ SubscriptionType type, bool triggerAutoDelete, uint32_t maxTests)
{
ScopedAutoDelete autodelete(*this);
std::deque<Message> removed;
{
QueueCursor c(type);
- uint32_t count(0);
+ uint32_t count(0), tests(0);
Mutex::ScopedLock locker(messageLock);
Message* m = messages->next(c);
while (m){
+ if (maxTests && tests++ >= maxTests) break;
if (!p || p(*m)) {
- if (!maxCount || count++ < maxCount) {
- if (m->getState() == AVAILABLE) {
- //don't actually acquire, just act as if we did
- observeAcquire(*m, locker);
- }
- observeDequeue(*m, locker, triggerAutoDelete ? &autodelete : 0);
- removed.push_back(*m);//takes a copy of the message
- if (!messages->deleted(c)) {
- QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!");
- assert(false);
- }
- } else {
- break;
+ if (maxCount && count++ >= maxCount) break;
+ if (m->getState() == AVAILABLE) {
+ //don't actually acquire, just act as if we did
+ observeAcquire(*m, locker);
+ }
+ observeDequeue(*m, locker, triggerAutoDelete ? &autodelete : 0);
+ removed.push_back(*m);//takes a copy of the message
+ if (!messages->deleted(c)) {
+ QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!");
+ assert(false);
}
}
m = messages->next(c);
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index a7eb71c6bb..3622b06dbd 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -255,7 +255,22 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void abandoned(const Message& message);
bool checkNotDeleted(const Consumer::shared_ptr&);
void notifyDeleted();
- uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType, bool triggerAutoDelete);
+
+ /** Remove messages from the queue:
+ *@param maxCount Maximum number of messages to remove, 0 means unlimited.
+ *@param p Only remove messages for which p(msg) is true.
+ *@param f Call f on each message that is removed.
+ *@param st Use a cursor of this SubscriptionType to iterate messages to remove.
+ *@param triggerAutoDelete If true removing messages may trigger aut-delete.
+ *@param maxTests Max number of messages to test for removal, 0 means unlimited.
+ *@return Number of messages removed.
+ */
+ uint32_t remove(uint32_t maxCount,
+ MessagePredicate p, MessageFunctor f,
+ SubscriptionType st,
+ bool triggerAutoDelete,
+ uint32_t maxTests=0);
+
virtual bool checkDepth(const QueueDepth& increment, const Message&);
void tryAutoDelete();
public: