diff options
| author | Gordon Sim <gsim@apache.org> | 2009-09-28 12:12:41 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-09-28 12:12:41 +0000 |
| commit | 9376922f0fce58400c1e9b5b20f6c6f7b279a55b (patch) | |
| tree | 52b9422da0032defb0b3b94caef6ec168e264efd /cpp/src/qpid/broker/QueuePolicy.cpp | |
| parent | 46b7c031c27b7c047d7f2361c4d8287ee1578f05 (diff) | |
| download | qpid-python-9376922f0fce58400c1e9b5b20f6c6f7b279a55b.tar.gz | |
QPID-2102: Changed QueuePolicy to rely on external locking and require dequeues to be handled by policy user rather.
(r817742 introduced a deadlock in ring queue policy which this checkin fixes)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@819505 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/QueuePolicy.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.cpp | 15 |
1 files changed, 6 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index 0f1f7f370f..a8aa674c53 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -77,7 +77,6 @@ bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m) { - qpid::sys::Mutex::ScopedLock l(lock); if (checkLimit(m)) { enqueued(m->contentSize()); } else { @@ -87,13 +86,11 @@ void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m) void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m) { - qpid::sys::Mutex::ScopedLock l(lock); enqueued(m->contentSize()); } void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m) { - qpid::sys::Mutex::ScopedLock l(lock); dequeued(m->contentSize()); } @@ -101,7 +98,6 @@ void QueuePolicy::enqueued(const QueuedMessage&) {} void QueuePolicy::dequeued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); dequeued(m.payload->contentSize()); } @@ -141,6 +137,7 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s) defaultMaxSize = s; } +void QueuePolicy::getPendingDequeues(Messages&) {} @@ -200,14 +197,12 @@ bool before(const QueuedMessage& a, const QueuedMessage& b) void RingQueuePolicy::enqueued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //need to insert in correct location based on position queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m); } void RingQueuePolicy::dequeued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //find and remove m from queue if (find(m, pendingDequeues, true) || find(m, queue, true)) { //now update count and size @@ -217,7 +212,6 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m) bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //for non-strict ring policy, a message can be replaced (and //therefore dequeued) before it is accepted or released by //subscriber; need to detect this @@ -241,8 +235,6 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) pendingDequeues.push_back(oldest); QPID_LOG(debug, "Ring policy triggered in " << name << ": removed message " << oldest.position << " to make way for new message"); - qpid::sys::Mutex::ScopedUnlock u(lock); - oldest.queue->dequeue(0, oldest); return true; } else { QPID_LOG(debug, "Ring policy could not be triggered in " << name @@ -254,6 +246,11 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) } } +void RingQueuePolicy::getPendingDequeues(Messages& result) +{ + result = pendingDequeues; +} + bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove) { for (Messages::iterator i = q.begin(); i != q.end(); i++) { |
