summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-09-28 12:12:41 +0000
committerGordon Sim <gsim@apache.org>2009-09-28 12:12:41 +0000
commit9376922f0fce58400c1e9b5b20f6c6f7b279a55b (patch)
tree52b9422da0032defb0b3b94caef6ec168e264efd /cpp/src/qpid/broker/Queue.cpp
parent46b7c031c27b7c047d7f2361c4d8287ee1578f05 (diff)
downloadqpid-python-9376922f0fce58400c1e9b5b20f6c6f7b279a55b.tar.gz
QPID-2102: Changed QueuePolicy to rely on external locking and require dequeues to be handled by policy user rather.
(r817742 introduced a deadlock in ring queue policy which this checkin fixes) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@819505 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp19
1 files changed, 9 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 80794f791f..5bfec0f24e 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -208,11 +208,10 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
}
void Queue::requeue(const QueuedMessage& msg){
- if (!isEnqueued(msg)) return;
-
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
+ if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
listeners.populate(copy);
@@ -603,7 +602,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
}
if (policy.get()) {
- Mutex::ScopedUnlock locker(messageLock);
policy->enqueued(qm);
}
}
@@ -696,7 +694,14 @@ void Queue::setLastNodeFailure()
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck)
{
if (policy.get() && !suppressPolicyCheck) {
- policy->tryEnqueue(msg);
+ Messages dequeues;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ policy->tryEnqueue(msg);
+ policy->getPendingDequeues(dequeues);
+ }
+ //depending on policy, may have some dequeues that need to performed without holding the lock
+ for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
if (inLastNodeFailure && persistLastNode){
@@ -1072,10 +1077,4 @@ bool Queue::isEnqueued(const QueuedMessage& msg)
return !policy.get() || policy->isEnqueued(msg);
}
-void Queue::addPendingDequeue(const QueuedMessage& msg)
-{
- //assumes lock is held - true at present but rather nasty as this is a public method
- pendingDequeues.push_back(msg);
-}
-
QueueListeners& Queue::getListeners() { return listeners; }