diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 1176 |
1 files changed, 0 insertions, 1176 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp deleted file mode 100644 index 0daf0c7f5a..0000000000 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ /dev/null @@ -1,1176 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/** - * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1> - * - * The cluster works on the principle that if all members of the - * cluster receive identical input, they will all produce identical - * results. cluster::Connections intercept data received from clients - * and multicast it via CPG. The data is processed (passed to the - * broker::Connection) only when it is received from CPG in cluster - * order. Each cluster member has Connection objects for directly - * connected clients and "shadow" Connection objects for connections - * to other members. - * - * This assumes that all broker actions occur deterministically in - * response to data arriving on client connections. There are two - * situations where this assumption fails: - * - sending data in response to polling local connections for writabiliy. - * - taking actions based on a timer or timestamp comparison. - * - * IMPORTANT NOTE: any time code is added to the broker that uses timers, - * the cluster may need to be updated to take account of this. - * - * - * USE OF TIMESTAMPS IN THE BROKER - * - * The following are the current areas where broker uses timers or timestamps: - * - * - Producer flow control: broker::SemanticState uses - * connection::getClusterOrderOutput. a FrameHandler that sends - * frames to the client via the cluster. Used by broker::SessionState - * - * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is - * implemented by cluster::ExpiryPolicy. - * - * - Connection heartbeat: sends connection controls, not part of - * session command counting so OK to ignore. - * - * - LinkRegistry: only cluster elder is ever active for links. - * - * - management::ManagementBroker: uses MessageHandler supplied by cluster - * to send messages to the broker via the cluster. - * - * - Dtx: not yet supported with cluster. - * - * cluster::ExpiryPolicy implements the strategy for message expiry. - * - * ClusterTimer implements periodic timed events in the cluster context. - * Used for periodic management events. - * - * <h1>CLUSTER PROTOCOL OVERVIEW</h1> - * - * Messages sent to/from CPG are called Events. - * - * An Event carries a ConnectionId, which includes a MemberId and a - * connection number. - * - * Events are either - * - Connection events: non-0 connection number and are associated with a connection. - * - Cluster Events: 0 connection number, are not associated with a connection. - * - * Events are further categorized as: - * - Control: carries method frame(s) that affect cluster behavior. - * - Data: carries raw data received from a client connection. - * - * The cluster defines extensions to the AMQP command set in ../../../xml/cluster.xml - * which defines two classes: - * - cluster: cluster control information. - * - cluster.connection: control information for a specific connection. - * - * The following combinations are legal: - * - Data frames carrying connection data. - * - Cluster control events carrying cluster commands. - * - Connection control events carrying cluster.connection commands. - * - Connection control events carrying non-cluster frames: frames sent to the client. - * e.g. flow-control frames generated on a timer. - * - * <h1>CLUSTER INITIALIZATION OVERVIEW</h1> - * - * @see InitialStatusMap - * - * When a new member joins the CPG group, all members (including the - * new one) multicast their "initial status." The new member is in - * PRE_INIT mode until it gets a complete set of initial status - * messages from all cluster members. In a newly-forming cluster is - * then in INIT mode until the configured cluster-size members have - * joined. - * - * The newcomer uses initial status to determine - * - The cluster UUID - * - Am I speaking the correct version of the cluster protocol? - * - Do I need to get an update from an existing active member? - * - Can I recover from my own store? - * - * Pre-initialization happens in the Cluster constructor (plugin - * early-init phase) because it needs to set the recovery flag before - * the store initializes. This phase lasts until inital-status is - * received for all active members. The PollableQueues and Multicaster - * are in "bypass" mode during this phase since the poller has not - * started so there are no threads to serve pollable queues. - * - * The remaining initialization happens in Cluster::initialize() or, - * if cluster-size=N is specified, in the deliver thread when an - * initial-status control is delivered that brings the total to N. - */ -#include "qpid/Exception.h" -#include "qpid/cluster/Cluster.h" -#include "qpid/sys/ClusterSafe.h" -#include "qpid/cluster/ClusterSettings.h" -#include "qpid/cluster/Connection.h" -#include "qpid/cluster/UpdateClient.h" -#include "qpid/cluster/RetractClient.h" -#include "qpid/cluster/FailoverExchange.h" -#include "qpid/cluster/UpdateDataExchange.h" -#include "qpid/cluster/UpdateExchange.h" -#include "qpid/cluster/ClusterTimer.h" - -#include "qpid/assert.h" -#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" -#include "qmf/org/apache/qpid/cluster/Package.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/Connection.h" -#include "qpid/broker/NullMessageStore.h" -#include "qpid/broker/QueueRegistry.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/Message.h" -#include "qpid/broker/SessionState.h" -#include "qpid/broker/SignalHandler.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AMQP_AllOperations.h" -#include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterConfigChangeBody.h" -#include "qpid/framing/ClusterConnectionDeliverCloseBody.h" -#include "qpid/framing/ClusterConnectionAbortBody.h" -#include "qpid/framing/ClusterRetractOfferBody.h" -#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" -#include "qpid/framing/ClusterReadyBody.h" -#include "qpid/framing/ClusterShutdownBody.h" -#include "qpid/framing/ClusterUpdateOfferBody.h" -#include "qpid/framing/ClusterUpdateRequestBody.h" -#include "qpid/framing/ClusterConnectionAnnounceBody.h" -#include "qpid/framing/ClusterErrorCheckBody.h" -#include "qpid/framing/ClusterTimerWakeupBody.h" -#include "qpid/framing/ClusterDeliverToQueueBody.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/log/Helpers.h" -#include "qpid/log/Statement.h" -#include "qpid/management/ManagementAgent.h" -#include "qpid/memory.h" -#include "qpid/sys/Thread.h" - -#include <boost/shared_ptr.hpp> -#include <boost/bind.hpp> -#include <boost/cast.hpp> -#include <boost/current_function.hpp> -#include <algorithm> -#include <iterator> -#include <map> -#include <ostream> - - -namespace qpid { -namespace cluster { -using namespace qpid; -using namespace qpid::framing; -using namespace qpid::sys; -using namespace qpid::cluster; -using namespace framing::cluster; -using namespace std; -using management::ManagementAgent; -using management::ManagementObject; -using management::Manageable; -using management::Args; -namespace _qmf = ::qmf::org::apache::qpid::cluster; - -/** - * NOTE: must increment this number whenever any incompatible changes in - * cluster protocol/behavior are made. It allows early detection and - * sensible reporting of an attempt to mix different versions in a - * cluster. - * - * Currently use SVN revision to avoid clashes with versions from - * different branches. - */ -const uint32_t Cluster::CLUSTER_VERSION = 1097431; - -struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { - qpid::cluster::Cluster& cluster; - MemberId member; - Cluster::Lock& l; - ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} - - void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } - - void initialStatus(uint32_t version, bool active, const Uuid& clusterId, - uint8_t storeState, const Uuid& shutdownId, - const std::string& firstConfig) - { - cluster.initialStatus( - member, version, active, clusterId, - framing::cluster::StoreState(storeState), shutdownId, - firstConfig, l); - } - void ready(const std::string& url) { - cluster.ready(member, url, l); - } - void configChange(const std::string& members, - const std::string& left, - const std::string& joined) - { - cluster.configChange(member, members, left, joined, l); - } - void updateOffer(uint64_t updatee) { - cluster.updateOffer(member, updatee, l); - } - void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } - void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } - void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { - cluster.errorCheck(member, type, frameSeq, l); - } - void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); } - void timerDrop(const std::string& name) { cluster.timerDrop(member, name, l); } - void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } - void deliverToQueue(const std::string& queue, const std::string& message) { - cluster.deliverToQueue(queue, message, l); - } - bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } -}; - -Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : - settings(set), - broker(b), - mgmtObject(0), - poller(b.getPoller()), - cpg(*this), - name(settings.name), - self(cpg.self()), - clusterId(true), - mAgent(0), - expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), - mcast(cpg, poller, boost::bind(&Cluster::leave, this)), - dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), - deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), - boost::bind(&Cluster::leave, this), - "Error decoding events, may indicate a broker version mismatch", - poller), - deliverFrameQueue(boost::bind(&Cluster::deliveredFrame, this, _1), - boost::bind(&Cluster::leave, this), - "Error delivering frames", - poller), - failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)), - updateDataExchange(new UpdateDataExchange(*this)), - quorum(boost::bind(&Cluster::leave, this)), - decoder(boost::bind(&Cluster::deliverFrame, this, _1)), - discarding(true), - state(PRE_INIT), - initMap(self, settings.size), - store(broker.getDataDir().getPath()), - elder(false), - lastAliveCount(0), - lastBroker(false), - updateRetracted(false), - updateClosed(false), - error(*this) -{ - broker.setInCluster(true); - - // We give ownership of the timer to the broker and keep a plain pointer. - // This is OK as it means the timer has the same lifetime as the broker. - timer = new ClusterTimer(*this); - broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer)); - - // Failover exchange provides membership updates to clients. - broker.getExchanges().registerExchange(failoverExchange); - - // Update exchange is used during updates to replicate messages - // without modifying delivery-properties.exchange. - broker.getExchanges().registerExchange( - boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); - - // Update-data exchange is used for passing data that may be too large - // for single control frame. - broker.getExchanges().registerExchange(updateDataExchange); - - // Load my store status before we go into initialization - if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { - store.load(); - clusterId = store.getClusterId(); - QPID_LOG(notice, "Cluster store state: " << store) - } - cpg.join(name); - // pump the CPG dispatch manually till we get past PRE_INIT. - while (state == PRE_INIT) - cpg.dispatchOne(); -} - -Cluster::~Cluster() { - broker.setClusterTimer(std::auto_ptr<sys::Timer>(0)); // Delete cluster timer - if (updateThread) updateThread.join(); // Join the previous updatethread. -} - -void Cluster::initialize() { - if (settings.quorum) quorum.start(poller); - if (settings.url.empty()) - myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); - else - myUrl = settings.url; - broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); - broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2); - broker.setExpiryPolicy(expiryPolicy); - deliverEventQueue.bypassOff(); - deliverEventQueue.start(); - deliverFrameQueue.bypassOff(); - deliverFrameQueue.start(); - mcast.start(); - - /// Create management object - mAgent = broker.getManagementAgent(); - if (mAgent != 0){ - _qmf::Package packageInit(mAgent); - mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str()); - mAgent->addObject (mgmtObject); - } - - // Run initMapCompleted immediately to process the initial configuration - // that allowed us to transition out of PRE_INIT - assert(state == INIT); - initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context. - - // Add finalizer last for exception safety. - broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); - - // Start dispatching CPG events. - dispatcher.start(); -} - -// Called in connection thread to insert a client connection. -void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { - assert(c->getId().getMember() == self); - localConnections.insert(c); -} - -// Called in connection thread to insert an updated shadow connection. -void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { - QPID_LOG(debug, *this << " new shadow connection " << c->getId()); - // Safe to use connections here because we're pre-catchup, stalled - // and discarding, so deliveredFrame is not processing any - // connection events. - assert(discarding); - pair<ConnectionMap::iterator, bool> ib - = connections.insert(ConnectionMap::value_type(c->getId(), c)); - assert(ib.second); -} - -void Cluster::erase(const ConnectionId& id) { - Lock l(lock); - erase(id,l); -} - -// Called by Connection::deliverClose() in deliverFrameQueue thread. -void Cluster::erase(const ConnectionId& id, Lock&) { - connections.erase(id); - decoder.erase(id); -} - -std::vector<string> Cluster::getIds() const { - Lock l(lock); - return getIds(l); -} - -std::vector<string> Cluster::getIds(Lock&) const { - return map.memberIds(); -} - -std::vector<Url> Cluster::getUrls() const { - Lock l(lock); - return getUrls(l); -} - -std::vector<Url> Cluster::getUrls(Lock&) const { - return map.memberUrls(); -} - -void Cluster::leave() { - Lock l(lock); - leave(l); -} - -#define LEAVE_TRY(STMT) try { STMT; } \ - catch (const std::exception& e) { \ - QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ - } do {} while(0) - -void Cluster::leave(Lock&) { - if (state != LEFT) { - state = LEFT; - QPID_LOG(notice, *this << " leaving cluster " << name); - // Finalize connections now now to avoid problems later in destructor. - ClusterSafeScope css; // Don't trigger cluster-safe assertions. - LEAVE_TRY(localConnections.clear()); - LEAVE_TRY(connections.clear()); - LEAVE_TRY(broker::SignalHandler::shutdown()); - } -} - -// Deliver CPG message. -void Cluster::deliver( - cpg_handle_t /*handle*/, - const cpg_name* /*group*/, - uint32_t nodeid, - uint32_t pid, - void* msg, - int msg_len) -{ - MemberId from(nodeid, pid); - framing::Buffer buf(static_cast<char*>(msg), msg_len); - Event e(Event::decodeCopy(from, buf)); - deliverEvent(e); -} - -void Cluster::deliverEvent(const Event& e) { deliverEventQueue.push(e); } - -void Cluster::deliverFrame(const EventFrame& e) { deliverFrameQueue.push(e); } - -const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { - return (body && body->getMethod() && - body->getMethod()->isA<ClusterUpdateOfferBody>()) ? - static_cast<const ClusterUpdateOfferBody*>(body) : 0; -} - -const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) { - return (body && body->getMethod() && - body->getMethod()->isA<ClusterConnectionAnnounceBody>()) ? - static_cast<const ClusterConnectionAnnounceBody*>(body) : 0; -} - -// Handler for deliverEventQueue. -// This thread decodes frames from events. -void Cluster::deliveredEvent(const Event& e) { - if (e.isCluster()) { - EventFrame ef(e, e.getFrame()); - // Stop the deliverEventQueue on update offers. - // This preserves the connection decoder fragments for an update. - // Only do this for the two brokers that are directly involved in this - // offer: the one making the offer, or the one receiving it. - const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody()); - if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) { - QPID_LOG(info, *this << " stall for update offer from " << e.getMemberId() - << " to " << MemberId(offer->getUpdatee())); - deliverEventQueue.stop(); - } - deliverFrame(ef); - } - else if(!discarding) { - if (e.isControl()) - deliverFrame(EventFrame(e, e.getFrame())); - else { - try { decoder.decode(e, e.getData()); } - catch (const Exception& ex) { - // Close a connection that is sending us invalid data. - QPID_LOG(error, *this << " aborting connection " - << e.getConnectionId() << ": " << ex.what()); - framing::AMQFrame abort((ClusterConnectionAbortBody())); - deliverFrame(EventFrame(EventHeader(CONTROL, e.getConnectionId()), abort)); - } - } - } -} - -void Cluster::flagError( - Connection& connection, ErrorCheck::ErrorType type, const std::string& msg) -{ - Mutex::ScopedLock l(lock); - if (connection.isCatchUp()) { - QPID_LOG(critical, *this << " error on update connection " << connection - << ": " << msg); - leave(l); - } - error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg); -} - -// Handler for deliverFrameQueue. -// This thread executes the main logic. -void Cluster::deliveredFrame(const EventFrame& efConst) { - Mutex::ScopedLock l(lock); - sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. - if (state == LEFT) return; - EventFrame e(efConst); - const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody()); - if (offer && error.isUnresolved()) { - // We can't honour an update offer that is delivered while an - // error is in progress so replace it with a retractOffer and re-start - // the event queue. - e.frame = AMQFrame( - ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee())); - deliverEventQueue.start(); - } - // Process each frame through the error checker. - if (error.isUnresolved()) { - error.delivered(e); - while (error.canProcess()) // There is a frame ready to process. - processFrame(error.getNext(), l); - } - else - processFrame(e, l); -} - - -void Cluster::processFrame(const EventFrame& e, Lock& l) { - if (e.isCluster()) { - QPID_LOG(trace, *this << " DLVR: " << e); - ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); - if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) - throw Exception(QPID_MSG("Invalid cluster control")); - } - else if (state >= CATCHUP) { - map.incrementFrameSeq(); - ConnectionPtr connection = getConnection(e, l); - if (connection) { - QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); - connection->deliveredFrame(e); - } - else - throw Exception(QPID_MSG("Unknown connection: " << e)); - } - else // Drop connection frames while state < CATCHUP - QPID_LOG(trace, *this << " DROP (joining): " << e); -} - -// Called in deliverFrameQueue thread -ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) { - ConnectionId id = e.connectionId; - ConnectionMap::iterator i = connections.find(id); - if (i != connections.end()) return i->second; - ConnectionPtr cp; - // If the frame is an announcement for a new connection, add it. - const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody()); - if (e.frame.getBody() && e.frame.getMethod() && announce) - { - if (id.getMember() == self) { // Announces one of my own - cp = localConnections.getErase(id); - assert(cp); - } - else { // New remote connection, create a shadow. - qpid::sys::SecuritySettings secSettings; - if (announce) { - secSettings.ssf = announce->getSsf(); - secSettings.authid = announce->getAuthid(); - secSettings.nodict = announce->getNodict(); - } - cp = new Connection(*this, shadowOut, announce->getManagementId(), id, secSettings); - } - connections.insert(ConnectionMap::value_type(id, cp)); - } - return cp; -} - -Cluster::ConnectionVector Cluster::getConnections(Lock&) { - ConnectionVector result(connections.size()); - std::transform(connections.begin(), connections.end(), result.begin(), - boost::bind(&ConnectionMap::value_type::second, _1)); - return result; -} - -// CPG config-change callback. -void Cluster::configChange ( - cpg_handle_t /*handle*/, - const cpg_name */*group*/, - const cpg_address *members, int nMembers, - const cpg_address *left, int nLeft, - const cpg_address *joined, int nJoined) -{ - Mutex::ScopedLock l(lock); - string membersStr, leftStr, joinedStr; - // Encode members and enqueue as an event so the config change can - // be executed in the correct thread. - for (const cpg_address* p = members; p < members+nMembers; ++p) - membersStr.append(MemberId(*p).str()); - for (const cpg_address* p = left; p < left+nLeft; ++p) - leftStr.append(MemberId(*p).str()); - for (const cpg_address* p = joined; p < joined+nJoined; ++p) - joinedStr.append(MemberId(*p).str()); - deliverEvent(Event::control(ClusterConfigChangeBody( - ProtocolVersion(), membersStr, leftStr, joinedStr), - self)); -} - -void Cluster::setReady(Lock&) { - state = READY; - mcast.setReady(); - broker.getQueueEvents().enable(); - enableClusterSafe(); // Enable cluster-safe assertions. -} - -// Set the management status from the Cluster::state. -// -// NOTE: Management updates are sent based on property changes. In -// order to keep consistency across the cluster, we touch the local -// management status property even if it is locally unchanged for any -// event that could have cause a cluster property change on any cluster member. -void Cluster::setMgmtStatus(Lock&) { - if (mgmtObject) - mgmtObject->set_status(state >= CATCHUP ? "ACTIVE" : "JOINING"); -} - -void Cluster::initMapCompleted(Lock& l) { - // Called on completion of the initial status map. - QPID_LOG(debug, *this << " initial status map complete. "); - setMgmtStatus(l); - if (state == PRE_INIT) { - // PRE_INIT means we're still in the earlyInitialize phase, in the constructor. - // We decide here whether we want to recover from our store. - // We won't recover if we are joining an active cluster or our store is dirty. - if (store.hasStore() && - store.getState() != STORE_STATE_EMPTY_STORE && - (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE)) - broker.setRecovery(false); // Ditch my current store. - state = INIT; - } - else if (state == INIT) { - // INIT means we are past Cluster::initialize(). - - // If we're forming an initial cluster (no active members) - // then we wait to reach the configured cluster-size - if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) { - QPID_LOG(info, *this << initMap.getActualSize() - << " members, waiting for at least " << initMap.getRequiredSize()); - return; - } - - initMap.checkConsistent(); - elders = initMap.getElders(); - QPID_LOG(debug, *this << " elders: " << elders); - if (elders.empty()) - becomeElder(l); - else { - broker.getLinks().setPassive(true); - broker.getQueueEvents().disable(); - QPID_LOG(info, *this << " not active for links."); - } - setClusterId(initMap.getClusterId(), l); - - if (initMap.isUpdateNeeded()) { // Joining established cluster. - broker.setRecovery(false); // Ditch my current store. - broker.setClusterUpdatee(true); - if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update. - state = JOINER; - mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); - QPID_LOG(notice, *this << " joining cluster " << name); - } - else { // I can go ready. - discarding = false; - setReady(l); - memberUpdate(l); - updateMgmtMembership(l); - mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); - QPID_LOG(notice, *this << " joined cluster " << name); - } - } -} - -void Cluster::configChange(const MemberId&, - const std::string& membersStr, - const std::string& leftStr, - const std::string& joinedStr, - Lock& l) -{ - if (state == LEFT) return; - MemberSet members = decodeMemberSet(membersStr); - MemberSet left = decodeMemberSet(leftStr); - MemberSet joined = decodeMemberSet(joinedStr); - QPID_LOG(notice, *this << " configuration change: " << members); - QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left); - QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined); - - // If we are still joining, make sure there is someone to give us an update. - elders = intersection(elders, members); - if (elders.empty() && INIT < state && state < CATCHUP) { - QPID_LOG(critical, "Cannot update, all potential updaters left the cluster."); - leave(l); - return; - } - bool memberChange = map.configChange(members); - - // Update initital status for members joining or leaving. - initMap.configChange(members); - if (initMap.isResendNeeded()) { - mcast.mcastControl( - ClusterInitialStatusBody( - ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, - store.getState(), store.getShutdownId(), - initMap.getFirstConfigStr() - ), - self); - } - if (initMap.transitionToComplete()) initMapCompleted(l); - - if (state >= CATCHUP && memberChange) { - memberUpdate(l); - if (elders.empty()) becomeElder(l); - } - - updateMgmtMembership(l); // Update on every config change for consistency -} - -void Cluster::becomeElder(Lock&) { - if (elder) return; // We were already the elder. - // We are the oldest, reactive links if necessary - QPID_LOG(info, *this << " became the elder, active for links."); - elder = true; - broker.getLinks().setPassive(false); - timer->becomeElder(); -} - -void Cluster::makeOffer(const MemberId& id, Lock& ) { - if (state == READY && map.isJoiner(id)) { - state = OFFER; - QPID_LOG(info, *this << " send update-offer to " << id); - mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id), self); - } -} - -namespace { -struct AppendQueue { - ostream* os; - AppendQueue(ostream& o) : os(&o) {} - void operator()(const boost::shared_ptr<broker::Queue>& q) { - (*os) << " " << q->getName() << "=" << q->getMessageCount(); - } -}; -} // namespace - -// Log a snapshot of broker state, used for debugging inconsistency problems. -// May only be called in deliver thread. -std::string Cluster::debugSnapshot() { - assertClusterSafe(); - std::ostringstream msg; - msg << "Member joined, frameSeq=" << map.getFrameSeq() << ", queue snapshot:"; - AppendQueue append(msg); - broker.getQueues().eachQueue(append); - return msg.str(); -} - -// Called from Broker::~Broker when broker is shut down. At this -// point we know the poller has stopped so no poller callbacks will be -// invoked. We must ensure that CPG has also shut down so no CPG -// callbacks will be invoked. -// -void Cluster::brokerShutdown() { - sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. - try { cpg.shutdown(); } - catch (const std::exception& e) { - QPID_LOG(error, *this << " shutting down CPG: " << e.what()); - } - delete this; -} - -void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) { - map.updateRequest(id, url); - makeOffer(id, l); -} - -void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active, - const framing::Uuid& id, - framing::cluster::StoreState store, - const framing::Uuid& shutdownId, - const std::string& firstConfig, - Lock& l) -{ - if (version != CLUSTER_VERSION) { - QPID_LOG(critical, *this << " incompatible cluster versions " << - version << " != " << CLUSTER_VERSION); - leave(l); - return; - } - QPID_LOG_IF(debug, state == PRE_INIT, *this - << " received initial status from " << member); - initMap.received( - member, - ClusterInitialStatusBody(ProtocolVersion(), version, active, id, - store, shutdownId, firstConfig) - ); - if (initMap.transitionToComplete()) initMapCompleted(l); -} - -void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { - try { - if (map.ready(id, Url(url))) - memberUpdate(l); - if (state == CATCHUP && id == self) { - setReady(l); - QPID_LOG(notice, *this << " caught up."); - } - } catch (const Url::Invalid& e) { - QPID_LOG(error, "Invalid URL in cluster ready command: " << url); - } - // Update management on every ready event to be consistent across cluster. - setMgmtStatus(l); - updateMgmtMembership(l); -} - -void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { - // NOTE: deliverEventQueue has been stopped at the update offer by - // deliveredEvent in case an update is required. - if (state == LEFT) return; - MemberId updatee(updateeInt); - boost::optional<Url> url = map.updateOffer(updater, updatee); - if (updater == self) { - assert(state == OFFER); - if (url) // My offer was first. - updateStart(updatee, *url, l); - else { // Another offer was first. - QPID_LOG(info, *this << " cancelled offer to " << updatee << " unstall"); - setReady(l); - makeOffer(map.firstJoiner(), l); // Maybe make another offer. - deliverEventQueue.start(); // Go back to normal processing - } - } - else if (updatee == self && url) { - assert(state == JOINER); - state = UPDATEE; - QPID_LOG(notice, *this << " receiving update from " << updater); - checkUpdateIn(l); - } - else { - QPID_LOG(info, *this << " unstall, ignore update " << updater - << " to " << updatee); - deliverEventQueue.start(); // Not involved in update. - } - if (updatee != self && url) { - QPID_LOG(debug, debugSnapshot()); - if (mAgent) mAgent->clusterUpdate(); - // Updatee will call clusterUpdate when update completes - } -} - -static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) { - client::ConnectionSettings cs; - cs.username = settings.username; - cs.password = settings.password; - cs.mechanism = settings.mechanism; - return cs; -} - -void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { - // An offer was received while handling an error, and converted to a retract. - // Behavior is very similar to updateOffer. - if (state == LEFT) return; - MemberId updatee(updateeInt); - boost::optional<Url> url = map.updateOffer(updater, updatee); - if (updater == self) { - assert(state == OFFER); - if (url) { // My offer was first. - if (updateThread) - updateThread.join(); // Join the previous updateThread to avoid leaks. - updateThread = Thread(new RetractClient(*url, connectionSettings(settings))); - } - setReady(l); - makeOffer(map.firstJoiner(), l); // Maybe make another offer. - // Don't unstall the event queue, that was already done in deliveredFrame - } - QPID_LOG(debug,*this << " retracted offer " << updater << " to " << updatee); -} - -void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { - // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent. - if (state == LEFT) return; - assert(state == OFFER); - state = UPDATER; - QPID_LOG(notice, *this << " sending update to " << updatee << " at " << url); - if (updateThread) - updateThread.join(); // Join the previous updateThread to avoid leaks. - updateThread = Thread( - new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, - getConnections(l), decoder, - boost::bind(&Cluster::updateOutDone, this), - boost::bind(&Cluster::updateOutError, this, _1), - connectionSettings(settings))); -} - -// Called in network thread -void Cluster::updateInClosed() { - Lock l(lock); - assert(!updateClosed); - updateClosed = true; - checkUpdateIn(l); -} - -// Called in update thread. -void Cluster::updateInDone(const ClusterMap& m) { - Lock l(lock); - updatedMap = m; - checkUpdateIn(l); -} - -void Cluster::updateInRetracted() { - Lock l(lock); - updateRetracted = true; - map.clearStatus(); - checkUpdateIn(l); -} - -bool Cluster::isExpectingUpdate() { - Lock l(lock); - return state <= UPDATEE; -} - -// Called in update thread or deliver thread. -void Cluster::checkUpdateIn(Lock& l) { - if (state != UPDATEE) return; // Wait till we reach the stall point. - if (!updateClosed) return; // Wait till update connection closes. - if (updatedMap) { // We're up to date - map = *updatedMap; - failoverExchange->setUrls(getUrls(l)); - mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); - state = CATCHUP; - memberUpdate(l); - // NB: don't updateMgmtMembership() here as we are not in the deliver - // thread. It will be updated on delivery of the "ready" we just mcast. - broker.setClusterUpdatee(false); - discarding = false; // OK to set, we're stalled for update. - QPID_LOG(notice, *this << " update complete, starting catch-up."); - QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled. - if (mAgent) { - // Update management agent now, after all update activity is complete. - updateDataExchange->updateManagementAgent(mAgent); - mAgent->suppress(false); // Enable management output. - mAgent->clusterUpdate(); - } - // Restore alternate exchange settings on exchanges. - broker.getExchanges().eachExchange( - boost::bind(&broker::Exchange::recoveryComplete, _1, - boost::ref(broker.getExchanges()))); - enableClusterSafe(); // Enable cluster-safe assertions - deliverEventQueue.start(); - } - else if (updateRetracted) { // Update was retracted, request another update - updateRetracted = false; - updateClosed = false; - state = JOINER; - QPID_LOG(notice, *this << " update retracted, sending new update request."); - mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); - deliverEventQueue.start(); - } -} - -void Cluster::updateOutDone() { - Monitor::ScopedLock l(lock); - updateOutDone(l); -} - -void Cluster::updateOutDone(Lock& l) { - QPID_LOG(notice, *this << " update sent"); - assert(state == UPDATER); - state = READY; - deliverEventQueue.start(); // Start processing events again. - makeOffer(map.firstJoiner(), l); // Try another offer -} - -void Cluster::updateOutError(const std::exception& e) { - Monitor::ScopedLock l(lock); - QPID_LOG(error, *this << " error sending update: " << e.what()); - updateOutDone(l); -} - -void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) { - QPID_LOG(notice, *this << " cluster shut down by administrator."); - if (store.hasStore()) store.clean(id); - leave(l); -} - -ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; } - -Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) { - Lock l(lock); - QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]"); - switch (methodId) { - case _qmf::Cluster::METHOD_STOPCLUSTERNODE : - { - _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; - stringstream stream; - stream << self; - if (iargs.i_brokerId == stream.str()) - stopClusterNode(l); - } - break; - case _qmf::Cluster::METHOD_STOPFULLCLUSTER : - stopFullCluster(l); - break; - default: - return Manageable::STATUS_UNKNOWN_METHOD; - } - return Manageable::STATUS_OK; -} - -void Cluster::stopClusterNode(Lock& l) { - QPID_LOG(notice, *this << " cluster member stopped by administrator."); - leave(l); -} - -void Cluster::stopFullCluster(Lock& ) { - QPID_LOG(notice, *this << " shutting down cluster " << name); - mcast.mcastControl(ClusterShutdownBody(ProtocolVersion(), Uuid(true)), self); -} - -void Cluster::memberUpdate(Lock& l) { - // Ignore config changes while we are joining. - if (state < CATCHUP) return; - QPID_LOG(info, *this << " member update: " << map); - size_t aliveCount = map.aliveCount(); - assert(map.isAlive(self)); - failoverExchange->updateUrls(getUrls(l)); - - // Mark store clean if I am the only broker, dirty otherwise. - if (store.hasStore()) { - if (aliveCount == 1) { - if (store.getState() != STORE_STATE_CLEAN_STORE) { - QPID_LOG(notice, *this << "Sole member of cluster, marking store clean."); - store.clean(Uuid(true)); - } - } - else { - if (store.getState() != STORE_STATE_DIRTY_STORE) { - QPID_LOG(notice, "Running in a cluster, marking store dirty."); - store.dirty(); - } - } - } - - // If I am the last member standing, set queue policies. - if (aliveCount == 1 && lastAliveCount > 1 && state >= CATCHUP) { - QPID_LOG(notice, *this << " last broker standing, update queue policies"); - lastBroker = true; - broker.getQueues().updateQueueClusterState(true); - } - else if (aliveCount > 1 && lastBroker) { - QPID_LOG(notice, *this << " last broker standing joined by " << aliveCount-1 - << " replicas, updating queue policies."); - lastBroker = false; - broker.getQueues().updateQueueClusterState(false); - } - lastAliveCount = aliveCount; - - // Close connections belonging to members that have left the cluster. - ConnectionMap::iterator i = connections.begin(); - while (i != connections.end()) { - ConnectionMap::iterator j = i++; - MemberId m = j->second->getId().getMember(); - if (m != self && !map.isMember(m)) { - j->second->close(); - erase(j->second->getId(), l); - } - } -} - -// See comment on Cluster::setMgmtStatus -void Cluster::updateMgmtMembership(Lock& l) { - if (!mgmtObject) return; - std::vector<Url> urls = getUrls(l); - mgmtObject->set_clusterSize(urls.size()); - string urlstr; - for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) { - if (i != urls.begin()) urlstr += ";"; - urlstr += i->str(); - } - std::vector<string> ids = getIds(l); - string idstr; - for(std::vector<string>::iterator i = ids.begin(); i != ids.end(); i++ ) { - if (i != ids.begin()) idstr += ";"; - idstr += *i; - } - mgmtObject->set_members(urlstr); - mgmtObject->set_memberIDs(idstr); -} - -std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { - static const char* STATE[] = { - "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP", - "READY", "OFFER", "UPDATER", "LEFT" - }; - assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); - o << "cluster(" << cluster.self << " " << STATE[cluster.state]; - if (cluster.error.isUnresolved()) o << "/error"; - return o << ")"; -} - -MemberId Cluster::getId() const { - return self; // Immutable, no need to lock. -} - -broker::Broker& Cluster::getBroker() const { - return broker; // Immutable, no need to lock. -} - -void Cluster::setClusterId(const Uuid& uuid, Lock&) { - clusterId = uuid; - if (store.hasStore()) store.setClusterId(uuid); - if (mgmtObject) { - stringstream stream; - stream << self; - mgmtObject->set_clusterID(clusterId.str()); - mgmtObject->set_memberID(stream.str()); - } - QPID_LOG(notice, *this << " cluster-uuid = " << clusterId); -} - -void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { - expiryPolicy->deliverExpire(id); -} - -void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) { - // If we see an errorCheck here (rather than in the ErrorCheck - // class) then we have processed succesfully past the point of the - // error. - if (state >= CATCHUP) // Don't respond pre catchup, we don't know what happened - error.respondNone(from, type, frameSeq); -} - -void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { - if (state >= CATCHUP) // Pre catchup our timer isn't set up. - timer->deliverWakeup(name); -} - -void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { - QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name) - if (state >= CATCHUP) // Pre catchup our timer isn't set up. - timer->deliverDrop(name); -} - -bool Cluster::isElder() const { - return elder; -} - -void Cluster::deliverToQueue(const std::string& queue, const std::string& message, Lock& l) -{ - broker::Queue::shared_ptr q = broker.getQueues().find(queue); - if (!q) { - QPID_LOG(critical, *this << " cluster delivery to non-existent queue: " << queue); - leave(l); - } - framing::Buffer buf(const_cast<char*>(message.data()), message.size()); - boost::intrusive_ptr<broker::Message> msg(new broker::Message); - msg->decodeHeader(buf); - msg->decodeContent(buf); - q->deliver(msg); -} - -bool Cluster::deferDeliveryImpl(const std::string& queue, - const boost::intrusive_ptr<broker::Message>& msg) -{ - if (isClusterSafe()) return false; - std::string message; - message.resize(msg->encodedSize()); - framing::Buffer buf(const_cast<char*>(message.data()), message.size()); - msg->encode(buf); - mcast.mcastControl(ClusterDeliverToQueueBody(ProtocolVersion(), queue, message), self); - return true; -} - -}} // namespace qpid::cluster |