diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 19 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 2 |
4 files changed, 18 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 83c2eaa144..1fabff6a09 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -312,6 +312,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { if (broker.getQueues().find(name)) { QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); broker.getQueues().destroy(name); + stopQueueReplicator(name); } std::pair<boost::shared_ptr<Queue>, bool> result = broker.createQueue( @@ -343,13 +344,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { boost::shared_ptr<Queue> queue = broker.getQueues().find(name); if (queue && replicationTest.replicateLevel(queue->getSettings())) { QPID_LOG(debug, logPrefix << "Queue delete event: " << name); - boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); - if (qr) { - qr->deactivate(); - // QueueReplicator's bridge is now queued for destruction but may not - // actually be destroyed. - broker.getExchanges().destroy(qr->getName()); - } + stopQueueReplicator(name); broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); } } @@ -563,6 +558,16 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu } } +void BrokerReplicator::stopQueueReplicator(const std::string& name) { + boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); + if (qr) { + qr->deactivate(); + // QueueReplicator's bridge is now queued for destruction but may not + // actually be destroyed. + broker.getExchanges().destroy(qr->getName()); + } +} + bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index f7439fe892..8289ad7e9d 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -94,6 +94,7 @@ class BrokerReplicator : public broker::Exchange, QueueReplicatorPtr findQueueReplicator(const std::string& qname); void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); + void stopQueueReplicator(const std::string& name); std::string logPrefix; ReplicationTest replicationTest; diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 5eb6b292f7..56598c2b5a 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -28,6 +28,7 @@ #include "qpid/assert.h" #include "qpid/broker/Broker.h" #include "qpid/broker/ConfigurationObserver.h" +#include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" @@ -161,6 +162,9 @@ void Primary::opened(broker::Connection& connection) { } haBroker.addBroker(info); } + else + QPID_LOG(debug, logPrefix << "Accepted client connection " + << connection.getMgmtId()) } void Primary::closed(broker::Connection& connection) { diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index dbed7e1537..70dd91f57f 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -94,7 +94,7 @@ void QueueReplicator::activate() { bridge = result.first; } -QueueReplicator::~QueueReplicator() {} +QueueReplicator::~QueueReplicator() { deactivate(); } void QueueReplicator::deactivate() { // destroy the route |
