diff options
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.cpp | 24 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.h | 1 |
2 files changed, 12 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index 8852554d31..f3bc4c4417 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -50,7 +50,7 @@ class QueueGuard::QueueObserver : public broker::QueueObserver QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) - : queue(q), subscription(0) + : cancelled(false), queue(q), subscription(0) { std::ostringstream os; os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": "; @@ -61,10 +61,7 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) range = QueueRange(q); } -QueueGuard::~QueueGuard() { - QPID_LOG(debug, logPrefix << "Cancelled"); - cancel(); -} +QueueGuard::~QueueGuard() { cancel(); } // NOTE: Called with message lock held. void QueueGuard::enqueued(const Message& m) { @@ -73,10 +70,9 @@ void QueueGuard::enqueued(const Message& m) { m.getIngressCompletion()->startCompleter(); { Mutex::ScopedLock l(lock); - if (!delayed.insert(Delayed::value_type(m.getSequence(), m.getIngressCompletion())).second) { - QPID_LOG(critical, logPrefix << "Second enqueue for message with sequence " << m.getSequence()); - assert(false); - } + if (cancelled) return; // Don't record enqueues after we are cancelled. + assert(delayed.find(m.getSequence()) == delayed.end()); + delayed[m.getSequence()] = m.getIngressCompletion(); } } @@ -104,7 +100,8 @@ void QueueGuard::cancel() { Delayed removed; { Mutex::ScopedLock l(lock); - if (delayed.empty()) return; // No need if no delayed messages. + if (cancelled) return; + cancelled = true; delayed.swap(removed); } completeRange(removed.begin(), removed.end()); @@ -116,12 +113,13 @@ void QueueGuard::attach(ReplicatingSubscription& rs) { } bool QueueGuard::subscriptionStart(SequenceNumber position) { + // Complete any messages before or at the ReplicatingSubscription start position. + // Those messages are already on the backup. Delayed removed; { Mutex::ScopedLock l(lock); - // Complete any messages before or at the ReplicatingSubscription start position. - // Those messages are already on the backup. - for (Delayed::iterator i = delayed.begin(); i != delayed.end() && i->first <= position;) { + Delayed::iterator i = delayed.begin(); + while(i != delayed.end() && i->first <= position) { removed.insert(*i); delayed.erase(i++); } diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h index 3904b3bd3f..2d768b5b72 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.h +++ b/qpid/cpp/src/qpid/ha/QueueGuard.h @@ -106,6 +106,7 @@ class QueueGuard { class QueueObserver; sys::Mutex lock; + bool cancelled; std::string logPrefix; broker::Queue& queue; typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed; |
