diff options
| author | Alan Conway <aconway@apache.org> | 2013-01-31 19:43:09 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-01-31 19:43:09 +0000 |
| commit | ec4b8bc7125b9cc0b518025da76dc5d2565a991d (patch) | |
| tree | 8fb875c07f110a8de339adc31d6c4d9004c163dc /qpid/cpp/src | |
| parent | 4d0d0d5cd7e040215e879a35fcb7ac7336c9c6df (diff) | |
| download | qpid-python-ec4b8bc7125b9cc0b518025da76dc5d2565a991d.tar.gz | |
QPID-4555: HA Fix race condition in QueueGuard
- If cancelled could delay a message without recording it.
- Make all actions involving the delayed set and the AsyncCompletion atomic.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1441158 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.cpp | 85 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.h | 11 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 2 |
3 files changed, 44 insertions, 54 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index b2b012766c..863b6779f8 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -67,44 +67,30 @@ QueueGuard::~QueueGuard() { cancel(); } void QueueGuard::enqueued(const Message& m) { // Delay completion QPID_LOG(trace, logPrefix << "Delayed completion of " << m.getSequence()); + Mutex::ScopedLock l(lock); + if (cancelled) return; // Don't record enqueues after we are cancelled. + assert(delayed.find(m.getSequence()) == delayed.end()); + delayed[m.getSequence()] = m.getIngressCompletion(); m.getIngressCompletion()->startCompleter(); - { - Mutex::ScopedLock l(lock); - if (cancelled) return; // Don't record enqueues after we are cancelled. - assert(delayed.find(m.getSequence()) == delayed.end()); - delayed[m.getSequence()] = m.getIngressCompletion(); - } } // NOTE: Called with message lock held. void QueueGuard::dequeued(const Message& m) { QPID_LOG(trace, logPrefix << "Dequeued " << m); - ReplicatingSubscription* rs=0; - { - Mutex::ScopedLock l(lock); - rs = subscription; - } - if (rs) rs->dequeued(m); - complete(m.getSequence()); -} - -void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) { - for (Delayed::iterator i = begin; i != end; ++i) { - QPID_LOG(trace, logPrefix << "Completed " << i->first); - i->second->finishCompleter(); - } + Mutex::ScopedLock l(lock); + if (subscription) subscription->dequeued(m); + complete(m.getSequence(), l); } void QueueGuard::cancel() { queue.removeObserver(observer); - Delayed removed; - { - Mutex::ScopedLock l(lock); - if (cancelled) return; - cancelled = true; - delayed.swap(removed); + Mutex::ScopedLock l(lock); + if (cancelled) return; + cancelled = true; + for (Delayed::iterator i = delayed.begin(); i != delayed.end();) { + complete(i, l); + delayed.erase(i++); } - completeRange(removed.begin(), removed.end()); } void QueueGuard::attach(ReplicatingSubscription& rs) { @@ -115,37 +101,34 @@ void QueueGuard::attach(ReplicatingSubscription& rs) { bool QueueGuard::subscriptionStart(SequenceNumber position) { // Complete any messages before or at the ReplicatingSubscription start position. // Those messages are already on the backup. - Delayed removed; - { - Mutex::ScopedLock l(lock); - Delayed::iterator i = delayed.begin(); - while(i != delayed.end() && i->first <= position) { - removed.insert(*i); - delayed.erase(i++); - } + Mutex::ScopedLock l(lock); + Delayed::iterator i = delayed.begin(); + while(i != delayed.end() && i->first <= position) { + complete(i, l); + delayed.erase(i++); } - completeRange(removed.begin(), removed.end()); return position >= range.back; } void QueueGuard::complete(SequenceNumber sequence) { - boost::intrusive_ptr<broker::AsyncCompletion> m; - { - Mutex::ScopedLock l(lock); - // The same message can be completed twice, by - // ReplicatingSubscription::acknowledged and dequeued. Remove it - // from the map so we only call finishCompleter() once - Delayed::iterator i = delayed.find(sequence); - if (i != delayed.end()) { - m = i->second; - delayed.erase(i); - } + Mutex::ScopedLock l(lock); + complete(sequence, l); +} +void QueueGuard::complete(SequenceNumber sequence, Mutex::ScopedLock& l) { + // The same message can be completed twice, by + // ReplicatingSubscription::acknowledged and dequeued. Remove it + // from the map so we only call finishCompleter() once + Delayed::iterator i = delayed.find(sequence); + if (i != delayed.end()) { + complete(i, l); + delayed.erase(i); } - if (m) { - QPID_LOG(trace, logPrefix << "Completed " << sequence); - m->finishCompleter(); - } +} + +void QueueGuard::complete(Delayed::iterator i, Mutex::ScopedLock&) { + QPID_LOG(trace, logPrefix << "Completed " << i->first); + i->second->finishCompleter(); } }} // namespaces qpid::ha diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h index 2d768b5b72..e7ceb351e8 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.h +++ b/qpid/cpp/src/qpid/ha/QueueGuard.h @@ -54,6 +54,9 @@ class ReplicatingSubscription; * THREAD SAFE: Concurrent calls: * - enqueued() via QueueObserver in arbitrary connection threads. * - attach(), cancel(), complete() from ReplicatingSubscription in subscription thread. + * + * Lock Hierarchy: ReplicatingSubscription MUS NOT call QueueGuard with it's lock held + * QueueGuard MAY call ReplicatingSubscription with it's lock held. */ class QueueGuard { public: @@ -104,18 +107,20 @@ class QueueGuard { private: class QueueObserver; + typedef std::map<framing::SequenceNumber, + boost::intrusive_ptr<broker::AsyncCompletion> > Delayed; + + void complete(framing::SequenceNumber, sys::Mutex::ScopedLock &); + void complete(Delayed::iterator, sys::Mutex::ScopedLock &); sys::Mutex lock; bool cancelled; std::string logPrefix; broker::Queue& queue; - typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed; Delayed delayed; ReplicatingSubscription* subscription; boost::shared_ptr<QueueObserver> observer; QueueRange range; - - void completeRange(Delayed::iterator begin, Delayed::iterator end); }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index c2d35dd5cf..7fcb4ccf13 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -61,6 +61,8 @@ class QueueGuard; * * Lifecycle: broker::Queue holds shared_ptrs to this as a consumer. * + * Lock Hierarchy: ReplicatingSubscription MUS NOT call QueueGuard with it's lock held + * QueueGuard MAY call ReplicatingSubscription with it's lock held. */ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl { |
