summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/QueuePolicy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/QueuePolicy.cpp')
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp41
1 files changed, 27 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index 16fb9adf63..368bd6552d 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -207,13 +207,9 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m)
{
qpid::sys::Mutex::ScopedLock l(lock);
//find and remove m from queue
- for (Messages::iterator i = queue.begin(); i != queue.end(); i++) {
- if (i->payload == m.payload) {
- queue.erase(i);
- //now update count and size
- QueuePolicy::dequeued(m);
- break;
- }
+ if (find(m, pendingDequeues, true) || find(m, queue, true)) {
+ //now update count and size
+ QueuePolicy::dequeued(m);
}
}
@@ -223,12 +219,7 @@ bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
//for non-strict ring policy, a message can be replaced (and
//therefore dequeued) before it is accepted or released by
//subscriber; need to detect this
- for (Messages::const_iterator i = queue.begin(); i != queue.end(); i++) {
- if (i->payload == m.payload) {
- return true;
- }
- }
- return false;
+ return find(m, pendingDequeues, false) || find(m, queue, false);
}
bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
@@ -248,7 +239,18 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
oldest = queue.front();
}
if (oldest.queue->acquire(oldest) || !strict) {
- oldest.queue->dequeue(0, oldest);
+ {
+ //TODO: fix this! In the current code, this method is
+ //only ever called with the Queue lock already taken. This
+ //should not be relied upon going forward however and
+ //clearly the locking in this class is insufficient as
+ //there is no guarantee that the message previously atthe
+ //front is still there.
+ qpid::sys::Mutex::ScopedLock l(lock);
+ queue.pop_front();
+ pendingDequeues.push_back(oldest);
+ }
+ oldest.queue->addPendingDequeue(oldest);
QPID_LOG(debug, "Ring policy triggered in queue "
<< (m.queue ? m.queue->getName() : std::string("unknown queue"))
<< ": removed message " << oldest.position << " to make way for " << m.position);
@@ -264,6 +266,17 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
}
}
+bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
+{
+ for (Messages::iterator i = q.begin(); i != q.end(); i++) {
+ if (i->payload == m.payload) {
+ if (remove) q.erase(i);
+ return true;
+ }
+ }
+ return false;
+}
+
std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings)
{
uint32_t maxCount = getInt(settings, maxCountKey, 0);