summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h2
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp7
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp20
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h3
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp20
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py20
6 files changed, 58 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h b/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
index 1878939aad..2386a01084 100644
--- a/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
+++ b/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
@@ -45,7 +45,7 @@ class AlternateExchangeSetter
/** If altEx is already known, call setter(altEx) now else save for later */
void setAlternate(const std::string& altEx, const SetFunction& setter) {
- broker::Exchange::shared_ptr ex = exchanges.find(altEx);
+ boost::shared_ptr<broker::Exchange> ex = exchanges.find(altEx);
if (ex) setter(ex); // Set immediately.
else setters.insert(Setters::value_type(altEx, setter)); // Save for later.
}
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 6852a58b0c..3024656daa 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -75,8 +75,13 @@ void Backup::initialize(const Url& brokers) {
}
Backup::~Backup() {
+ QPID_LOG(debug, logPrefix << "Backup shutting down.");
if (link) link->close();
- if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
+ if (replicator.get()) {
+ broker.getExchanges().destroy(replicator->getName());
+ replicator->shutdown();
+ replicator.reset();
+ }
}
// Called via management.
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 7572c7e516..73ab5327fc 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -68,7 +68,7 @@ using namespace broker;
namespace {
-const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator");
+const string QPID_CONFIGURATION_REPLICATOR("qpid.broker-replicator");
const string CLASS_NAME("_class_name");
const string EVENT("_event");
@@ -208,7 +208,12 @@ void BrokerReplicator::initialize() {
);
}
-BrokerReplicator::~BrokerReplicator() { }
+BrokerReplicator::~BrokerReplicator() { shutdown(); }
+
+void BrokerReplicator::shutdown() {
+ QPID_LOG(debug, logPrefix << "BrokerReplicator shutting down.");
+ broker.getQueues().eachQueue(boost::bind(&BrokerReplicator::deactivate, this, _1));
+}
// This is called in the connection IO thread when the bridge is started.
void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
@@ -591,7 +596,7 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu
}
}
-void BrokerReplicator::deleteQueue(const std::string& name) {
+void BrokerReplicator::deactivateQueue(const std::string& name) {
boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
if (qr) {
qr->deactivate();
@@ -599,7 +604,14 @@ void BrokerReplicator::deleteQueue(const std::string& name) {
// actually be destroyed.
broker.getExchanges().destroy(qr->getName());
}
- qr.reset();
+}
+
+void BrokerReplicator::deactivate(boost::shared_ptr<broker::Queue> q) {
+ deactivateQueue(q->getName());
+}
+
+void BrokerReplicator::deleteQueue(const std::string& name) {
+ deactivateQueue(name);
try {
broker.deleteQueue(name, userId, remoteHost);
QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 11c828d50e..bbdf3e2c0e 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -76,6 +76,7 @@ class BrokerReplicator : public broker::Exchange,
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+ void shutdown();
private:
typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
@@ -141,6 +142,8 @@ class BrokerReplicator : public broker::Exchange,
const qpid::framing::FieldTable& args,
const std::string& alternateExchange);
+ void deactivateQueue(const std::string& name);
+ void deactivate(boost::shared_ptr<broker::Queue> q);
void deleteQueue(const std::string& name);
void deleteExchange(const std::string& name);
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index c872e408c5..c8341ccef3 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -44,6 +44,7 @@ namespace qpid {
namespace ha {
using namespace broker;
using namespace framing;
+using namespace std;
const std::string QPID_HA_EVENT_PREFIX("qpid.ha-");
const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
@@ -124,13 +125,18 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
SequenceNumber front, back;
queue->getRange(front, back, broker::REPLICATOR);
if (front <= back) settings.setInt(ReplicatingSubscription::QPID_FRONT, front);
- peer.getMessage().subscribe(
- args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
- false/*exclusive*/, "", 0, settings);
- // FIXME aconway 2012-05-22: use a finite credit window?
- peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
- peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
-
+ try {
+ peer.getMessage().subscribe(
+ args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
+ false/*exclusive*/, "", 0, settings);
+ // FIXME aconway 2012-05-22: use a finite credit window?
+ peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
+ peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
+ }
+ catch(const exception& e) {
+ QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " << e.what()));
+ throw;
+ }
qpid::Address primary;
link->getRemoteAddress(primary);
QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << bridgeName << ")");
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 86f33d8030..f1620cf55d 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -648,6 +648,24 @@ acl deny all all
self.assertRaises(NotFound, s.receiver, ("e1"));
+ def test_auto_delete_qpid_4285(self):
+ """Regression test for QPID-4285: an auto delete queue gets stuck in
+ a partially deleted state and causes replication errors."""
+ cluster = HaCluster(self,2)
+ cluster[1].wait_status("ready")
+ s = cluster[0].connect().session()
+ s.receiver("q;{create:always}")
+ cluster[1].wait_backup("q")
+ cluster.kill(0) # Make the backup take over.
+ s = cluster[1].connect().session()
+ s.receiver("q;{delete:always}").close() # Delete q on new primary
+ try:
+ s.receiver("q")
+ self.fail("Expected NotFound exception") # Should not be avaliable
+ except NotFound: pass
+ assert not cluster[1].agent().getQueue("q") # Should not be in QMF
+
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit
@@ -660,7 +678,7 @@ def fairshare(msgs, limit, levels):
msgs = postponed
count = 0
last_priority = None
- postponed = []
+ postponed = [ ]
msg = msgs.pop(0)
if last_priority and priority_level(msg.priority, levels) == last_priority:
count += 1