diff options
| author | Alan Conway <aconway@apache.org> | 2013-01-11 20:34:19 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-01-11 20:34:19 +0000 |
| commit | 9ad9300d032b5e0aba5af4f7affd81600c9f8a5d (patch) | |
| tree | d6e42e51dab4048c09f1805ac9793a7eb03c4fc9 /qpid/cpp/src | |
| parent | 6a38c54635931afef32f80a3943a923d5f41cd95 (diff) | |
| download | qpid-python-9ad9300d032b5e0aba5af4f7affd81600c9f8a5d.tar.gz | |
QPID-4516: Sporadic failure in ha_tests test_failover_send_receive
Several fixes were required in the code to correct this problem:
- Missing break statement in switch.
- Remove unused function HaBroker::resetMembership
- Abort connection of timed-out backups so they can attempt to reconnect.
- New primary resets membership before allowing backups to connect.
- Test for and ignore double-promotion.
- HaBroker: dynamic logPrefix() shows status. Made status atomic for efficient access for log messages.
- Update primary status in membership.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1432273 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 79 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/StatusCheck.h | 9 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 13 |
6 files changed, 73 insertions, 41 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 8c9669f8f5..8f12f5cbe2 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -59,8 +59,7 @@ using boost::shared_ptr; // Called in Plugin::earlyInitialize HaBroker::HaBroker(broker::Broker& b, const Settings& s) - : logPrefix("Broker: "), - broker(b), + : broker(b), systemId(broker.getSystem()->getSystemId().data()), settings(s), observer(new ConnectionObserver(*this, systemId)), @@ -72,7 +71,8 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) // otherwise there's a window for a client to connect before we get to // initialize() if (settings.cluster) { - QPID_LOG(debug, logPrefix << "Rejecting client connections."); + status = JOINING; + QPID_LOG(debug, logPrefix() << "Rejecting client connections."); shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder); observer->setObserver(excluder, "Backup: "); broker.getConnectionObservers().add(observer); @@ -80,6 +80,16 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) } namespace { +const std::string PREFIX_PRIMARY("Primary("); +const std::string PREFIX_BACKUP("Backup("); +const std::string PREFIX_END("): "); +} +std::string HaBroker::logPrefix() const { + BrokerStatus s = status.get(); + return (isPrimary(s) ? PREFIX_PRIMARY : PREFIX_BACKUP) + printable(s).str()+PREFIX_END; +} + +namespace { const std::string NONE("none"); bool isNone(const std::string& x) { return x.empty() || x == NONE; } } @@ -92,7 +102,7 @@ void HaBroker::initialize() { broker.getSystem()->getNodeName(), broker.getPort(broker::Broker::TCP_TRANSPORT), systemId); - QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo); + QPID_LOG(notice, logPrefix() << "Initializing: " << brokerInfo); // Set up the management object. ManagementAgent* ma = broker.getManagementAgent(); @@ -114,7 +124,7 @@ void HaBroker::initialize() { status = JOINING; backup.reset(new Backup(*this, settings)); broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this); - statusCheck.reset(new StatusCheck(logPrefix, broker.getLinkHearbeatInterval(), brokerInfo)); + statusCheck.reset(new StatusCheck(logPrefix(), broker.getLinkHearbeatInterval(), brokerInfo)); if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl)); if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl)); } @@ -127,7 +137,7 @@ void HaBroker::initialize() { } HaBroker::~HaBroker() { - QPID_LOG(notice, logPrefix << "Shut down"); + QPID_LOG(notice, logPrefix() << "Shut down"); broker.getConnectionObservers().remove(observer); } @@ -137,6 +147,11 @@ void HaBroker::recover() { BrokerInfo::Set backups; { Mutex::ScopedLock l(lock); + if (isPrimary(status.get())) { + QPID_LOG(info, "Ignoring promotion, already primary: " << brokerInfo); + return; + } + QPID_LOG(notice, "Promoting to primary: " << brokerInfo); // Reset membership before allowing backups to connect. backups = membership.otherBackups(); membership.reset(brokerInfo); @@ -167,12 +182,13 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, if (statusCheck->canPromote()) recover(); else { - QPID_LOG(error, logPrefix << "Cluster already active, cannot be promoted"); + QPID_LOG(error, + logPrefix() << "Joining active cluster, cannot be promoted."); throw Exception("Cluster already active, cannot be promoted."); } break; case CATCHUP: - QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted."); + QPID_LOG(error, logPrefix() << "Still catching up, cannot be promoted."); throw Exception("Still catching up, cannot be promoted."); break; case READY: recover(); break; @@ -193,7 +209,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, case _qmf::HaBroker::METHOD_REPLICATE: { _qmf::ArgsHaBrokerReplicate& bq_args = dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args); - QPID_LOG(debug, logPrefix << "Replicate individual queue " + QPID_LOG(debug, logPrefix() << "Replicate individual queue " << bq_args.i_queue << " from " << bq_args.i_broker); boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); @@ -228,7 +244,7 @@ void HaBroker::setPublicUrl(const Url& url) { mgmtObject->set_publicUrl(url.str()); knownBrokers.clear(); knownBrokers.push_back(url); - QPID_LOG(debug, logPrefix << "Setting public URL to: " << url); + QPID_LOG(debug, logPrefix() << "Setting public URL to: " << url); } void HaBroker::setBrokerUrl(const Url& url) { @@ -237,8 +253,8 @@ void HaBroker::setBrokerUrl(const Url& url) { Mutex::ScopedLock l(lock); brokerUrl = url; mgmtObject->set_brokersUrl(brokerUrl.str()); - QPID_LOG(info, logPrefix << "Brokers URL set to: " << url); - if (status == JOINING && statusCheck.get()) statusCheck->setUrl(url); + QPID_LOG(info, logPrefix() << "Brokers URL set to: " << url); + if (status.get() == JOINING && statusCheck.get()) statusCheck->setUrl(url); b = backup; } if (b) b->setBrokerUrl(url); // Oustside lock, avoid deadlock @@ -250,13 +266,12 @@ std::vector<Url> HaBroker::getKnownBrokers() const { } void HaBroker::shutdown() { - QPID_LOG(critical, logPrefix << "Critical error, shutting down."); + QPID_LOG(critical, logPrefix() << "Critical error, shutting down."); broker.shutdown(); } BrokerStatus HaBroker::getStatus() const { - Mutex::ScopedLock l(lock); - return status; + return status.get(); } void HaBroker::setStatus(BrokerStatus newStatus) { @@ -285,12 +300,12 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) { } // namespace void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) { - QPID_LOG(info, logPrefix << "Status change: " - << printable(status) << " -> " << printable(newStatus)); - bool legal = checkTransition(status, newStatus); + QPID_LOG(info, logPrefix() << "Status change: " + << printable(status.get()) << " -> " << printable(newStatus)); + bool legal = checkTransition(status.get(), newStatus); if (!legal) { - QPID_LOG(critical, logPrefix << "Illegal state transition: " - << printable(status) << " -> " << printable(newStatus)); + QPID_LOG(critical, logPrefix() << "Illegal state transition: " + << printable(status.get()) << " -> " << printable(newStatus)); shutdown(); } assert(legal); // FIXME aconway 2012-12-07: fail @@ -299,13 +314,15 @@ void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) { } void HaBroker::statusChanged(Mutex::ScopedLock& l) { - mgmtObject->set_status(printable(status).str()); - brokerInfo.setStatus(status); + mgmtObject->set_status(printable(status.get()).str()); + brokerInfo.setStatus(status.get()); + membership.add(brokerInfo); + membershipUpdated(l); setLinkProperties(l); } void HaBroker::membershipUpdated(Mutex::ScopedLock&) { - QPID_LOG(info, logPrefix << "Membership changed: " << membership); + QPID_LOG(info, logPrefix() << "Membership: " << membership); Variant::List brokers = membership.asList(); mgmtObject->set_members(brokers); broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers)); @@ -316,23 +333,24 @@ void HaBroker::setMembership(const Variant::List& brokers) { { Mutex::ScopedLock l(lock); membership.assign(brokers); - QPID_LOG(info, logPrefix << "Membership update: " << membership); BrokerInfo info; - // Update my status to what the primary says it is. The primary can toggle - // status between READY and CATCHUP based on the state of our subscriptions. - if (membership.get(systemId, info) && status != info.getStatus()) { + // Update my status to what the primary says it is. The primary sets + // status to READY when we are caught up, and sets status to CATCHUP + // (from READY) if we are timed out during recovery. + if (membership.get(systemId, info) && status.get() != info.getStatus()) { + assert((status.get() == CATCHUP && info.getStatus() == READY) || + (status.get() == READY && info.getStatus() == CATCHUP)); setStatus(info.getStatus(), l); b = backup; } membershipUpdated(l); } - if (b) b->setStatus(status); // Oustside lock, avoid deadlock + if (b) b->setStatus(status.get()); // Oustside lock, avoid deadlock } void HaBroker::addBroker(const BrokerInfo& b) { Mutex::ScopedLock l(lock); membership.add(b); - QPID_LOG(debug, logPrefix << "Membership add: " << b); membershipUpdated(l); } @@ -341,14 +359,13 @@ void HaBroker::removeBroker(const Uuid& id) { BrokerInfo info; if (membership.get(id, info)) { membership.remove(id); - QPID_LOG(debug, logPrefix << "Membership remove: " << info); membershipUpdated(l); } } void HaBroker::setLinkProperties(Mutex::ScopedLock&) { framing::FieldTable linkProperties = broker.getLinkClientProperties(); - if (isBackup(status)) { + if (isBackup(status.get())) { // If this is a backup then any outgoing links are backup // links and need to be tagged. linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable()); diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 92eb2e078f..e6d6e1a94f 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -32,6 +32,7 @@ #include "qmf/org/apache/qpid/ha/HaBroker.h" #include "qpid/management/Manageable.h" #include "qpid/types/Variant.h" +#include "qpid/sys/AtomicValue.h" #include <set> #include <boost/shared_ptr.hpp> @@ -103,18 +104,16 @@ class HaBroker : public management::Manageable void setBrokerUrl(const Url&); void updateClientUrl(sys::Mutex::ScopedLock&); - bool isPrimary(sys::Mutex::ScopedLock&) { return !backup.get(); } - void setStatus(BrokerStatus, sys::Mutex::ScopedLock&); void recover(); void statusChanged(sys::Mutex::ScopedLock&); void setLinkProperties(sys::Mutex::ScopedLock&); + std::string logPrefix() const; std::vector<Url> getKnownBrokers() const; void membershipUpdated(sys::Mutex::ScopedLock&); - std::string logPrefix; broker::Broker& broker; types::Uuid systemId; const Settings settings; @@ -126,7 +125,7 @@ class HaBroker : public management::Manageable qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject; Url publicUrl, brokerUrl; std::vector<Url> knownBrokers; - BrokerStatus status; + sys::AtomicValue<BrokerStatus> status; BrokerInfo brokerInfo; Membership membership; ReplicationTest replicationTest; diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 40ae9ff07b..259b043bef 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -161,6 +161,10 @@ void Primary::timeoutExpectedBackups() { expectedBackups.erase(i++); backups.erase(info.getSystemId()); rb->cancel(); + // Downgrade the broker's status to CATCHUP + // The broker will get this status change when it eventually connects. + info.setStatus(CATCHUP); + haBroker.addBroker(info); } else ++i; } diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index 6829737f29..394ba3041b 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -48,6 +48,8 @@ void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGu RemoteBackup::~RemoteBackup() { cancel(); } void RemoteBackup::cancel() { + QPID_LOG(debug, logPrefix << "Cancelled " << (connection? "connected":"disconnected") + << " backup: " << brokerInfo); for (GuardMap::iterator i = guards.begin(); i != guards.end(); ++i) i->second->cancel(); guards.clear(); diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.h b/qpid/cpp/src/qpid/ha/StatusCheck.h index 3c62c43a22..997ced4159 100644 --- a/qpid/cpp/src/qpid/ha/StatusCheck.h +++ b/qpid/cpp/src/qpid/ha/StatusCheck.h @@ -32,6 +32,11 @@ namespace qpid { namespace ha { +// FIXME aconway 2012-12-21: This solution is incomplete. It will only protect +// against bad promotion if there are READY brokers when this broker starts. +// It will not help the situation where brokers became READY after this one starts. +// + /** * Check whether a JOINING broker can be promoted . * @@ -49,8 +54,10 @@ class StatusCheck ~StatusCheck(); void setUrl(const Url&); bool canPromote(); - void setPromote(bool p); + private: + void setPromote(bool p); + std::string logPrefix; sys::Mutex lock; std::vector<sys::Thread> threads; diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index fdbd8a153b..1725c594de 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1068,13 +1068,15 @@ class RecoveryTests(HaBrokerTest): l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. try: # We don't want backups to time out for this test, set long timeout. - cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]); + cluster = HaCluster(self, 4, args=["--ha-backup-timeout=120"]); # Wait for the primary to be ready cluster[0].wait_status("active") + for b in cluster[1:4]: b.wait_status("ready") # Create a queue before the failure. s1 = cluster.connect(0).session().sender("q1;{create:always}") for b in cluster: b.wait_backup("q1") for i in xrange(100): s1.send(str(i)) + # Kill primary and 2 backups cluster[3].wait_status("ready") for i in [0,1,2]: cluster.kill(i, False) @@ -1091,14 +1093,16 @@ class RecoveryTests(HaBrokerTest): s2 = cluster.connect(3).session().sender("q2;{create:always}") # Verify that messages sent are not completed - for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False) + for i in xrange(100,200): + s1.send(str(i), sync=False); + s2.send(str(i), sync=False) assertSyncTimeout(s1) self.assertEqual(s1.unsettled(), 100) assertSyncTimeout(s2) self.assertEqual(s2.unsettled(), 100) # Verify we can receive even if sending is on hold: - cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)]) + cluster[3].assert_browse("q1", [str(i) for i in range(200)]) # Restart backups, verify queues are released only when both backups are up cluster.restart(1) @@ -1106,11 +1110,10 @@ class RecoveryTests(HaBrokerTest): self.assertEqual(s1.unsettled(), 100) assertSyncTimeout(s2) self.assertEqual(s2.unsettled(), 100) - self.assertEqual(cluster[3].ha_status(), "recovering") cluster.restart(2) # Verify everything is up to date and active - def settled(sender): sender.sync(); return sender.unsettled() == 0; + def settled(sender): sender.sync(timeout=1); return sender.unsettled() == 0; assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled()) assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled()) cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)]) |
