diff options
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 20 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 20 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 20 |
6 files changed, 58 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h b/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h index 1878939aad..2386a01084 100644 --- a/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h +++ b/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h @@ -45,7 +45,7 @@ class AlternateExchangeSetter /** If altEx is already known, call setter(altEx) now else save for later */ void setAlternate(const std::string& altEx, const SetFunction& setter) { - broker::Exchange::shared_ptr ex = exchanges.find(altEx); + boost::shared_ptr<broker::Exchange> ex = exchanges.find(altEx); if (ex) setter(ex); // Set immediately. else setters.insert(Setters::value_type(altEx, setter)); // Save for later. } diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 6852a58b0c..3024656daa 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -75,8 +75,13 @@ void Backup::initialize(const Url& brokers) { } Backup::~Backup() { + QPID_LOG(debug, logPrefix << "Backup shutting down."); if (link) link->close(); - if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); + if (replicator.get()) { + broker.getExchanges().destroy(replicator->getName()); + replicator->shutdown(); + replicator.reset(); + } } // Called via management. diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 7572c7e516..73ab5327fc 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -68,7 +68,7 @@ using namespace broker; namespace { -const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator"); +const string QPID_CONFIGURATION_REPLICATOR("qpid.broker-replicator"); const string CLASS_NAME("_class_name"); const string EVENT("_event"); @@ -208,7 +208,12 @@ void BrokerReplicator::initialize() { ); } -BrokerReplicator::~BrokerReplicator() { } +BrokerReplicator::~BrokerReplicator() { shutdown(); } + +void BrokerReplicator::shutdown() { + QPID_LOG(debug, logPrefix << "BrokerReplicator shutting down."); + broker.getQueues().eachQueue(boost::bind(&BrokerReplicator::deactivate, this, _1)); +} // This is called in the connection IO thread when the bridge is started. void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { @@ -591,7 +596,7 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu } } -void BrokerReplicator::deleteQueue(const std::string& name) { +void BrokerReplicator::deactivateQueue(const std::string& name) { boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); if (qr) { qr->deactivate(); @@ -599,7 +604,14 @@ void BrokerReplicator::deleteQueue(const std::string& name) { // actually be destroyed. broker.getExchanges().destroy(qr->getName()); } - qr.reset(); +} + +void BrokerReplicator::deactivate(boost::shared_ptr<broker::Queue> q) { + deactivateQueue(q->getName()); +} + +void BrokerReplicator::deleteQueue(const std::string& name) { + deactivateQueue(name); try { broker.deleteQueue(name, userId, remoteHost); QPID_LOG(debug, logPrefix << "Queue deleted: " << name); diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 11c828d50e..bbdf3e2c0e 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -76,6 +76,7 @@ class BrokerReplicator : public broker::Exchange, bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); + void shutdown(); private: typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr; @@ -141,6 +142,8 @@ class BrokerReplicator : public broker::Exchange, const qpid::framing::FieldTable& args, const std::string& alternateExchange); + void deactivateQueue(const std::string& name); + void deactivate(boost::shared_ptr<broker::Queue> q); void deleteQueue(const std::string& name); void deleteExchange(const std::string& name); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index c872e408c5..c8341ccef3 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -44,6 +44,7 @@ namespace qpid { namespace ha { using namespace broker; using namespace framing; +using namespace std; const std::string QPID_HA_EVENT_PREFIX("qpid.ha-"); const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue"); @@ -124,13 +125,18 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa SequenceNumber front, back; queue->getRange(front, back, broker::REPLICATOR); if (front <= back) settings.setInt(ReplicatingSubscription::QPID_FRONT, front); - peer.getMessage().subscribe( - args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, - false/*exclusive*/, "", 0, settings); - // FIXME aconway 2012-05-22: use a finite credit window? - peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); - peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - + try { + peer.getMessage().subscribe( + args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, + false/*exclusive*/, "", 0, settings); + // FIXME aconway 2012-05-22: use a finite credit window? + peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); + peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); + } + catch(const exception& e) { + QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " << e.what())); + throw; + } qpid::Address primary; link->getRemoteAddress(primary); QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << bridgeName << ")"); diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 86f33d8030..f1620cf55d 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -648,6 +648,24 @@ acl deny all all self.assertRaises(NotFound, s.receiver, ("e1")); + def test_auto_delete_qpid_4285(self): + """Regression test for QPID-4285: an auto delete queue gets stuck in + a partially deleted state and causes replication errors.""" + cluster = HaCluster(self,2) + cluster[1].wait_status("ready") + s = cluster[0].connect().session() + s.receiver("q;{create:always}") + cluster[1].wait_backup("q") + cluster.kill(0) # Make the backup take over. + s = cluster[1].connect().session() + s.receiver("q;{delete:always}").close() # Delete q on new primary + try: + s.receiver("q") + self.fail("Expected NotFound exception") # Should not be avaliable + except NotFound: pass + assert not cluster[1].agent().getQueue("q") # Should not be in QMF + + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit @@ -660,7 +678,7 @@ def fairshare(msgs, limit, levels): msgs = postponed count = 0 last_priority = None - postponed = [] + postponed = [ ] msg = msgs.pop(0) if last_priority and priority_level(msg.priority, levels) == last_priority: count += 1 |
