summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/QueuePolicy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/QueuePolicy.cpp')
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp61
1 files changed, 41 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index c8feaa8a62..4b185ef025 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -222,30 +222,51 @@ bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
- if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
-
- QueuedMessage oldest;
- if (queue.empty()) {
+
+ // If the message is bigger than the queue size, give up
+ if (m->contentSize() > getMaxSize()) {
QPID_LOG(debug, "Message too large for ring queue " << name
<< " [" << *this << "] "
- << ": message size = " << m->contentSize() << " bytes");
- return false;
- }
- oldest = queue.front();
- if (oldest.queue->acquire(oldest) || !strict) {
- queue.pop_front();
- pendingDequeues.push_back(oldest);
- QPID_LOG(debug, "Ring policy triggered in " << name
- << ": removed message " << oldest.position << " to make way for new message");
- return true;
- } else {
- QPID_LOG(debug, "Ring policy could not be triggered in " << name
- << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
- //in strict mode, if oldest message has been delivered (hence
- //cannot be acquired) but not yet acked, it should not be
- //removed and the attempted enqueue should fail
+ << ": message size = " << m->contentSize() << " bytes"
+ << ": max queue size = " << getMaxSize() << " bytes");
return false;
}
+
+ // if within limits, ok to accept
+ if (QueuePolicy::checkLimit(m)) return true;
+
+ // At this point, we've exceeded maxSize, maxCount, or both.
+ //
+ // If we've exceeded maxCount, we've exceeded it by 1, so
+ // replacing the first message is sufficient. If we've exceeded
+ // maxSize, we need to pop enough messages to get the space we
+ // need.
+
+ unsigned int haveSpace = getMaxSize() - getCurrentQueueSize();
+
+ do {
+ QueuedMessage oldest = queue.front();
+
+ if (oldest.queue->acquire(oldest) || !strict) {
+ queue.pop_front();
+ pendingDequeues.push_back(oldest);
+ QPID_LOG(debug, "Ring policy triggered in " << name
+ << ": removed message " << oldest.position << " to make way for new message");
+
+ haveSpace += oldest.payload->contentSize();
+
+ } else {
+ //in strict mode, if oldest message has been delivered (hence
+ //cannot be acquired) but not yet acked, it should not be
+ //removed and the attempted enqueue should fail
+ QPID_LOG(debug, "Ring policy could not be triggered in " << name
+ << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
+ return false;
+ }
+ } while (haveSpace < m->contentSize());
+
+
+ return true;
}
void RingQueuePolicy::getPendingDequeues(Messages& result)