From ee1608c509fd71adc827c2d96c7cefebe61dd642 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 5 Jul 2012 19:57:25 +0000 Subject: QPID-4085: HA message-loss race condition, handling replication event after response. If the backup broker receives a declare event for a queue after receiving a queue response for the same queue, it removes the queue and replaces it with the new one from the reponse. Previously it did not remove the corresponding bridge so things fail when we attempt to create it. Corrected to remove the bridge also. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1357846 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 19 ++++++++++++------- qpid/cpp/src/qpid/ha/BrokerReplicator.h | 1 + qpid/cpp/src/qpid/ha/Primary.cpp | 4 ++++ qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 2 +- 4 files changed, 18 insertions(+), 8 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 83c2eaa144..1fabff6a09 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -312,6 +312,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { if (broker.getQueues().find(name)) { QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); broker.getQueues().destroy(name); + stopQueueReplicator(name); } std::pair, bool> result = broker.createQueue( @@ -343,13 +344,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { boost::shared_ptr queue = broker.getQueues().find(name); if (queue && replicationTest.replicateLevel(queue->getSettings())) { QPID_LOG(debug, logPrefix << "Queue delete event: " << name); - boost::shared_ptr qr = findQueueReplicator(name); - if (qr) { - qr->deactivate(); - // QueueReplicator's bridge is now queued for destruction but may not - // actually be destroyed. - broker.getExchanges().destroy(qr->getName()); - } + stopQueueReplicator(name); broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); } } @@ -563,6 +558,16 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr& queu } } +void BrokerReplicator::stopQueueReplicator(const std::string& name) { + boost::shared_ptr qr = findQueueReplicator(name); + if (qr) { + qr->deactivate(); + // QueueReplicator's bridge is now queued for destruction but may not + // actually be destroyed. + broker.getExchanges().destroy(qr->getName()); + } +} + bool BrokerReplicator::bind(boost::shared_ptr, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::unbind(boost::shared_ptr, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::isBound(boost::shared_ptr, const string* const, const framing::FieldTable* const) { return false; } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index f7439fe892..8289ad7e9d 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -94,6 +94,7 @@ class BrokerReplicator : public broker::Exchange, QueueReplicatorPtr findQueueReplicator(const std::string& qname); void startQueueReplicator(const boost::shared_ptr&); + void stopQueueReplicator(const std::string& name); std::string logPrefix; ReplicationTest replicationTest; diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 5eb6b292f7..56598c2b5a 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -28,6 +28,7 @@ #include "qpid/assert.h" #include "qpid/broker/Broker.h" #include "qpid/broker/ConfigurationObserver.h" +#include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" @@ -161,6 +162,9 @@ void Primary::opened(broker::Connection& connection) { } haBroker.addBroker(info); } + else + QPID_LOG(debug, logPrefix << "Accepted client connection " + << connection.getMgmtId()) } void Primary::closed(broker::Connection& connection) { diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index dbed7e1537..70dd91f57f 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -94,7 +94,7 @@ void QueueReplicator::activate() { bridge = result.first; } -QueueReplicator::~QueueReplicator() {} +QueueReplicator::~QueueReplicator() { deactivate(); } void QueueReplicator::deactivate() { // destroy the route -- cgit v1.2.1