summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/QueuePolicy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/QueuePolicy.cpp')
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp15
1 files changed, 6 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index 0f1f7f370f..a8aa674c53 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -77,7 +77,6 @@ bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
if (checkLimit(m)) {
enqueued(m->contentSize());
} else {
@@ -87,13 +86,11 @@ void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m)
void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
enqueued(m->contentSize());
}
void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
dequeued(m->contentSize());
}
@@ -101,7 +98,6 @@ void QueuePolicy::enqueued(const QueuedMessage&) {}
void QueuePolicy::dequeued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
dequeued(m.payload->contentSize());
}
@@ -141,6 +137,7 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s)
defaultMaxSize = s;
}
+void QueuePolicy::getPendingDequeues(Messages&) {}
@@ -200,14 +197,12 @@ bool before(const QueuedMessage& a, const QueuedMessage& b)
void RingQueuePolicy::enqueued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//need to insert in correct location based on position
queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m);
}
void RingQueuePolicy::dequeued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//find and remove m from queue
if (find(m, pendingDequeues, true) || find(m, queue, true)) {
//now update count and size
@@ -217,7 +212,6 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m)
bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//for non-strict ring policy, a message can be replaced (and
//therefore dequeued) before it is accepted or released by
//subscriber; need to detect this
@@ -241,8 +235,6 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
pendingDequeues.push_back(oldest);
QPID_LOG(debug, "Ring policy triggered in " << name
<< ": removed message " << oldest.position << " to make way for new message");
- qpid::sys::Mutex::ScopedUnlock u(lock);
- oldest.queue->dequeue(0, oldest);
return true;
} else {
QPID_LOG(debug, "Ring policy could not be triggered in " << name
@@ -254,6 +246,11 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
}
}
+void RingQueuePolicy::getPendingDequeues(Messages& result)
+{
+ result = pendingDequeues;
+}
+
bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
{
for (Messages::iterator i = q.begin(); i != q.end(); i++) {