/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "ConnectionObserver.h" #include "HaBroker.h" #include "Membership.h" #include "qpid/broker/Broker.h" #include "qpid/framing/FieldTable.h" #include "qpid/management/ManagementAgent.h" #include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" #include "qmf/org/apache/qpid/ha/HaBroker.h" #include #include #include namespace qpid { namespace ha { namespace _qmf = ::qmf::org::apache::qpid::ha; using sys::Mutex; using types::Variant; Membership::Membership(const BrokerInfo& info, HaBroker& b) : haBroker(b), self(info.getSystemId()) { brokers[self] = info; setPrefix(); oldStatus = info.getStatus(); } void Membership::setPrefix() { haBroker.logPrefix = Msg() << shortStr(brokers[self].getSystemId()) << "(" << printable(brokers[self].getStatus()) << ") "; } void Membership::clear() { Mutex::ScopedLock l(lock); BrokerInfo me = brokers[self]; brokers.clear(); brokers[self] = me; } void Membership::add(const BrokerInfo& b) { Mutex::ScopedLock l(lock); assert(b.getSystemId() != self); brokers[b.getSystemId()] = b; update(true, l); } void Membership::remove(const types::Uuid& id) { Mutex::ScopedLock l(lock); if (id == self) return; // Never remove myself BrokerInfo::Map::iterator i = brokers.find(id); if (i != brokers.end()) { brokers.erase(i); update(true, l); } } bool Membership::contains(const types::Uuid& id) { Mutex::ScopedLock l(lock); return brokers.find(id) != brokers.end(); } void Membership::assign(const types::Variant::List& list) { Mutex::ScopedLock l(lock); clear(); for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { BrokerInfo b(i->asMap()); brokers[b.getSystemId()] = b; } update(true, l); } types::Variant::List Membership::asList() const { Mutex::ScopedLock l(lock); return asList(l); } types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const { types::Variant::List list; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) list.push_back(i->second.asMap()); return list; } BrokerInfo::Set Membership::otherBackups() const { Mutex::ScopedLock l(lock); BrokerInfo::Set result; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) if (i->second.getStatus() == READY && i->second.getSystemId() != self) result.insert(i->second); return result; } BrokerInfo::Set Membership::getBrokers() const { Mutex::ScopedLock l(lock); BrokerInfo::Set result; transform(brokers.begin(), brokers.end(), inserter(result, result.begin()), boost::bind(&BrokerInfo::Map::value_type::second, _1)); return result; } bool Membership::get(const types::Uuid& id, BrokerInfo& result) const { Mutex::ScopedLock l(lock); BrokerInfo::Map::const_iterator i = brokers.find(id); if (i == brokers.end()) return false; result = i->second; return true; } namespace { bool checkTransition(BrokerStatus from, BrokerStatus to) { // Legal state transitions. Initial state is JOINING, ACTIVE is terminal. static const BrokerStatus TRANSITIONS[][2] = { { STANDALONE, JOINING }, // Initialization of backup broker { JOINING, CATCHUP }, // Connected to primary { JOINING, RECOVERING }, // Chosen as initial primary. { CATCHUP, READY }, // Caught up all queues, ready to take over. { READY, RECOVERING }, // Chosen as new primary { READY, CATCHUP }, // Timed out failing over, demoted to catch-up. { RECOVERING, ACTIVE } // All expected backups are ready }; static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]); for (size_t i = 0; i < N; ++i) { if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to) return true; } return false; } } // namespace void Membership::update(bool log, Mutex::ScopedLock& l) { // Update managment and send update event. BrokerStatus newStatus = getStatus(l); Variant::List brokerList = asList(l); if (mgmtObject) { mgmtObject->set_status(printable(newStatus).str()); mgmtObject->set_members(brokerList); } haBroker.getBroker().getManagementAgent()->raiseEvent( _qmf::EventMembersUpdate(brokerList)); // Update link client properties framing::FieldTable linkProperties = haBroker.getBroker().getLinkClientProperties(); if (isBackup(newStatus)) { // Set backup tag on outgoing link properties. linkProperties.setTable( ConnectionObserver::BACKUP_TAG, brokers[types::Uuid(self)].asFieldTable()); haBroker.getBroker().setLinkClientProperties(linkProperties); } else { // Remove backup tag property from outgoing link properties. linkProperties.erase(ConnectionObserver::BACKUP_TAG); haBroker.getBroker().setLinkClientProperties(linkProperties); } // Check status transitions if (oldStatus != newStatus) { QPID_LOG(info, haBroker.logPrefix << "Status change: " << printable(oldStatus) << " -> " << printable(newStatus)); if (!checkTransition(oldStatus, newStatus)) { haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(oldStatus) << " -> " << printable(newStatus))); } oldStatus = newStatus; setPrefix(); if (newStatus == READY) QPID_LOG(notice, haBroker.logPrefix << "Backup is ready"); } if (log) QPID_LOG(info, haBroker.logPrefix << "Membership update: " << brokers); } void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) { Mutex::ScopedLock l(lock); mgmtObject = mo; update(false, l); } void Membership::setStatus(BrokerStatus newStatus) { Mutex::ScopedLock l(lock); brokers[self].setStatus(newStatus); update(false, l); } BrokerStatus Membership::getStatus() const { Mutex::ScopedLock l(lock); return getStatus(l); } BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const { BrokerInfo::Map::const_iterator i = brokers.find(self); assert(i != brokers.end()); return i->second.getStatus(); } BrokerInfo Membership::getSelf() const { Mutex::ScopedLock l(lock); BrokerInfo::Map::const_iterator i = brokers.find(self); assert(i != brokers.end()); return i->second; } void Membership::setSelfAddress(const Address& a) { Mutex::ScopedLock l(lock); brokers[self].setAddress(a); update(false, l); } }} // namespace qpid::ha