summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-01-31 19:43:09 +0000
committerAlan Conway <aconway@apache.org>2013-01-31 19:43:09 +0000
commitec4b8bc7125b9cc0b518025da76dc5d2565a991d (patch)
tree8fb875c07f110a8de339adc31d6c4d9004c163dc /qpid/cpp/src
parent4d0d0d5cd7e040215e879a35fcb7ac7336c9c6df (diff)
downloadqpid-python-ec4b8bc7125b9cc0b518025da76dc5d2565a991d.tar.gz
QPID-4555: HA Fix race condition in QueueGuard
- If cancelled could delay a message without recording it. - Make all actions involving the delayed set and the AsyncCompletion atomic. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1441158 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp85
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.h11
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h2
3 files changed, 44 insertions, 54 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index b2b012766c..863b6779f8 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -67,44 +67,30 @@ QueueGuard::~QueueGuard() { cancel(); }
void QueueGuard::enqueued(const Message& m) {
// Delay completion
QPID_LOG(trace, logPrefix << "Delayed completion of " << m.getSequence());
+ Mutex::ScopedLock l(lock);
+ if (cancelled) return; // Don't record enqueues after we are cancelled.
+ assert(delayed.find(m.getSequence()) == delayed.end());
+ delayed[m.getSequence()] = m.getIngressCompletion();
m.getIngressCompletion()->startCompleter();
- {
- Mutex::ScopedLock l(lock);
- if (cancelled) return; // Don't record enqueues after we are cancelled.
- assert(delayed.find(m.getSequence()) == delayed.end());
- delayed[m.getSequence()] = m.getIngressCompletion();
- }
}
// NOTE: Called with message lock held.
void QueueGuard::dequeued(const Message& m) {
QPID_LOG(trace, logPrefix << "Dequeued " << m);
- ReplicatingSubscription* rs=0;
- {
- Mutex::ScopedLock l(lock);
- rs = subscription;
- }
- if (rs) rs->dequeued(m);
- complete(m.getSequence());
-}
-
-void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) {
- for (Delayed::iterator i = begin; i != end; ++i) {
- QPID_LOG(trace, logPrefix << "Completed " << i->first);
- i->second->finishCompleter();
- }
+ Mutex::ScopedLock l(lock);
+ if (subscription) subscription->dequeued(m);
+ complete(m.getSequence(), l);
}
void QueueGuard::cancel() {
queue.removeObserver(observer);
- Delayed removed;
- {
- Mutex::ScopedLock l(lock);
- if (cancelled) return;
- cancelled = true;
- delayed.swap(removed);
+ Mutex::ScopedLock l(lock);
+ if (cancelled) return;
+ cancelled = true;
+ for (Delayed::iterator i = delayed.begin(); i != delayed.end();) {
+ complete(i, l);
+ delayed.erase(i++);
}
- completeRange(removed.begin(), removed.end());
}
void QueueGuard::attach(ReplicatingSubscription& rs) {
@@ -115,37 +101,34 @@ void QueueGuard::attach(ReplicatingSubscription& rs) {
bool QueueGuard::subscriptionStart(SequenceNumber position) {
// Complete any messages before or at the ReplicatingSubscription start position.
// Those messages are already on the backup.
- Delayed removed;
- {
- Mutex::ScopedLock l(lock);
- Delayed::iterator i = delayed.begin();
- while(i != delayed.end() && i->first <= position) {
- removed.insert(*i);
- delayed.erase(i++);
- }
+ Mutex::ScopedLock l(lock);
+ Delayed::iterator i = delayed.begin();
+ while(i != delayed.end() && i->first <= position) {
+ complete(i, l);
+ delayed.erase(i++);
}
- completeRange(removed.begin(), removed.end());
return position >= range.back;
}
void QueueGuard::complete(SequenceNumber sequence) {
- boost::intrusive_ptr<broker::AsyncCompletion> m;
- {
- Mutex::ScopedLock l(lock);
- // The same message can be completed twice, by
- // ReplicatingSubscription::acknowledged and dequeued. Remove it
- // from the map so we only call finishCompleter() once
- Delayed::iterator i = delayed.find(sequence);
- if (i != delayed.end()) {
- m = i->second;
- delayed.erase(i);
- }
+ Mutex::ScopedLock l(lock);
+ complete(sequence, l);
+}
+void QueueGuard::complete(SequenceNumber sequence, Mutex::ScopedLock& l) {
+ // The same message can be completed twice, by
+ // ReplicatingSubscription::acknowledged and dequeued. Remove it
+ // from the map so we only call finishCompleter() once
+ Delayed::iterator i = delayed.find(sequence);
+ if (i != delayed.end()) {
+ complete(i, l);
+ delayed.erase(i);
}
- if (m) {
- QPID_LOG(trace, logPrefix << "Completed " << sequence);
- m->finishCompleter();
- }
+}
+
+void QueueGuard::complete(Delayed::iterator i, Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Completed " << i->first);
+ i->second->finishCompleter();
}
}} // namespaces qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
index 2d768b5b72..e7ceb351e8 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.h
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -54,6 +54,9 @@ class ReplicatingSubscription;
* THREAD SAFE: Concurrent calls:
* - enqueued() via QueueObserver in arbitrary connection threads.
* - attach(), cancel(), complete() from ReplicatingSubscription in subscription thread.
+ *
+ * Lock Hierarchy: ReplicatingSubscription MUS NOT call QueueGuard with it's lock held
+ * QueueGuard MAY call ReplicatingSubscription with it's lock held.
*/
class QueueGuard {
public:
@@ -104,18 +107,20 @@ class QueueGuard {
private:
class QueueObserver;
+ typedef std::map<framing::SequenceNumber,
+ boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
+
+ void complete(framing::SequenceNumber, sys::Mutex::ScopedLock &);
+ void complete(Delayed::iterator, sys::Mutex::ScopedLock &);
sys::Mutex lock;
bool cancelled;
std::string logPrefix;
broker::Queue& queue;
- typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
Delayed delayed;
ReplicatingSubscription* subscription;
boost::shared_ptr<QueueObserver> observer;
QueueRange range;
-
- void completeRange(Delayed::iterator begin, Delayed::iterator end);
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index c2d35dd5cf..7fcb4ccf13 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -61,6 +61,8 @@ class QueueGuard;
*
* Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
*
+ * Lock Hierarchy: ReplicatingSubscription MUS NOT call QueueGuard with it's lock held
+ * QueueGuard MAY call ReplicatingSubscription with it's lock held.
*/
class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
{