diff options
Diffstat (limited to 'cpp/src/qpid/broker/QueuePolicy.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.cpp | 15 |
1 files changed, 6 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index 0f1f7f370f..a8aa674c53 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -77,7 +77,6 @@ bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m) { - qpid::sys::Mutex::ScopedLock l(lock); if (checkLimit(m)) { enqueued(m->contentSize()); } else { @@ -87,13 +86,11 @@ void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m) void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m) { - qpid::sys::Mutex::ScopedLock l(lock); enqueued(m->contentSize()); } void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m) { - qpid::sys::Mutex::ScopedLock l(lock); dequeued(m->contentSize()); } @@ -101,7 +98,6 @@ void QueuePolicy::enqueued(const QueuedMessage&) {} void QueuePolicy::dequeued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); dequeued(m.payload->contentSize()); } @@ -141,6 +137,7 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s) defaultMaxSize = s; } +void QueuePolicy::getPendingDequeues(Messages&) {} @@ -200,14 +197,12 @@ bool before(const QueuedMessage& a, const QueuedMessage& b) void RingQueuePolicy::enqueued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //need to insert in correct location based on position queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m); } void RingQueuePolicy::dequeued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //find and remove m from queue if (find(m, pendingDequeues, true) || find(m, queue, true)) { //now update count and size @@ -217,7 +212,6 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m) bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //for non-strict ring policy, a message can be replaced (and //therefore dequeued) before it is accepted or released by //subscriber; need to detect this @@ -241,8 +235,6 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) pendingDequeues.push_back(oldest); QPID_LOG(debug, "Ring policy triggered in " << name << ": removed message " << oldest.position << " to make way for new message"); - qpid::sys::Mutex::ScopedUnlock u(lock); - oldest.queue->dequeue(0, oldest); return true; } else { QPID_LOG(debug, "Ring policy could not be triggered in " << name @@ -254,6 +246,11 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) } } +void RingQueuePolicy::getPendingDequeues(Messages& result) +{ + result = pendingDequeues; +} + bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove) { for (Messages::iterator i = q.begin(); i != q.end(); i++) { |
