/* * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ /** *

CLUSTER IMPLEMENTATION OVERVIEW

* * The cluster works on the principle that if all members of the * cluster receive identical input, they will all produce identical * results. cluster::Connections intercept data received from clients * and multicast it via CPG. The data is processed (passed to the * broker::Connection) only when it is received from CPG in cluster * order. Each cluster member has Connection objects for directly * connected clients and "shadow" Connection objects for clients * connected to other members. * * This assumes that all broker actions occur deterministically in * response to data arriving on client connections. There are two * situations where this assumption fails: * - sending data in response to polling local connections for writability. * - taking actions based on a timer or timestamp comparison. * * IMPORTANT NOTE: any time code is added to the broker that uses timers, * the cluster may need to be updated to take account of this. * * * USE OF TIMESTAMPS IN THE BROKER * * The following are the current areas where the broker uses timers or timestamps: * * - Producer flow control: broker::SemanticState uses * connection::getClusterOrderOutput, a FrameHandler that sends * frames to the client via the cluster. Used by broker::SessionState. * * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is * implemented by cluster::ExpiryPolicy. * * - Connection heartbeat: sends connection controls, which are not part of * session command counting, so it is OK to ignore. * * - LinkRegistry: only the cluster elder is ever active for links. * * - management::ManagementBroker: uses a MessageHandler supplied by the cluster * to send messages to the broker via the cluster. * * - Dtx: not yet supported with cluster. * * cluster::ExpiryPolicy implements the strategy for message expiry. * * ClusterTimer implements periodic timed events in the cluster context. * It is used for periodic management events. * *

CLUSTER PROTOCOL OVERVIEW

* * Messages sent to/from CPG are called Events. * * An Event carries a ConnectionId, which includes a MemberId and a * connection number. * * Events are either * - Connection events: non-0 connection number and are associated with a connection. * - Cluster Events: 0 connection number, are not associated with a connection. * * Events are further categorized as: * - Control: carries method frame(s) that affect cluster behavior. * - Data: carries raw data received from a client connection. * * The cluster defines extensions to the AMQP command set in ../../../xml/cluster.xml * which defines two classes: * - cluster: cluster control information. * - cluster.connection: control information for a specific connection. * * The following combinations are legal: * - Data frames carrying connection data. * - Cluster control events carrying cluster commands. * - Connection control events carrying cluster.connection commands. * - Connection control events carrying non-cluster frames: frames sent to the client. * e.g. flow-control frames generated on a timer. * *

CLUSTER INITIALIZATION OVERVIEW

* * @see InitialStatusMap * * When a new member joins the CPG group, all members (including the * new one) multicast their "initial status." The new member is in * PRE_INIT mode until it gets a complete set of initial status * messages from all cluster members. In a newly-forming cluster, the * member then stays in INIT mode until the configured cluster-size * members have joined. * * The newcomer uses initial status to determine: * - The cluster UUID * - Am I speaking the correct version of the cluster protocol? * - Do I need to get an update from an existing active member? * - Can I recover from my own store? * * Pre-initialization happens in the Cluster constructor (plugin * early-init phase) because it needs to set the recovery flag before * the store initializes. This phase lasts until initial-status is * received for all active members. The PollableQueues and Multicaster * are in "bypass" mode during this phase since the poller has not * started, so there are no threads to serve pollable queues. * * The remaining initialization happens in Cluster::initialize() or, * if cluster-size=N is specified, in the deliver thread when an * initial-status control is delivered that brings the total to N. 
*/ #include "qpid/Exception.h" #include "qpid/cluster/Cluster.h" #include "qpid/sys/ClusterSafe.h" #include "qpid/cluster/ClusterSettings.h" #include "qpid/cluster/Connection.h" #include "qpid/cluster/UpdateClient.h" #include "qpid/cluster/RetractClient.h" #include "qpid/cluster/FailoverExchange.h" #include "qpid/cluster/UpdateExchange.h" #include "qpid/cluster/ClusterTimer.h" #include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" #include "qmf/org/apache/qpid/cluster/Package.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SignalHandler.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterConfigChangeBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAbortBody.h" #include "qpid/framing/ClusterRetractOfferBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterUpdateOfferBody.h" #include "qpid/framing/ClusterUpdateRequestBody.h" #include "qpid/framing/ClusterConnectionAnnounceBody.h" #include "qpid/framing/ClusterErrorCheckBody.h" #include "qpid/framing/ClusterTimerWakeupBody.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" #include "qpid/management/IdAllocator.h" #include "qpid/management/ManagementAgent.h" #include "qpid/memory.h" #include "qpid/sys/Thread.h" #include #include #include #include #include #include #include #include namespace qpid { namespace cluster { using namespace qpid; using namespace qpid::framing; using namespace qpid::sys; using namespace qpid::cluster; using namespace framing::cluster; using 
namespace std; using management::ManagementAgent; using management::ManagementObject; using management::Manageable; using management::Args; namespace _qmf = ::qmf::org::apache::qpid::cluster; /** * NOTE: must increment this number whenever any incompatible changes in * cluster protocol/behavior are made. It allows early detection and * sensible reporting of an attempt to mix different versions in a * cluster. * * Currently use SVN revision to avoid clashes with versions from * different branches. */ const uint32_t Cluster::CLUSTER_VERSION = 904565; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; MemberId member; Cluster::Lock& l; ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void initialStatus(uint32_t version, bool active, const Uuid& clusterId, uint8_t storeState, const Uuid& shutdownId, const std::string& firstConfig) { cluster.initialStatus( member, version, active, clusterId, framing::cluster::StoreState(storeState), shutdownId, firstConfig, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& members, const std::string& left, const std::string& joined) { cluster.configChange(member, members, left, joined, l); } void updateOffer(uint64_t updatee) { cluster.updateOffer(member, updatee, l); } void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); } void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); } void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } 
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : settings(set), broker(b), mgmtObject(0), poller(b.getPoller()), cpg(*this), name(settings.name), myUrl(settings.url.empty() ? Url() : Url(settings.url)), self(cpg.self()), clusterId(true), expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), boost::bind(&Cluster::leave, this), "Error decoding events, may indicate a broker version mismatch", poller), deliverFrameQueue(boost::bind(&Cluster::deliveredFrame, this, _1), boost::bind(&Cluster::leave, this), "Error delivering frames", poller), quorum(boost::bind(&Cluster::leave, this)), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), state(PRE_INIT), initMap(self, settings.size), store(broker.getDataDir().getPath()), elder(false), lastSize(0), lastBroker(false), updateRetracted(false), error(*this) { // We give ownership of the timer to the broker and keep a plain pointer. // This is OK as it means the timer has the same lifetime as the broker. timer = new ClusterTimer(*this); broker.setClusterTimer(std::auto_ptr(timer)); mAgent = broker.getManagementAgent(); if (mAgent != 0){ _qmf::Package packageInit(mAgent); mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str()); mAgent->addObject (mgmtObject); mgmtObject->set_status("JOINING"); } // Failover exchange provides membership updates to clients. failoverExchange.reset(new FailoverExchange(this)); broker.getExchanges().registerExchange(failoverExchange); // Update exchange is used during updates to replicate messages // without modifying delivery-properties.exchange. 
broker.getExchanges().registerExchange( boost::shared_ptr(new UpdateExchange(this))); // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); clusterId = store.getClusterId(); QPID_LOG(notice, "Cluster store state: " << store) } cpg.join(name); // pump the CPG dispatch manually till we get past PRE_INIT. while (state == PRE_INIT) cpg.dispatchOne(); } Cluster::~Cluster() { broker.setClusterTimer(std::auto_ptr(0)); // Delete cluster timer if (updateThread.id()) updateThread.join(); // Join the previous updatethread. } void Cluster::initialize() { if (settings.quorum) quorum.start(poller); if (myUrl.empty()) myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); deliverEventQueue.bypassOff(); deliverEventQueue.start(); deliverFrameQueue.bypassOff(); deliverFrameQueue.start(); mcast.start(); // Run initMapCompleted immediately to process the initial configuration. assert(state == INIT); initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context. // Add finalizer last for exception safety. broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); } // Called in connection thread to insert a client connection. void Cluster::addLocalConnection(const boost::intrusive_ptr& c) { assert(c->getId().getMember() == self); localConnections.insert(c); } // Called in connection thread to insert an updated shadow connection. void Cluster::addShadowConnection(const boost::intrusive_ptr& c) { QPID_LOG(info, *this << " new shadow connection " << c->getId()); // Safe to use connections here because we're pre-catchup, stalled // and discarding, so deliveredFrame is not processing any // connection events. 
assert(discarding); pair ib = connections.insert(ConnectionMap::value_type(c->getId(), c)); assert(ib.second); } void Cluster::erase(const ConnectionId& id) { Lock l(lock); erase(id,l); } // Called by Connection::deliverClose() in deliverFrameQueue thread. void Cluster::erase(const ConnectionId& id, Lock&) { QPID_LOG(info, *this << " connection closed " << id); connections.erase(id); decoder.erase(id); } std::vector Cluster::getIds() const { Lock l(lock); return getIds(l); } std::vector Cluster::getIds(Lock&) const { return map.memberIds(); } std::vector Cluster::getUrls() const { Lock l(lock); return getUrls(l); } std::vector Cluster::getUrls(Lock&) const { return map.memberUrls(); } void Cluster::leave() { Lock l(lock); leave(l); } #define LEAVE_TRY(STMT) try { STMT; } \ catch (const std::exception& e) { \ QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ } do {} while(0) void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); // Finalize connections now now to avoid problems later in destructor. ClusterSafeScope css; // Don't trigger cluster-safe assertions. LEAVE_TRY(localConnections.clear()); LEAVE_TRY(connections.clear()); LEAVE_TRY(broker::SignalHandler::shutdown()); } } // Deliver CPG message. void Cluster::deliver( cpg_handle_t /*handle*/, const cpg_name* /*group*/, uint32_t nodeid, uint32_t pid, void* msg, int msg_len) { MemberId from(nodeid, pid); framing::Buffer buf(static_cast(msg), msg_len); Event e(Event::decodeCopy(from, buf)); deliverEvent(e); } void Cluster::deliverEvent(const Event& e) { deliverEventQueue.push(e); } void Cluster::deliverFrame(const EventFrame& e) { deliverFrameQueue.push(e); } const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { return (body && body->getMethod() && body->getMethod()->isA()) ? 
static_cast(body) : 0; } const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) { return (body && body->getMethod() && body->getMethod()->isA()) ? static_cast(body) : 0; } // Handler for deliverEventQueue. // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { if (e.isCluster()) { EventFrame ef(e, e.getFrame()); // Stop the deliverEventQueue on update offers. // This preserves the connection decoder fragments for an update. const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody()); if (offer) { QPID_LOG(info, *this << " stall for update offer from " << e.getMemberId() << " to " << MemberId(offer->getUpdatee())); deliverEventQueue.stop(); } deliverFrame(ef); } else if(!discarding) { if (e.isControl()) deliverFrame(EventFrame(e, e.getFrame())); else { try { decoder.decode(e, e.getData()); } catch (const Exception& ex) { // Close a connection that is sending us invalid data. QPID_LOG(error, *this << " aborting connection " << e.getConnectionId() << ": " << ex.what()); framing::AMQFrame abort((ClusterConnectionAbortBody())); deliverFrame(EventFrame(EventHeader(CONTROL, e.getConnectionId()), abort)); } } } } void Cluster::flagError( Connection& connection, ErrorCheck::ErrorType type, const std::string& msg) { Mutex::ScopedLock l(lock); if (connection.isCatchUp()) { QPID_LOG(critical, *this << " error on update connection " << connection << ": " << msg); leave(l); } error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg); } // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& efConst) { Mutex::ScopedLock l(lock); sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. 
if (state == LEFT) return; EventFrame e(efConst); const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody()); if (offer && error.isUnresolved()) { // We can't honour an update offer that is delivered while an // error is in progress so replace it with a retractOffer and re-start // the event queue. e.frame = AMQFrame( ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee())); deliverEventQueue.start(); } // Process each frame through the error checker. if (error.isUnresolved()) { error.delivered(e); while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); } else processFrame(e, l); } void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { QPID_LOG(trace, *this << " DLVR: " << e); ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); } else if (state >= CATCHUP) { map.incrementFrameSeq(); ConnectionPtr connection = getConnection(e, l); if (connection) { QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); connection->deliveredFrame(e); } else QPID_LOG(trace, *this << " DROP (no connection): " << e); } else // Drop connection frames while state < CATCHUP QPID_LOG(trace, *this << " DROP (joining): " << e); } // Called in deliverFrameQueue thread ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) { ConnectionId id = e.connectionId; ConnectionMap::iterator i = connections.find(id); if (i != connections.end()) return i->second; ConnectionPtr cp; // If the frame is an announcement for a new connection, add it. const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody()); if (e.frame.getBody() && e.frame.getMethod() && announce) { if (id.getMember() == self) { // Announces one of my own cp = localConnections.getErase(id); assert(cp); } else { // New remote connection, create a shadow. 
qpid::sys::SecuritySettings secSettings; if (announce) { secSettings.ssf = announce->getSsf(); secSettings.authid = announce->getAuthid(); secSettings.nodict = announce->getNodict(); } cp = new Connection(*this, shadowOut, announce->getManagementId(), id, secSettings); } connections.insert(ConnectionMap::value_type(id, cp)); } return cp; } Cluster::ConnectionVector Cluster::getConnections(Lock&) { ConnectionVector result(connections.size()); std::transform(connections.begin(), connections.end(), result.begin(), boost::bind(&ConnectionMap::value_type::second, _1)); return result; } // CPG config-change callback. void Cluster::configChange ( cpg_handle_t /*handle*/, const cpg_name */*group*/, const cpg_address *members, int nMembers, const cpg_address *left, int nLeft, const cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); string membersStr, leftStr, joinedStr; // Encode members and enqueue as an event so the config change can // be executed in the correct thread. for (const cpg_address* p = members; p < members+nMembers; ++p) membersStr.append(MemberId(*p).str()); for (const cpg_address* p = left; p < left+nLeft; ++p) leftStr.append(MemberId(*p).str()); for (const cpg_address* p = joined; p < joined+nJoined; ++p) joinedStr.append(MemberId(*p).str()); deliverEvent(Event::control(ClusterConfigChangeBody( ProtocolVersion(), membersStr, leftStr, joinedStr), self)); } void Cluster::setReady(Lock&) { state = READY; if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); mcast.setReady(); broker.getQueueEvents().enable(); enableClusterSafe(); // Enable cluster-safe assertions. } void Cluster::initMapCompleted(Lock& l) { // Called on completion of the initial status map. QPID_LOG(debug, *this << " initial status map complete. "); if (state == PRE_INIT) { // PRE_INIT means we're still in the earlyInitialize phase, in the constructor. // We decide here whether we want to recover from our store. 
// We won't recover if we are joining an active cluster or our store is dirty. if (store.hasStore() && store.getState() != STORE_STATE_EMPTY_STORE && (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE)) broker.setRecovery(false); // Ditch my current store. state = INIT; } else if (state == INIT) { // INIT means we are past Cluster::initialize(). // If we're forming an initial cluster (no active members) // then we wait to reach the configured cluster-size if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) { QPID_LOG(info, *this << initMap.getActualSize() << " members, waiting for at least " << initMap.getRequiredSize()); return; } initMap.checkConsistent(); elders = initMap.getElders(); QPID_LOG(debug, *this << " elders: " << elders); if (elders.empty()) becomeElder(l); else { broker.getLinks().setPassive(true); broker.getQueueEvents().disable(); QPID_LOG(info, *this << " not active for links."); } setClusterId(initMap.getClusterId(), l); if (initMap.isUpdateNeeded()) { // Joining established cluster. broker.setRecovery(false); // Ditch my current store. broker.setClusterUpdatee(true); if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update. state = JOINER; mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); QPID_LOG(notice, *this << " joining cluster " << name); } else { // I can go ready. 
discarding = false; setReady(l); memberUpdate(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); QPID_LOG(notice, *this << " joined cluster " << name); } } } void Cluster::configChange(const MemberId&, const std::string& membersStr, const std::string& leftStr, const std::string& joinedStr, Lock& l) { if (state == LEFT) return; MemberSet members = decodeMemberSet(membersStr); MemberSet left = decodeMemberSet(leftStr); MemberSet joined = decodeMemberSet(joinedStr); QPID_LOG(notice, *this << " Membership update: " << members); QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left); QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined); // Update initital status for members joining or leaving. elders = intersection(elders, members); if (elders.empty() && INIT < state && state < CATCHUP) { QPID_LOG(critical, "Cannot update, all potential updaters left the cluster."); leave(l); return; } bool memberChange = map.configChange(members); // Update initital status for members joining or leaving. initMap.configChange(members); if (initMap.isResendNeeded()) { mcast.mcastControl( ClusterInitialStatusBody( ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, store.getState(), store.getShutdownId(), initMap.getFirstConfigStr() ), self); } if (initMap.transitionToComplete()) initMapCompleted(l); if (state >= CATCHUP && memberChange) { memberUpdate(l); if (elders.empty()) becomeElder(l); } } void Cluster::becomeElder(Lock&) { if (elder) return; // We were already the elder. 
// We are the oldest, reactive links if necessary QPID_LOG(info, *this << " became the elder, active for links."); elder = true; broker.getLinks().setPassive(false); timer->becomeElder(); } void Cluster::makeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; QPID_LOG(info, *this << " send update-offer to " << id); mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id), self); } } namespace { struct AppendQueue { ostream* os; AppendQueue(ostream& o) : os(&o) {} void operator()(const boost::shared_ptr& q) { (*os) << " " << q->getName() << "=" << q->getMessageCount(); } }; } // namespace // Log a snapshot of broker state, used for debugging inconsistency problems. // May only be called in deliver thread. std::string Cluster::debugSnapshot() { assertClusterSafe(); std::ostringstream msg; msg << "queue snapshot at " << map.getFrameSeq() << ":"; AppendQueue append(msg); broker.getQueues().eachQueue(append); return msg.str(); } // Called from Broker::~Broker when broker is shut down. At this // point we know the poller has stopped so no poller callbacks will be // invoked. We must ensure that CPG has also shut down so no CPG // callbacks will be invoked. // void Cluster::brokerShutdown() { sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. 
try { cpg.shutdown(); } catch (const std::exception& e) { QPID_LOG(error, *this << " shutting down CPG: " << e.what()); } delete this; } void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) { map.updateRequest(id, url); makeOffer(id, l); } void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active, const framing::Uuid& id, framing::cluster::StoreState store, const framing::Uuid& shutdownId, const std::string& firstConfig, Lock& l) { if (version != CLUSTER_VERSION) { QPID_LOG(critical, *this << " incompatible cluster versions " << version << " != " << CLUSTER_VERSION); leave(l); return; } QPID_LOG_IF(debug, state == PRE_INIT, *this << " received initial status from " << member); initMap.received( member, ClusterInitialStatusBody(ProtocolVersion(), version, active, id, store, shutdownId, firstConfig) ); if (initMap.transitionToComplete()) initMapCompleted(l); } void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { try { if (map.ready(id, Url(url))) memberUpdate(l); if (state == CATCHUP && id == self) { setReady(l); QPID_LOG(notice, *this << " caught up."); } } catch (const Url::Invalid& e) { QPID_LOG(error, "Invalid URL in cluster ready command: " << url); } } void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { // NOTE: deliverEventQueue has been stopped at the update offer by // deliveredEvent in case an update is required. if (state == LEFT) return; MemberId updatee(updateeInt); boost::optional url = map.updateOffer(updater, updatee); if (updater == self) { assert(state == OFFER); if (url) // My offer was first. updateStart(updatee, *url, l); else { // Another offer was first. QPID_LOG(info, *this << " cancelled offer to " << updatee << " unstall"); setReady(l); makeOffer(map.firstJoiner(), l); // Maybe make another offer. 
deliverEventQueue.start(); // Go back to normal processing } } else if (updatee == self && url) { assert(state == JOINER); state = UPDATEE; QPID_LOG(notice, *this << " receiving update from " << updater); checkUpdateIn(l); } else { QPID_LOG(debug,*this << " unstall, ignore update " << updater << " to " << updatee); deliverEventQueue.start(); // Not involved in update. } if (updatee != self && url) { QPID_LOG(debug, debugSnapshot()); if (mAgent) mAgent->clusterUpdate(); // Updatee will call clusterUpdate when update completes } } static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) { client::ConnectionSettings cs; cs.username = settings.username; cs.password = settings.password; cs.mechanism = settings.mechanism; return cs; } void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { // An offer was received while handling an error, and converted to a retract. // Behavior is very similar to updateOffer. if (state == LEFT) return; MemberId updatee(updateeInt); boost::optional url = map.updateOffer(updater, updatee); if (updater == self) { assert(state == OFFER); if (url) { // My offer was first. if (updateThread.id()) updateThread.join(); // Join the previous updateThread to avoid leaks. updateThread = Thread(new RetractClient(*url, connectionSettings(settings))); } setReady(l); makeOffer(map.firstJoiner(), l); // Maybe make another offer. // Don't unstall the event queue, that was already done in deliveredFrame } QPID_LOG(debug,*this << " retracted offer " << updater << " to " << updatee); } void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent. if (state == LEFT) return; assert(state == OFFER); state = UPDATER; QPID_LOG(notice, *this << " sending update to " << updatee << " at " << url); if (updateThread.id()) updateThread.join(); // Join the previous updateThread to avoid leaks. 
updateThread = Thread( new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, getConnections(l), decoder, boost::bind(&Cluster::updateOutDone, this), boost::bind(&Cluster::updateOutError, this, _1), connectionSettings(settings))); } // Called in update thread. void Cluster::updateInDone(const ClusterMap& m) { Lock l(lock); updatedMap = m; checkUpdateIn(l); } void Cluster::updateInRetracted() { Lock l(lock); updateRetracted = true; map.clearStatus(); checkUpdateIn(l); } void Cluster::checkUpdateIn(Lock& l) { if (state != UPDATEE) return; // Wait till we reach the stall point. if (updatedMap) { // We're up to date map = *updatedMap; failoverExchange->setUrls(getUrls(l)); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; broker.setClusterUpdatee(false); if (mAgent) mAgent->suppress(false); // Enable management output. discarding = false; // ok to set, we're stalled for update. QPID_LOG(notice, *this << " update complete, starting catch-up."); QPID_LOG(debug, debugSnapshot()); if (mAgent) mAgent->clusterUpdate(); deliverEventQueue.start(); } else if (updateRetracted) { // Update was retracted, request another update updateRetracted = false; state = JOINER; QPID_LOG(notice, *this << " update retracted, sending new update request."); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); deliverEventQueue.start(); } } void Cluster::updateOutDone() { Monitor::ScopedLock l(lock); updateOutDone(l); } void Cluster::updateOutDone(Lock& l) { QPID_LOG(notice, *this << " update sent"); assert(state == UPDATER); state = READY; deliverEventQueue.start(); // Start processing events again. 
makeOffer(map.firstJoiner(), l); // Try another offer } void Cluster::updateOutError(const std::exception& e) { Monitor::ScopedLock l(lock); QPID_LOG(error, *this << " error sending update: " << e.what()); updateOutDone(l); } void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) { QPID_LOG(notice, *this << " cluster shut down by administrator."); if (store.hasStore()) store.clean(id); leave(l); } ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; } Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) { Lock l(lock); QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { case _qmf::Cluster::METHOD_STOPCLUSTERNODE : { _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; stringstream stream; stream << self; if (iargs.i_brokerId == stream.str()) stopClusterNode(l); } break; case _qmf::Cluster::METHOD_STOPFULLCLUSTER : stopFullCluster(l); break; default: return Manageable::STATUS_UNKNOWN_METHOD; } return Manageable::STATUS_OK; } void Cluster::stopClusterNode(Lock& l) { QPID_LOG(notice, *this << " cluster member stopped by administrator."); leave(l); } void Cluster::stopFullCluster(Lock& ) { QPID_LOG(notice, *this << " shutting down cluster " << name); mcast.mcastControl(ClusterShutdownBody(ProtocolVersion(), Uuid(true)), self); } void Cluster::memberUpdate(Lock& l) { QPID_LOG(info, *this << " member update: " << map); std::vector urls = getUrls(l); std::vector ids = getIds(l); size_t size = urls.size(); failoverExchange->updateUrls(urls); if (store.hasStore()) { // Mark store clean if I am the only broker, dirty otherwise. 
if (size == 1 ) { if (store.getState() != STORE_STATE_CLEAN_STORE) { QPID_LOG(notice, "Sole member of cluster, marking store clean."); store.clean(Uuid(true)); } } else { if (store.getState() != STORE_STATE_DIRTY_STORE) { QPID_LOG(notice, "No longer sole cluster member, marking store dirty."); store.dirty(); } } } if (size == 1 && lastSize > 1 && state >= CATCHUP) { QPID_LOG(notice, *this << " last broker standing, update queue policies"); lastBroker = true; broker.getQueues().updateQueueClusterState(true); } else if (size > 1 && lastBroker) { QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size); lastBroker = false; broker.getQueues().updateQueueClusterState(false); } lastSize = size; if (mgmtObject) { mgmtObject->set_clusterSize(size); string urlstr; for(std::vector::iterator iter = urls.begin(); iter != urls.end(); iter++ ) { if (iter != urls.begin()) urlstr += ";"; urlstr += iter->str(); } string idstr; for(std::vector::iterator iter = ids.begin(); iter != ids.end(); iter++ ) { if (iter != ids.begin()) idstr += ";"; idstr += (*iter); } mgmtObject->set_members(urlstr); mgmtObject->set_memberIDs(idstr); } // Close connections belonging to members that have left the cluster. ConnectionMap::iterator i = connections.begin(); while (i != connections.end()) { ConnectionMap::iterator j = i++; MemberId m = j->second->getId().getMember(); if (m != self && !map.isMember(m)) { j->second->getBrokerConnection().closed(); erase(j->second->getId(), l); } } } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { static const char* STATE[] = { "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); o << "cluster(" << cluster.self << " " << STATE[cluster.state]; if (cluster.error.isUnresolved()) o << "/error"; return o << ")";; } MemberId Cluster::getId() const { return self; // Immutable, no need to lock. 
} broker::Broker& Cluster::getBroker() const { return broker; // Immutable, no need to lock. } void Cluster::setClusterId(const Uuid& uuid, Lock&) { clusterId = uuid; if (store.hasStore()) store.setClusterId(uuid); if (mgmtObject) { stringstream stream; stream << self; mgmtObject->set_clusterID(clusterId.str()); mgmtObject->set_memberID(stream.str()); } QPID_LOG(debug, *this << " cluster-uuid = " << clusterId); } void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { expiryPolicy->deliverExpire(id); } void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) { // If we see an errorCheck here (rather than in the ErrorCheck // class) then we have processed succesfully past the point of the // error. if (state >= CATCHUP) // Don't respond pre catchup, we don't know what happened error.respondNone(from, type, frameSeq); } void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { if (state >= CATCHUP) // Pre catchup our timer isn't set up. timer->deliverWakeup(name); } void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name) if (state >= CATCHUP) // Pre catchup our timer isn't set up. timer->deliverDrop(name); } bool Cluster::isElder() const { Monitor::ScopedLock l(lock); return elder; } }} // namespace qpid::cluster