diff options
| author | Alan Conway <aconway@apache.org> | 2012-10-18 19:42:21 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-10-18 19:42:21 +0000 |
| commit | bb8760c65fdec8001d0edc2a01031ea700162e75 (patch) | |
| tree | baac0ffa77715e6e9a4fdbbfa8204e948b9e8920 | |
| parent | 43d6d12379f03a4fe9db2beacfe1be27de4f0ba3 (diff) | |
| download | qpid-python-bb8760c65fdec8001d0edc2a01031ea700162e75.tar.gz | |
Bug 867030 - QPID-4374: Make QueueGuard::cancel idempotent (Jason Dillaman)
Added QueueGuard::cancelled, only call cancel once.
Don't enqueue after cancel.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1399814 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.cpp | 24 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.h | 1 |
2 files changed, 12 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index 8852554d31..f3bc4c4417 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -50,7 +50,7 @@ class QueueGuard::QueueObserver : public broker::QueueObserver QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) - : queue(q), subscription(0) + : cancelled(false), queue(q), subscription(0) { std::ostringstream os; os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": "; @@ -61,10 +61,7 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) range = QueueRange(q); } -QueueGuard::~QueueGuard() { - QPID_LOG(debug, logPrefix << "Cancelled"); - cancel(); -} +QueueGuard::~QueueGuard() { cancel(); } // NOTE: Called with message lock held. void QueueGuard::enqueued(const Message& m) { @@ -73,10 +70,9 @@ void QueueGuard::enqueued(const Message& m) { m.getIngressCompletion()->startCompleter(); { Mutex::ScopedLock l(lock); - if (!delayed.insert(Delayed::value_type(m.getSequence(), m.getIngressCompletion())).second) { - QPID_LOG(critical, logPrefix << "Second enqueue for message with sequence " << m.getSequence()); - assert(false); - } + if (cancelled) return; // Don't record enqueues after we are cancelled. + assert(delayed.find(m.getSequence()) == delayed.end()); + delayed[m.getSequence()] = m.getIngressCompletion(); } } @@ -104,7 +100,8 @@ void QueueGuard::cancel() { Delayed removed; { Mutex::ScopedLock l(lock); - if (delayed.empty()) return; // No need if no delayed messages. + if (cancelled) return; + cancelled = true; delayed.swap(removed); } completeRange(removed.begin(), removed.end()); @@ -116,12 +113,13 @@ void QueueGuard::attach(ReplicatingSubscription& rs) { } bool QueueGuard::subscriptionStart(SequenceNumber position) { + // Complete any messages before or at the ReplicatingSubscription start position. + // Those messages are already on the backup. Delayed removed; { Mutex::ScopedLock l(lock); - // Complete any messages before or at the ReplicatingSubscription start position. - // Those messages are already on the backup. - for (Delayed::iterator i = delayed.begin(); i != delayed.end() && i->first <= position;) { + Delayed::iterator i = delayed.begin(); + while(i != delayed.end() && i->first <= position) { removed.insert(*i); delayed.erase(i++); } diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h index 3904b3bd3f..2d768b5b72 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.h +++ b/qpid/cpp/src/qpid/ha/QueueGuard.h @@ -106,6 +106,7 @@ class QueueGuard { class QueueObserver; sys::Mutex lock; + bool cancelled; std::string logPrefix; broker::Queue& queue; typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed; |
