diff options
Diffstat (limited to 'cpp/src/qpid/broker/QueuePolicy.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.cpp | 61 |
1 files changed, 41 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index c8feaa8a62..4b185ef025 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -222,30 +222,51 @@ bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) { - if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept - - QueuedMessage oldest; - if (queue.empty()) { + + // If the message is bigger than the queue size, give up + if (m->contentSize() > getMaxSize()) { QPID_LOG(debug, "Message too large for ring queue " << name << " [" << *this << "] " - << ": message size = " << m->contentSize() << " bytes"); - return false; - } - oldest = queue.front(); - if (oldest.queue->acquire(oldest) || !strict) { - queue.pop_front(); - pendingDequeues.push_back(oldest); - QPID_LOG(debug, "Ring policy triggered in " << name - << ": removed message " << oldest.position << " to make way for new message"); - return true; - } else { - QPID_LOG(debug, "Ring policy could not be triggered in " << name - << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued"); - //in strict mode, if oldest message has been delivered (hence - //cannot be acquired) but not yet acked, it should not be - //removed and the attempted enqueue should fail + << ": message size = " << m->contentSize() << " bytes" + << ": max queue size = " << getMaxSize() << " bytes"); return false; } + + // if within limits, ok to accept + if (QueuePolicy::checkLimit(m)) return true; + + // At this point, we've exceeded maxSize, maxCount, or both. + // + // If we've exceeded maxCount, we've exceeded it by 1, so + // replacing the first message is sufficient. If we've exceeded + // maxSize, we need to pop enough messages to get the space we + // need. + + unsigned int haveSpace = getMaxSize() - getCurrentQueueSize(); + + do { + QueuedMessage oldest = queue.front(); + + if (oldest.queue->acquire(oldest) || !strict) { + queue.pop_front(); + pendingDequeues.push_back(oldest); + QPID_LOG(debug, "Ring policy triggered in " << name + << ": removed message " << oldest.position << " to make way for new message"); + + haveSpace += oldest.payload->contentSize(); + + } else { + //in strict mode, if oldest message has been delivered (hence + //cannot be acquired) but not yet acked, it should not be + //removed and the attempted enqueue should fail + QPID_LOG(debug, "Ring policy could not be triggered in " << name + << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued"); + return false; + } + } while (haveSpace < m->contentSize()); + + + return true; } void RingQueuePolicy::getPendingDequeues(Messages& result) |
