diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/ha/Membership.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/Membership.cpp')
-rw-r--r-- | cpp/src/qpid/ha/Membership.cpp | 113 |
1 files changed, 105 insertions, 8 deletions
diff --git a/cpp/src/qpid/ha/Membership.cpp b/cpp/src/qpid/ha/Membership.cpp index 74580f9b1e..6c64d86fd7 100644 --- a/cpp/src/qpid/ha/Membership.cpp +++ b/cpp/src/qpid/ha/Membership.cpp @@ -19,6 +19,12 @@ * */ #include "Membership.h" +#include "HaBroker.h" +#include "qpid/broker/Broker.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/types/Variant.h" +#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" +#include "qmf/org/apache/qpid/ha/HaBroker.h" #include <boost/bind.hpp> #include <iostream> #include <iterator> @@ -26,37 +32,58 @@ namespace qpid { namespace ha { +namespace _qmf = ::qmf::org::apache::qpid::ha; -void Membership::reset(const BrokerInfo& b) { +using sys::Mutex; +using types::Variant; + +Membership::Membership(const BrokerInfo& info, HaBroker& b) + : haBroker(b), self(info.getSystemId()) +{ + brokers[self] = info; +} + +void Membership::clear() { + Mutex::ScopedLock l(lock); + BrokerInfo me = brokers[self]; brokers.clear(); - brokers[b.getSystemId()] = b; + brokers[self] = me; } void Membership::add(const BrokerInfo& b) { + Mutex::ScopedLock l(lock); brokers[b.getSystemId()] = b; + update(l); } void Membership::remove(const types::Uuid& id) { + Mutex::ScopedLock l(lock); + if (id == self) return; // Never remove myself BrokerInfo::Map::iterator i = brokers.find(id); if (i != brokers.end()) { brokers.erase(i); - } + update(l); + } } bool Membership::contains(const types::Uuid& id) { + Mutex::ScopedLock l(lock); return brokers.find(id) != brokers.end(); } void Membership::assign(const types::Variant::List& list) { - brokers.clear(); + Mutex::ScopedLock l(lock); + clear(); for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { BrokerInfo b(i->asMap()); brokers[b.getSystemId()] = b; } + update(l); } types::Variant::List Membership::asList() const { + Mutex::ScopedLock l(lock); types::Variant::List list; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) list.push_back(i->second.asMap()); @@ -64,6 +91,7 @@ types::Variant::List Membership::asList() const { } BrokerInfo::Set Membership::otherBackups() const { + Mutex::ScopedLock l(lock); BrokerInfo::Set result; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) if (i->second.getStatus() == READY && i->second.getSystemId() != self) @@ -71,15 +99,84 @@ BrokerInfo::Set Membership::otherBackups() const { return result; } -bool Membership::get(const types::Uuid& id, BrokerInfo& result) { - BrokerInfo::Map::iterator i = brokers.find(id); +bool Membership::get(const types::Uuid& id, BrokerInfo& result) const { + Mutex::ScopedLock l(lock); + BrokerInfo::Map::const_iterator i = brokers.find(id); if (i == brokers.end()) return false; result = i->second; return true; } -std::ostream& operator<<(std::ostream& o, const Membership& members) { - return o << members.brokers; +void Membership::update(Mutex::ScopedLock& l) { + QPID_LOG(info, "Membership: " << brokers); + Variant::List brokers = asList(); + if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str()); + if (mgmtObject) mgmtObject->set_members(brokers); + haBroker.getBroker().getManagementAgent()->raiseEvent( + _qmf::EventMembersUpdate(brokers)); +} + +void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) { + Mutex::ScopedLock l(lock); + mgmtObject = mo; + update(l); +} + + +namespace { +bool checkTransition(BrokerStatus from, BrokerStatus to) { + // Legal state transitions. Initial state is JOINING, ACTIVE is terminal. + static const BrokerStatus TRANSITIONS[][2] = { + { STANDALONE, JOINING }, // Initialization of backup broker + { JOINING, CATCHUP }, // Connected to primary + { JOINING, RECOVERING }, // Chosen as initial primary. + { CATCHUP, READY }, // Caught up all queues, ready to take over. + { READY, RECOVERING }, // Chosen as new primary + { READY, CATCHUP }, // Timed out failing over, demoted to catch-up. + { RECOVERING, ACTIVE } // All expected backups are ready + }; + static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]); + for (size_t i = 0; i < N; ++i) { + if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to) + return true; + } + return false; +} +} // namespace + +void Membership::setStatus(BrokerStatus newStatus) { + BrokerStatus status = getStatus(); + QPID_LOG(info, "Status change: " + << printable(status) << " -> " << printable(newStatus)); + bool legal = checkTransition(status, newStatus); + if (!legal) { + haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status) + << " -> " << printable(newStatus))); + } + + Mutex::ScopedLock l(lock); + brokers[self].setStatus(newStatus); + if (mgmtObject) mgmtObject->set_status(printable(newStatus).str()); + update(l); +} + +BrokerStatus Membership::getStatus() const { + Mutex::ScopedLock l(lock); + return getStatus(l); +} + +BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const { + BrokerInfo::Map::const_iterator i = brokers.find(self); + assert(i != brokers.end()); + return i->second.getStatus(); +} + +BrokerInfo Membership::getInfo() const { + Mutex::ScopedLock l(lock); + BrokerInfo::Map::const_iterator i = brokers.find(self); + assert(i != brokers.end()); + return i->second; } +// FIXME aconway 2013-01-23: move to .h? }} // namespace qpid::ha |