summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-06-23 10:46:15 +0000
committerGordon Sim <gsim@apache.org>2009-06-23 10:46:15 +0000
commit7466362360030c8009fc7864d9753ca13da37335 (patch)
tree6b5a3ff1b022a489550bdbc5f3aecebf9613f1e6 /cpp/src/qpid/broker/Queue.cpp
parent42342e858d4377268ba5724ca2d16e8c15c04fe5 (diff)
downloadqpid-python-7466362360030c8009fc7864d9753ca13da37335.tar.gz
QPID-1936: Fix potential deadlock for durable ring queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@787625 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp17
1 files changed, 16 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index c96b1af6f8..64efc93a22 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -551,11 +551,16 @@ void Queue::popMsg(QueuedMessage& qmsg)
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
+ Messages dequeues;
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
- if (policy.get()) policy->tryEnqueue(qm);
+ if (policy.get()) {
+ policy->tryEnqueue(qm);
+ //depending on policy, may have some dequeues
+ if (!isRecovery) pendingDequeues.swap(dequeues);
+ }
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
@@ -591,6 +596,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
}
}
copy.notify();
+ if (!dequeues.empty()) {
+ //depending on policy, may have some dequeues
+ for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ }
}
QueuedMessage Queue::getFront()
@@ -1026,4 +1035,10 @@ bool Queue::isEnqueued(const QueuedMessage& msg)
return !policy.get() || policy->isEnqueued(msg);
}
+void Queue::addPendingDequeue(const QueuedMessage& msg)
+{
+ //assumes lock is held - true at present but rather nasty as this is a public method
+ pendingDequeues.push_back(msg);
+}
+
QueueListeners& Queue::getListeners() { return listeners; }