summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/QueuePolicy.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-09-28 12:12:41 +0000
committerGordon Sim <gsim@apache.org>2009-09-28 12:12:41 +0000
commit9376922f0fce58400c1e9b5b20f6c6f7b279a55b (patch)
tree52b9422da0032defb0b3b94caef6ec168e264efd /cpp/src/qpid/broker/QueuePolicy.cpp
parent46b7c031c27b7c047d7f2361c4d8287ee1578f05 (diff)
downloadqpid-python-9376922f0fce58400c1e9b5b20f6c6f7b279a55b.tar.gz
QPID-2102: Changed QueuePolicy to rely on external locking and require dequeues to be handled by policy user rather.
(r817742 introduced a deadlock in ring queue policy which this checkin fixes) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@819505 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/QueuePolicy.cpp')
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp15
1 files changed, 6 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index 0f1f7f370f..a8aa674c53 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -77,7 +77,6 @@ bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
if (checkLimit(m)) {
enqueued(m->contentSize());
} else {
@@ -87,13 +86,11 @@ void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m)
void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
enqueued(m->contentSize());
}
void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
dequeued(m->contentSize());
}
@@ -101,7 +98,6 @@ void QueuePolicy::enqueued(const QueuedMessage&) {}
void QueuePolicy::dequeued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
dequeued(m.payload->contentSize());
}
@@ -141,6 +137,7 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s)
defaultMaxSize = s;
}
+void QueuePolicy::getPendingDequeues(Messages&) {}
@@ -200,14 +197,12 @@ bool before(const QueuedMessage& a, const QueuedMessage& b)
void RingQueuePolicy::enqueued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//need to insert in correct location based on position
queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m);
}
void RingQueuePolicy::dequeued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//find and remove m from queue
if (find(m, pendingDequeues, true) || find(m, queue, true)) {
//now update count and size
@@ -217,7 +212,6 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m)
bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//for non-strict ring policy, a message can be replaced (and
//therefore dequeued) before it is accepted or released by
//subscriber; need to detect this
@@ -241,8 +235,6 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
pendingDequeues.push_back(oldest);
QPID_LOG(debug, "Ring policy triggered in " << name
<< ": removed message " << oldest.position << " to make way for new message");
- qpid::sys::Mutex::ScopedUnlock u(lock);
- oldest.queue->dequeue(0, oldest);
return true;
} else {
QPID_LOG(debug, "Ring policy could not be triggered in " << name
@@ -254,6 +246,11 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
}
}
+void RingQueuePolicy::getPendingDequeues(Messages& result)
+{
+ result = pendingDequeues;
+}
+
bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
{
for (Messages::iterator i = q.begin(); i != q.end(); i++) {