summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp19
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h1
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp2
4 files changed, 18 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 83c2eaa144..1fabff6a09 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -312,6 +312,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
if (broker.getQueues().find(name)) {
QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
broker.getQueues().destroy(name);
+ stopQueueReplicator(name);
}
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
@@ -343,13 +344,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (queue && replicationTest.replicateLevel(queue->getSettings())) {
QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
- boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
- if (qr) {
- qr->deactivate();
- // QueueReplicator's bridge is now queued for destruction but may not
- // actually be destroyed.
- broker.getExchanges().destroy(qr->getName());
- }
+ stopQueueReplicator(name);
broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
}
}
@@ -563,6 +558,16 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu
}
}
+void BrokerReplicator::stopQueueReplicator(const std::string& name) {
+ boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
+ if (qr) {
+ qr->deactivate();
+ // QueueReplicator's bridge is now queued for destruction but may not
+ // actually be destroyed.
+ broker.getExchanges().destroy(qr->getName());
+ }
+}
+
bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index f7439fe892..8289ad7e9d 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -94,6 +94,7 @@ class BrokerReplicator : public broker::Exchange,
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+ void stopQueueReplicator(const std::string& name);
std::string logPrefix;
ReplicationTest replicationTest;
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 5eb6b292f7..56598c2b5a 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -28,6 +28,7 @@
#include "qpid/assert.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/ConfigurationObserver.h"
+#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
@@ -161,6 +162,9 @@ void Primary::opened(broker::Connection& connection) {
}
haBroker.addBroker(info);
}
+ else
+ QPID_LOG(debug, logPrefix << "Accepted client connection "
+ << connection.getMgmtId())
}
void Primary::closed(broker::Connection& connection) {
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index dbed7e1537..70dd91f57f 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -94,7 +94,7 @@ void QueueReplicator::activate() {
bridge = result.first;
}
-QueueReplicator::~QueueReplicator() {}
+QueueReplicator::~QueueReplicator() { deactivate(); }
void QueueReplicator::deactivate() {
// destroy the route