summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-10-18 19:42:21 +0000
committerAlan Conway <aconway@apache.org>2012-10-18 19:42:21 +0000
commitbb8760c65fdec8001d0edc2a01031ea700162e75 (patch)
treebaac0ffa77715e6e9a4fdbbfa8204e948b9e8920
parent43d6d12379f03a4fe9db2beacfe1be27de4f0ba3 (diff)
downloadqpid-python-bb8760c65fdec8001d0edc2a01031ea700162e75.tar.gz
Bug 867030 - QPID-4374: Make QueueGuard::cancel idempotent (Jason Dillaman)
Added QueueGuard::cancelled, only call cancel once. Don't enqueue after cancel. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1399814 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp24
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.h1
2 files changed, 12 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index 8852554d31..f3bc4c4417 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -50,7 +50,7 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
- : queue(q), subscription(0)
+ : cancelled(false), queue(q), subscription(0)
{
std::ostringstream os;
os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
@@ -61,10 +61,7 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
range = QueueRange(q);
}
-QueueGuard::~QueueGuard() {
- QPID_LOG(debug, logPrefix << "Cancelled");
- cancel();
-}
+QueueGuard::~QueueGuard() { cancel(); }
// NOTE: Called with message lock held.
void QueueGuard::enqueued(const Message& m) {
@@ -73,10 +70,9 @@ void QueueGuard::enqueued(const Message& m) {
m.getIngressCompletion()->startCompleter();
{
Mutex::ScopedLock l(lock);
- if (!delayed.insert(Delayed::value_type(m.getSequence(), m.getIngressCompletion())).second) {
- QPID_LOG(critical, logPrefix << "Second enqueue for message with sequence " << m.getSequence());
- assert(false);
- }
+ if (cancelled) return; // Don't record enqueues after we are cancelled.
+ assert(delayed.find(m.getSequence()) == delayed.end());
+ delayed[m.getSequence()] = m.getIngressCompletion();
}
}
@@ -104,7 +100,8 @@ void QueueGuard::cancel() {
Delayed removed;
{
Mutex::ScopedLock l(lock);
- if (delayed.empty()) return; // No need if no delayed messages.
+ if (cancelled) return;
+ cancelled = true;
delayed.swap(removed);
}
completeRange(removed.begin(), removed.end());
@@ -116,12 +113,13 @@ void QueueGuard::attach(ReplicatingSubscription& rs) {
}
bool QueueGuard::subscriptionStart(SequenceNumber position) {
+ // Complete any messages before or at the ReplicatingSubscription start position.
+ // Those messages are already on the backup.
Delayed removed;
{
Mutex::ScopedLock l(lock);
- // Complete any messages before or at the ReplicatingSubscription start position.
- // Those messages are already on the backup.
- for (Delayed::iterator i = delayed.begin(); i != delayed.end() && i->first <= position;) {
+ Delayed::iterator i = delayed.begin();
+ while(i != delayed.end() && i->first <= position) {
removed.insert(*i);
delayed.erase(i++);
}
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
index 3904b3bd3f..2d768b5b72 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.h
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -106,6 +106,7 @@ class QueueGuard {
class QueueObserver;
sys::Mutex lock;
+ bool cancelled;
std::string logPrefix;
broker::Queue& queue;
typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;