/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "Membership.h" #include "HaBroker.h" #include "qpid/broker/Broker.h" #include "qpid/management/ManagementAgent.h" #include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" #include "qmf/org/apache/qpid/ha/HaBroker.h" #include #include #include namespace qpid { namespace ha { namespace _qmf = ::qmf::org::apache::qpid::ha; using sys::Mutex; using types::Variant; Membership::Membership(const BrokerInfo& info, HaBroker& b) : haBroker(b), self(info.getSystemId()) { brokers[self] = info; } void Membership::clear() { Mutex::ScopedLock l(lock); BrokerInfo me = brokers[self]; brokers.clear(); brokers[self] = me; } void Membership::add(const BrokerInfo& b) { Mutex::ScopedLock l(lock); brokers[b.getSystemId()] = b; update(l); } void Membership::remove(const types::Uuid& id) { Mutex::ScopedLock l(lock); if (id == self) return; // Never remove myself BrokerInfo::Map::iterator i = brokers.find(id); if (i != brokers.end()) { brokers.erase(i); update(l); } } bool Membership::contains(const types::Uuid& id) { Mutex::ScopedLock l(lock); return brokers.find(id) != brokers.end(); } void Membership::assign(const types::Variant::List& list) { Mutex::ScopedLock l(lock); clear(); for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { BrokerInfo b(i->asMap()); brokers[b.getSystemId()] = b; } update(l); } types::Variant::List Membership::asList() const { Mutex::ScopedLock l(lock); types::Variant::List list; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) list.push_back(i->second.asMap()); return list; } BrokerInfo::Set Membership::otherBackups() const { Mutex::ScopedLock l(lock); BrokerInfo::Set result; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) if (i->second.getStatus() == READY && i->second.getSystemId() != self) result.insert(i->second); return result; } bool Membership::get(const types::Uuid& id, BrokerInfo& result) const { Mutex::ScopedLock l(lock); BrokerInfo::Map::const_iterator i = brokers.find(id); if (i == brokers.end()) return false; result = i->second; return true; } void Membership::update(Mutex::ScopedLock& l) { QPID_LOG(info, "Membership: " << brokers); Variant::List brokers = asList(); if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str()); if (mgmtObject) mgmtObject->set_members(brokers); haBroker.getBroker().getManagementAgent()->raiseEvent( _qmf::EventMembersUpdate(brokers)); } void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) { Mutex::ScopedLock l(lock); mgmtObject = mo; update(l); } namespace { bool checkTransition(BrokerStatus from, BrokerStatus to) { // Legal state transitions. Initial state is JOINING, ACTIVE is terminal. static const BrokerStatus TRANSITIONS[][2] = { { STANDALONE, JOINING }, // Initialization of backup broker { JOINING, CATCHUP }, // Connected to primary { JOINING, RECOVERING }, // Chosen as initial primary. { CATCHUP, READY }, // Caught up all queues, ready to take over. { READY, RECOVERING }, // Chosen as new primary { READY, CATCHUP }, // Timed out failing over, demoted to catch-up. { RECOVERING, ACTIVE } // All expected backups are ready }; static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]); for (size_t i = 0; i < N; ++i) { if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to) return true; } return false; } } // namespace void Membership::setStatus(BrokerStatus newStatus) { BrokerStatus status = getStatus(); QPID_LOG(info, "Status change: " << printable(status) << " -> " << printable(newStatus)); bool legal = checkTransition(status, newStatus); if (!legal) { haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status) << " -> " << printable(newStatus))); } Mutex::ScopedLock l(lock); brokers[self].setStatus(newStatus); if (mgmtObject) mgmtObject->set_status(printable(newStatus).str()); update(l); } BrokerStatus Membership::getStatus() const { Mutex::ScopedLock l(lock); return getStatus(l); } BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const { BrokerInfo::Map::const_iterator i = brokers.find(self); assert(i != brokers.end()); return i->second.getStatus(); } BrokerInfo Membership::getInfo() const { Mutex::ScopedLock l(lock); BrokerInfo::Map::const_iterator i = brokers.find(self); assert(i != brokers.end()); return i->second; } // FIXME aconway 2013-01-23: move to .h? }} // namespace qpid::ha