diff options
| author | Gordon Sim <gsim@apache.org> | 2009-09-28 12:12:41 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-09-28 12:12:41 +0000 |
| commit | 9376922f0fce58400c1e9b5b20f6c6f7b279a55b (patch) | |
| tree | 52b9422da0032defb0b3b94caef6ec168e264efd /cpp/src/qpid/broker/Queue.cpp | |
| parent | 46b7c031c27b7c047d7f2361c4d8287ee1578f05 (diff) | |
| download | qpid-python-9376922f0fce58400c1e9b5b20f6c6f7b279a55b.tar.gz | |
QPID-2102: Changed QueuePolicy to rely on external locking and require dequeues to be handled by policy user rather.
(r817742 introduced a deadlock in ring queue policy which this checkin fixes)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@819505 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 19 |
1 files changed, 9 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 80794f791f..5bfec0f24e 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -208,11 +208,10 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ } void Queue::requeue(const QueuedMessage& msg){ - if (!isEnqueued(msg)) return; - QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); + if (!isEnqueued(msg)) return; msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); listeners.populate(copy); @@ -603,7 +602,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); } if (policy.get()) { - Mutex::ScopedUnlock locker(messageLock); policy->enqueued(qm); } } @@ -696,7 +694,14 @@ void Queue::setLastNodeFailure() bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck) { if (policy.get() && !suppressPolicyCheck) { - policy->tryEnqueue(msg); + Messages dequeues; + { + Mutex::ScopedLock locker(messageLock); + policy->tryEnqueue(msg); + policy->getPendingDequeues(dequeues); + } + //depending on policy, may have some dequeues that need to performed without holding the lock + for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } if (inLastNodeFailure && persistLastNode){ @@ -1072,10 +1077,4 @@ bool Queue::isEnqueued(const QueuedMessage& msg) return !policy.get() || policy->isEnqueued(msg); } -void Queue::addPendingDequeue(const QueuedMessage& msg) -{ - //assumes lock is held - true at present but rather nasty as this is a public method - pendingDequeues.push_back(msg); -} - QueueListeners& Queue::getListeners() { return listeners; } |
