diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/ha/QueueGuard.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/QueueGuard.cpp')
-rw-r--r-- | cpp/src/qpid/ha/QueueGuard.cpp | 94 |
1 files changed, 39 insertions, 55 deletions
diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp index 77e1f81a38..d06d88ca29 100644 --- a/cpp/src/qpid/ha/QueueGuard.cpp +++ b/cpp/src/qpid/ha/QueueGuard.cpp @@ -50,10 +50,10 @@ class QueueGuard::QueueObserver : public broker::QueueObserver QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) - : queue(q), subscription(0) + : cancelled(false), queue(q), subscription(0) { std::ostringstream os; - os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": "; + os << "Primary guard " << queue.getName() << "@" << info << ": "; logPrefix = os.str(); observer.reset(new QueueObserver(*this)); queue.addObserver(observer); @@ -66,45 +66,31 @@ QueueGuard::~QueueGuard() { cancel(); } // NOTE: Called with message lock held. void QueueGuard::enqueued(const Message& m) { // Delay completion - QPID_LOG(trace, logPrefix << "Delayed completion of " << m); + QPID_LOG(trace, logPrefix << "Delayed completion of " << m.getSequence()); + Mutex::ScopedLock l(lock); + if (cancelled) return; // Don't record enqueues after we are cancelled. + assert(delayed.find(m.getSequence()) == delayed.end()); + delayed[m.getSequence()] = m.getIngressCompletion(); m.getIngressCompletion()->startCompleter(); - { - Mutex::ScopedLock l(lock); - if (!delayed.insert(Delayed::value_type(m.getSequence(), m.getIngressCompletion())).second) { - QPID_LOG(critical, logPrefix << "Second enqueue for message with sequence " << m.getSequence()); - assert(false); - } - } } // NOTE: Called with message lock held. void QueueGuard::dequeued(const Message& m) { QPID_LOG(trace, logPrefix << "Dequeued " << m); - ReplicatingSubscription* rs=0; - { - Mutex::ScopedLock l(lock); - rs = subscription; - } - if (rs) rs->dequeued(m); - complete(m.getSequence()); -} - -void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) { - for (Delayed::iterator i = begin; i != end; ++i) { - QPID_LOG(trace, logPrefix << "Completed " << i->first); - i->second->finishCompleter(); - } + Mutex::ScopedLock l(lock); + if (subscription) subscription->dequeued(m); + complete(m.getSequence(), l); } void QueueGuard::cancel() { queue.removeObserver(observer); - Delayed removed; - { - Mutex::ScopedLock l(lock); - if (delayed.empty()) return; // No need if no delayed messages. - delayed.swap(removed); + Mutex::ScopedLock l(lock); + if (cancelled) return; + cancelled = true; + for (Delayed::iterator i = delayed.begin(); i != delayed.end();) { + complete(i, l); + delayed.erase(i++); } - completeRange(removed.begin(), removed.end()); } void QueueGuard::attach(ReplicatingSubscription& rs) { @@ -113,38 +99,36 @@ void QueueGuard::attach(ReplicatingSubscription& rs) { } bool QueueGuard::subscriptionStart(SequenceNumber position) { - Delayed removed; - { - Mutex::ScopedLock l(lock); - // Complete any messages before or at the ReplicatingSubscription start position. - // Those messages are already on the backup. - for (Delayed::iterator i = delayed.begin(); i != delayed.end() && i->first <= position;) { - removed.insert(*i); - delayed.erase(i++); - } + // Complete any messages before or at the ReplicatingSubscription start position. + // Those messages are already on the backup. + Mutex::ScopedLock l(lock); + Delayed::iterator i = delayed.begin(); + while(i != delayed.end() && i->first <= position) { + complete(i, l); + delayed.erase(i++); } - completeRange(removed.begin(), removed.end()); return position >= range.back; } void QueueGuard::complete(SequenceNumber sequence) { - boost::intrusive_ptr<broker::AsyncCompletion> m; - { - Mutex::ScopedLock l(lock); - // The same message can be completed twice, by - // ReplicatingSubscription::acknowledged and dequeued. Remove it - // from the map so we only call finishCompleter() once - Delayed::iterator i = delayed.find(sequence); - if (i != delayed.end()) { - m = i->second; - delayed.erase(i); - } + Mutex::ScopedLock l(lock); + complete(sequence, l); +} +void QueueGuard::complete(SequenceNumber sequence, Mutex::ScopedLock& l) { + // The same message can be completed twice, by + // ReplicatingSubscription::acknowledged and dequeued. Remove it + // from the map so we only call finishCompleter() once + Delayed::iterator i = delayed.find(sequence); + if (i != delayed.end()) { + complete(i, l); + delayed.erase(i); } - if (m) { - QPID_LOG(trace, logPrefix << "Completed " << sequence); - m->finishCompleter(); - } +} + +void QueueGuard::complete(Delayed::iterator i, Mutex::ScopedLock&) { + QPID_LOG(trace, logPrefix << "Completed " << i->first); + i->second->finishCompleter(); } }} // namespaces qpid::ha |