diff options
Diffstat (limited to 'cpp/src/qpid/broker/QueuePolicy.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.cpp | 41 |
1 files changed, 27 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index 16fb9adf63..368bd6552d 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -207,13 +207,9 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m) { qpid::sys::Mutex::ScopedLock l(lock); //find and remove m from queue - for (Messages::iterator i = queue.begin(); i != queue.end(); i++) { - if (i->payload == m.payload) { - queue.erase(i); - //now update count and size - QueuePolicy::dequeued(m); - break; - } + if (find(m, pendingDequeues, true) || find(m, queue, true)) { + //now update count and size + QueuePolicy::dequeued(m); } } @@ -223,12 +219,7 @@ bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) //for non-strict ring policy, a message can be replaced (and //therefore dequeued) before it is accepted or released by //subscriber; need to detect this - for (Messages::const_iterator i = queue.begin(); i != queue.end(); i++) { - if (i->payload == m.payload) { - return true; - } - } - return false; + return find(m, pendingDequeues, false) || find(m, queue, false); } bool RingQueuePolicy::checkLimit(const QueuedMessage& m) @@ -248,7 +239,18 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m) oldest = queue.front(); } if (oldest.queue->acquire(oldest) || !strict) { - oldest.queue->dequeue(0, oldest); + { + //TODO: fix this! In the current code, this method is + //only ever called with the Queue lock already taken. This + //should not be relied upon going forward however and + //clearly the locking in this class is insufficient as + //there is no guarantee that the message previously atthe + //front is still there. + qpid::sys::Mutex::ScopedLock l(lock); + queue.pop_front(); + pendingDequeues.push_back(oldest); + } + oldest.queue->addPendingDequeue(oldest); QPID_LOG(debug, "Ring policy triggered in queue " << (m.queue ? m.queue->getName() : std::string("unknown queue")) << ": removed message " << oldest.position << " to make way for " << m.position); @@ -264,6 +266,17 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m) } } +bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove) +{ + for (Messages::iterator i = q.begin(); i != q.end(); i++) { + if (i->payload == m.payload) { + if (remove) q.erase(i); + return true; + } + } + return false; +} + std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings) { uint32_t maxCount = getInt(settings, maxCountKey, 0); |
