summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/src/qpid/cluster/Cluster.cpp
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-rajith_jms_client.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp1176
1 files changed, 0 insertions, 1176 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
deleted file mode 100644
index 0daf0c7f5a..0000000000
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ /dev/null
@@ -1,1176 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-/**
- * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1>
- *
- * The cluster works on the principle that if all members of the
- * cluster receive identical input, they will all produce identical
- * results. cluster::Connections intercept data received from clients
- * and multicast it via CPG. The data is processed (passed to the
- * broker::Connection) only when it is received from CPG in cluster
- * order. Each cluster member has Connection objects for directly
- * connected clients and "shadow" Connection objects for connections
- * to other members.
- *
- * This assumes that all broker actions occur deterministically in
- * response to data arriving on client connections. There are two
- * situations where this assumption fails:
- * - sending data in response to polling local connections for writabiliy.
- * - taking actions based on a timer or timestamp comparison.
- *
- * IMPORTANT NOTE: any time code is added to the broker that uses timers,
- * the cluster may need to be updated to take account of this.
- *
- *
- * USE OF TIMESTAMPS IN THE BROKER
- *
- * The following are the current areas where broker uses timers or timestamps:
- *
- * - Producer flow control: broker::SemanticState uses
- * connection::getClusterOrderOutput. a FrameHandler that sends
- * frames to the client via the cluster. Used by broker::SessionState
- *
- * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
- * implemented by cluster::ExpiryPolicy.
- *
- * - Connection heartbeat: sends connection controls, not part of
- * session command counting so OK to ignore.
- *
- * - LinkRegistry: only cluster elder is ever active for links.
- *
- * - management::ManagementBroker: uses MessageHandler supplied by cluster
- * to send messages to the broker via the cluster.
- *
- * - Dtx: not yet supported with cluster.
- *
- * cluster::ExpiryPolicy implements the strategy for message expiry.
- *
- * ClusterTimer implements periodic timed events in the cluster context.
- * Used for periodic management events.
- *
- * <h1>CLUSTER PROTOCOL OVERVIEW</h1>
- *
- * Messages sent to/from CPG are called Events.
- *
- * An Event carries a ConnectionId, which includes a MemberId and a
- * connection number.
- *
- * Events are either
- * - Connection events: non-0 connection number and are associated with a connection.
- * - Cluster Events: 0 connection number, are not associated with a connection.
- *
- * Events are further categorized as:
- * - Control: carries method frame(s) that affect cluster behavior.
- * - Data: carries raw data received from a client connection.
- *
- * The cluster defines extensions to the AMQP command set in ../../../xml/cluster.xml
- * which defines two classes:
- * - cluster: cluster control information.
- * - cluster.connection: control information for a specific connection.
- *
- * The following combinations are legal:
- * - Data frames carrying connection data.
- * - Cluster control events carrying cluster commands.
- * - Connection control events carrying cluster.connection commands.
- * - Connection control events carrying non-cluster frames: frames sent to the client.
- * e.g. flow-control frames generated on a timer.
- *
- * <h1>CLUSTER INITIALIZATION OVERVIEW</h1>
- *
- * @see InitialStatusMap
- *
- * When a new member joins the CPG group, all members (including the
- * new one) multicast their "initial status." The new member is in
- * PRE_INIT mode until it gets a complete set of initial status
- * messages from all cluster members. In a newly-forming cluster is
- * then in INIT mode until the configured cluster-size members have
- * joined.
- *
- * The newcomer uses initial status to determine
- * - The cluster UUID
- * - Am I speaking the correct version of the cluster protocol?
- * - Do I need to get an update from an existing active member?
- * - Can I recover from my own store?
- *
- * Pre-initialization happens in the Cluster constructor (plugin
- * early-init phase) because it needs to set the recovery flag before
- * the store initializes. This phase lasts until inital-status is
- * received for all active members. The PollableQueues and Multicaster
- * are in "bypass" mode during this phase since the poller has not
- * started so there are no threads to serve pollable queues.
- *
- * The remaining initialization happens in Cluster::initialize() or,
- * if cluster-size=N is specified, in the deliver thread when an
- * initial-status control is delivered that brings the total to N.
- */
-#include "qpid/Exception.h"
-#include "qpid/cluster/Cluster.h"
-#include "qpid/sys/ClusterSafe.h"
-#include "qpid/cluster/ClusterSettings.h"
-#include "qpid/cluster/Connection.h"
-#include "qpid/cluster/UpdateClient.h"
-#include "qpid/cluster/RetractClient.h"
-#include "qpid/cluster/FailoverExchange.h"
-#include "qpid/cluster/UpdateDataExchange.h"
-#include "qpid/cluster/UpdateExchange.h"
-#include "qpid/cluster/ClusterTimer.h"
-
-#include "qpid/assert.h"
-#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
-#include "qmf/org/apache/qpid/cluster/Package.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
-#include "qpid/broker/NullMessageStore.h"
-#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/SessionState.h"
-#include "qpid/broker/SignalHandler.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_AllOperations.h"
-#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterConfigChangeBody.h"
-#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
-#include "qpid/framing/ClusterConnectionAbortBody.h"
-#include "qpid/framing/ClusterRetractOfferBody.h"
-#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
-#include "qpid/framing/ClusterReadyBody.h"
-#include "qpid/framing/ClusterShutdownBody.h"
-#include "qpid/framing/ClusterUpdateOfferBody.h"
-#include "qpid/framing/ClusterUpdateRequestBody.h"
-#include "qpid/framing/ClusterConnectionAnnounceBody.h"
-#include "qpid/framing/ClusterErrorCheckBody.h"
-#include "qpid/framing/ClusterTimerWakeupBody.h"
-#include "qpid/framing/ClusterDeliverToQueueBody.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/log/Helpers.h"
-#include "qpid/log/Statement.h"
-#include "qpid/management/ManagementAgent.h"
-#include "qpid/memory.h"
-#include "qpid/sys/Thread.h"
-
-#include <boost/shared_ptr.hpp>
-#include <boost/bind.hpp>
-#include <boost/cast.hpp>
-#include <boost/current_function.hpp>
-#include <algorithm>
-#include <iterator>
-#include <map>
-#include <ostream>
-
-
-namespace qpid {
-namespace cluster {
-using namespace qpid;
-using namespace qpid::framing;
-using namespace qpid::sys;
-using namespace qpid::cluster;
-using namespace framing::cluster;
-using namespace std;
-using management::ManagementAgent;
-using management::ManagementObject;
-using management::Manageable;
-using management::Args;
-namespace _qmf = ::qmf::org::apache::qpid::cluster;
-
-/**
- * NOTE: must increment this number whenever any incompatible changes in
- * cluster protocol/behavior are made. It allows early detection and
- * sensible reporting of an attempt to mix different versions in a
- * cluster.
- *
- * Currently use SVN revision to avoid clashes with versions from
- * different branches.
- */
-const uint32_t Cluster::CLUSTER_VERSION = 1097431;
-
-struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
- qpid::cluster::Cluster& cluster;
- MemberId member;
- Cluster::Lock& l;
- ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {}
-
- void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
-
- void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
- uint8_t storeState, const Uuid& shutdownId,
- const std::string& firstConfig)
- {
- cluster.initialStatus(
- member, version, active, clusterId,
- framing::cluster::StoreState(storeState), shutdownId,
- firstConfig, l);
- }
- void ready(const std::string& url) {
- cluster.ready(member, url, l);
- }
- void configChange(const std::string& members,
- const std::string& left,
- const std::string& joined)
- {
- cluster.configChange(member, members, left, joined, l);
- }
- void updateOffer(uint64_t updatee) {
- cluster.updateOffer(member, updatee, l);
- }
- void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
- void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
- void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
- cluster.errorCheck(member, type, frameSeq, l);
- }
- void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); }
- void timerDrop(const std::string& name) { cluster.timerDrop(member, name, l); }
- void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
- void deliverToQueue(const std::string& queue, const std::string& message) {
- cluster.deliverToQueue(queue, message, l);
- }
- bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
-};
-
-Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
- settings(set),
- broker(b),
- mgmtObject(0),
- poller(b.getPoller()),
- cpg(*this),
- name(settings.name),
- self(cpg.self()),
- clusterId(true),
- mAgent(0),
- expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
- mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
- dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
- deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
- boost::bind(&Cluster::leave, this),
- "Error decoding events, may indicate a broker version mismatch",
- poller),
- deliverFrameQueue(boost::bind(&Cluster::deliveredFrame, this, _1),
- boost::bind(&Cluster::leave, this),
- "Error delivering frames",
- poller),
- failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
- updateDataExchange(new UpdateDataExchange(*this)),
- quorum(boost::bind(&Cluster::leave, this)),
- decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
- discarding(true),
- state(PRE_INIT),
- initMap(self, settings.size),
- store(broker.getDataDir().getPath()),
- elder(false),
- lastAliveCount(0),
- lastBroker(false),
- updateRetracted(false),
- updateClosed(false),
- error(*this)
-{
- broker.setInCluster(true);
-
- // We give ownership of the timer to the broker and keep a plain pointer.
- // This is OK as it means the timer has the same lifetime as the broker.
- timer = new ClusterTimer(*this);
- broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer));
-
- // Failover exchange provides membership updates to clients.
- broker.getExchanges().registerExchange(failoverExchange);
-
- // Update exchange is used during updates to replicate messages
- // without modifying delivery-properties.exchange.
- broker.getExchanges().registerExchange(
- boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
-
- // Update-data exchange is used for passing data that may be too large
- // for single control frame.
- broker.getExchanges().registerExchange(updateDataExchange);
-
- // Load my store status before we go into initialization
- if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
- store.load();
- clusterId = store.getClusterId();
- QPID_LOG(notice, "Cluster store state: " << store)
- }
- cpg.join(name);
- // pump the CPG dispatch manually till we get past PRE_INIT.
- while (state == PRE_INIT)
- cpg.dispatchOne();
-}
-
-Cluster::~Cluster() {
- broker.setClusterTimer(std::auto_ptr<sys::Timer>(0)); // Delete cluster timer
- if (updateThread) updateThread.join(); // Join the previous updatethread.
-}
-
-void Cluster::initialize() {
- if (settings.quorum) quorum.start(poller);
- if (settings.url.empty())
- myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
- else
- myUrl = settings.url;
- broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
- broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2);
- broker.setExpiryPolicy(expiryPolicy);
- deliverEventQueue.bypassOff();
- deliverEventQueue.start();
- deliverFrameQueue.bypassOff();
- deliverFrameQueue.start();
- mcast.start();
-
- /// Create management object
- mAgent = broker.getManagementAgent();
- if (mAgent != 0){
- _qmf::Package packageInit(mAgent);
- mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str());
- mAgent->addObject (mgmtObject);
- }
-
- // Run initMapCompleted immediately to process the initial configuration
- // that allowed us to transition out of PRE_INIT
- assert(state == INIT);
- initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context.
-
- // Add finalizer last for exception safety.
- broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
-
- // Start dispatching CPG events.
- dispatcher.start();
-}
-
-// Called in connection thread to insert a client connection.
-void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
- assert(c->getId().getMember() == self);
- localConnections.insert(c);
-}
-
-// Called in connection thread to insert an updated shadow connection.
-void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
- QPID_LOG(debug, *this << " new shadow connection " << c->getId());
- // Safe to use connections here because we're pre-catchup, stalled
- // and discarding, so deliveredFrame is not processing any
- // connection events.
- assert(discarding);
- pair<ConnectionMap::iterator, bool> ib
- = connections.insert(ConnectionMap::value_type(c->getId(), c));
- assert(ib.second);
-}
-
-void Cluster::erase(const ConnectionId& id) {
- Lock l(lock);
- erase(id,l);
-}
-
-// Called by Connection::deliverClose() in deliverFrameQueue thread.
-void Cluster::erase(const ConnectionId& id, Lock&) {
- connections.erase(id);
- decoder.erase(id);
-}
-
-std::vector<string> Cluster::getIds() const {
- Lock l(lock);
- return getIds(l);
-}
-
-std::vector<string> Cluster::getIds(Lock&) const {
- return map.memberIds();
-}
-
-std::vector<Url> Cluster::getUrls() const {
- Lock l(lock);
- return getUrls(l);
-}
-
-std::vector<Url> Cluster::getUrls(Lock&) const {
- return map.memberUrls();
-}
-
-void Cluster::leave() {
- Lock l(lock);
- leave(l);
-}
-
-#define LEAVE_TRY(STMT) try { STMT; } \
- catch (const std::exception& e) { \
- QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
- } do {} while(0)
-
-void Cluster::leave(Lock&) {
- if (state != LEFT) {
- state = LEFT;
- QPID_LOG(notice, *this << " leaving cluster " << name);
- // Finalize connections now now to avoid problems later in destructor.
- ClusterSafeScope css; // Don't trigger cluster-safe assertions.
- LEAVE_TRY(localConnections.clear());
- LEAVE_TRY(connections.clear());
- LEAVE_TRY(broker::SignalHandler::shutdown());
- }
-}
-
-// Deliver CPG message.
-void Cluster::deliver(
- cpg_handle_t /*handle*/,
- const cpg_name* /*group*/,
- uint32_t nodeid,
- uint32_t pid,
- void* msg,
- int msg_len)
-{
- MemberId from(nodeid, pid);
- framing::Buffer buf(static_cast<char*>(msg), msg_len);
- Event e(Event::decodeCopy(from, buf));
- deliverEvent(e);
-}
-
-void Cluster::deliverEvent(const Event& e) { deliverEventQueue.push(e); }
-
-void Cluster::deliverFrame(const EventFrame& e) { deliverFrameQueue.push(e); }
-
-const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
- return (body && body->getMethod() &&
- body->getMethod()->isA<ClusterUpdateOfferBody>()) ?
- static_cast<const ClusterUpdateOfferBody*>(body) : 0;
-}
-
-const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) {
- return (body && body->getMethod() &&
- body->getMethod()->isA<ClusterConnectionAnnounceBody>()) ?
- static_cast<const ClusterConnectionAnnounceBody*>(body) : 0;
-}
-
-// Handler for deliverEventQueue.
-// This thread decodes frames from events.
-void Cluster::deliveredEvent(const Event& e) {
- if (e.isCluster()) {
- EventFrame ef(e, e.getFrame());
- // Stop the deliverEventQueue on update offers.
- // This preserves the connection decoder fragments for an update.
- // Only do this for the two brokers that are directly involved in this
- // offer: the one making the offer, or the one receiving it.
- const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
- if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) {
- QPID_LOG(info, *this << " stall for update offer from " << e.getMemberId()
- << " to " << MemberId(offer->getUpdatee()));
- deliverEventQueue.stop();
- }
- deliverFrame(ef);
- }
- else if(!discarding) {
- if (e.isControl())
- deliverFrame(EventFrame(e, e.getFrame()));
- else {
- try { decoder.decode(e, e.getData()); }
- catch (const Exception& ex) {
- // Close a connection that is sending us invalid data.
- QPID_LOG(error, *this << " aborting connection "
- << e.getConnectionId() << ": " << ex.what());
- framing::AMQFrame abort((ClusterConnectionAbortBody()));
- deliverFrame(EventFrame(EventHeader(CONTROL, e.getConnectionId()), abort));
- }
- }
- }
-}
-
-void Cluster::flagError(
- Connection& connection, ErrorCheck::ErrorType type, const std::string& msg)
-{
- Mutex::ScopedLock l(lock);
- if (connection.isCatchUp()) {
- QPID_LOG(critical, *this << " error on update connection " << connection
- << ": " << msg);
- leave(l);
- }
- error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg);
-}
-
-// Handler for deliverFrameQueue.
-// This thread executes the main logic.
-void Cluster::deliveredFrame(const EventFrame& efConst) {
- Mutex::ScopedLock l(lock);
- sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
- if (state == LEFT) return;
- EventFrame e(efConst);
- const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
- if (offer && error.isUnresolved()) {
- // We can't honour an update offer that is delivered while an
- // error is in progress so replace it with a retractOffer and re-start
- // the event queue.
- e.frame = AMQFrame(
- ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
- deliverEventQueue.start();
- }
- // Process each frame through the error checker.
- if (error.isUnresolved()) {
- error.delivered(e);
- while (error.canProcess()) // There is a frame ready to process.
- processFrame(error.getNext(), l);
- }
- else
- processFrame(e, l);
-}
-
-
-void Cluster::processFrame(const EventFrame& e, Lock& l) {
- if (e.isCluster()) {
- QPID_LOG(trace, *this << " DLVR: " << e);
- ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
- if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
- throw Exception(QPID_MSG("Invalid cluster control"));
- }
- else if (state >= CATCHUP) {
- map.incrementFrameSeq();
- ConnectionPtr connection = getConnection(e, l);
- if (connection) {
- QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
- connection->deliveredFrame(e);
- }
- else
- throw Exception(QPID_MSG("Unknown connection: " << e));
- }
- else // Drop connection frames while state < CATCHUP
- QPID_LOG(trace, *this << " DROP (joining): " << e);
-}
-
-// Called in deliverFrameQueue thread
-ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) {
- ConnectionId id = e.connectionId;
- ConnectionMap::iterator i = connections.find(id);
- if (i != connections.end()) return i->second;
- ConnectionPtr cp;
- // If the frame is an announcement for a new connection, add it.
- const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody());
- if (e.frame.getBody() && e.frame.getMethod() && announce)
- {
- if (id.getMember() == self) { // Announces one of my own
- cp = localConnections.getErase(id);
- assert(cp);
- }
- else { // New remote connection, create a shadow.
- qpid::sys::SecuritySettings secSettings;
- if (announce) {
- secSettings.ssf = announce->getSsf();
- secSettings.authid = announce->getAuthid();
- secSettings.nodict = announce->getNodict();
- }
- cp = new Connection(*this, shadowOut, announce->getManagementId(), id, secSettings);
- }
- connections.insert(ConnectionMap::value_type(id, cp));
- }
- return cp;
-}
-
-Cluster::ConnectionVector Cluster::getConnections(Lock&) {
- ConnectionVector result(connections.size());
- std::transform(connections.begin(), connections.end(), result.begin(),
- boost::bind(&ConnectionMap::value_type::second, _1));
- return result;
-}
-
-// CPG config-change callback.
-void Cluster::configChange (
- cpg_handle_t /*handle*/,
- const cpg_name */*group*/,
- const cpg_address *members, int nMembers,
- const cpg_address *left, int nLeft,
- const cpg_address *joined, int nJoined)
-{
- Mutex::ScopedLock l(lock);
- string membersStr, leftStr, joinedStr;
- // Encode members and enqueue as an event so the config change can
- // be executed in the correct thread.
- for (const cpg_address* p = members; p < members+nMembers; ++p)
- membersStr.append(MemberId(*p).str());
- for (const cpg_address* p = left; p < left+nLeft; ++p)
- leftStr.append(MemberId(*p).str());
- for (const cpg_address* p = joined; p < joined+nJoined; ++p)
- joinedStr.append(MemberId(*p).str());
- deliverEvent(Event::control(ClusterConfigChangeBody(
- ProtocolVersion(), membersStr, leftStr, joinedStr),
- self));
-}
-
-void Cluster::setReady(Lock&) {
- state = READY;
- mcast.setReady();
- broker.getQueueEvents().enable();
- enableClusterSafe(); // Enable cluster-safe assertions.
-}
-
-// Set the management status from the Cluster::state.
-//
-// NOTE: Management updates are sent based on property changes. In
-// order to keep consistency across the cluster, we touch the local
-// management status property even if it is locally unchanged for any
-// event that could have cause a cluster property change on any cluster member.
-void Cluster::setMgmtStatus(Lock&) {
- if (mgmtObject)
- mgmtObject->set_status(state >= CATCHUP ? "ACTIVE" : "JOINING");
-}
-
-void Cluster::initMapCompleted(Lock& l) {
- // Called on completion of the initial status map.
- QPID_LOG(debug, *this << " initial status map complete. ");
- setMgmtStatus(l);
- if (state == PRE_INIT) {
- // PRE_INIT means we're still in the earlyInitialize phase, in the constructor.
- // We decide here whether we want to recover from our store.
- // We won't recover if we are joining an active cluster or our store is dirty.
- if (store.hasStore() &&
- store.getState() != STORE_STATE_EMPTY_STORE &&
- (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE))
- broker.setRecovery(false); // Ditch my current store.
- state = INIT;
- }
- else if (state == INIT) {
- // INIT means we are past Cluster::initialize().
-
- // If we're forming an initial cluster (no active members)
- // then we wait to reach the configured cluster-size
- if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) {
- QPID_LOG(info, *this << initMap.getActualSize()
- << " members, waiting for at least " << initMap.getRequiredSize());
- return;
- }
-
- initMap.checkConsistent();
- elders = initMap.getElders();
- QPID_LOG(debug, *this << " elders: " << elders);
- if (elders.empty())
- becomeElder(l);
- else {
- broker.getLinks().setPassive(true);
- broker.getQueueEvents().disable();
- QPID_LOG(info, *this << " not active for links.");
- }
- setClusterId(initMap.getClusterId(), l);
-
- if (initMap.isUpdateNeeded()) { // Joining established cluster.
- broker.setRecovery(false); // Ditch my current store.
- broker.setClusterUpdatee(true);
- if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update.
- state = JOINER;
- mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
- QPID_LOG(notice, *this << " joining cluster " << name);
- }
- else { // I can go ready.
- discarding = false;
- setReady(l);
- memberUpdate(l);
- updateMgmtMembership(l);
- mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
- QPID_LOG(notice, *this << " joined cluster " << name);
- }
- }
-}
-
-void Cluster::configChange(const MemberId&,
- const std::string& membersStr,
- const std::string& leftStr,
- const std::string& joinedStr,
- Lock& l)
-{
- if (state == LEFT) return;
- MemberSet members = decodeMemberSet(membersStr);
- MemberSet left = decodeMemberSet(leftStr);
- MemberSet joined = decodeMemberSet(joinedStr);
- QPID_LOG(notice, *this << " configuration change: " << members);
- QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left);
- QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined);
-
- // If we are still joining, make sure there is someone to give us an update.
- elders = intersection(elders, members);
- if (elders.empty() && INIT < state && state < CATCHUP) {
- QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
- leave(l);
- return;
- }
- bool memberChange = map.configChange(members);
-
- // Update initital status for members joining or leaving.
- initMap.configChange(members);
- if (initMap.isResendNeeded()) {
- mcast.mcastControl(
- ClusterInitialStatusBody(
- ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
- store.getState(), store.getShutdownId(),
- initMap.getFirstConfigStr()
- ),
- self);
- }
- if (initMap.transitionToComplete()) initMapCompleted(l);
-
- if (state >= CATCHUP && memberChange) {
- memberUpdate(l);
- if (elders.empty()) becomeElder(l);
- }
-
- updateMgmtMembership(l); // Update on every config change for consistency
-}
-
-void Cluster::becomeElder(Lock&) {
- if (elder) return; // We were already the elder.
- // We are the oldest, reactive links if necessary
- QPID_LOG(info, *this << " became the elder, active for links.");
- elder = true;
- broker.getLinks().setPassive(false);
- timer->becomeElder();
-}
-
-void Cluster::makeOffer(const MemberId& id, Lock& ) {
- if (state == READY && map.isJoiner(id)) {
- state = OFFER;
- QPID_LOG(info, *this << " send update-offer to " << id);
- mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id), self);
- }
-}
-
-namespace {
-struct AppendQueue {
- ostream* os;
- AppendQueue(ostream& o) : os(&o) {}
- void operator()(const boost::shared_ptr<broker::Queue>& q) {
- (*os) << " " << q->getName() << "=" << q->getMessageCount();
- }
-};
-} // namespace
-
-// Log a snapshot of broker state, used for debugging inconsistency problems.
-// May only be called in deliver thread.
-std::string Cluster::debugSnapshot() {
- assertClusterSafe();
- std::ostringstream msg;
- msg << "Member joined, frameSeq=" << map.getFrameSeq() << ", queue snapshot:";
- AppendQueue append(msg);
- broker.getQueues().eachQueue(append);
- return msg.str();
-}
-
-// Called from Broker::~Broker when broker is shut down. At this
-// point we know the poller has stopped so no poller callbacks will be
-// invoked. We must ensure that CPG has also shut down so no CPG
-// callbacks will be invoked.
-//
-void Cluster::brokerShutdown() {
- sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
- try { cpg.shutdown(); }
- catch (const std::exception& e) {
- QPID_LOG(error, *this << " shutting down CPG: " << e.what());
- }
- delete this;
-}
-
-void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) {
- map.updateRequest(id, url);
- makeOffer(id, l);
-}
-
-void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
- const framing::Uuid& id,
- framing::cluster::StoreState store,
- const framing::Uuid& shutdownId,
- const std::string& firstConfig,
- Lock& l)
-{
- if (version != CLUSTER_VERSION) {
- QPID_LOG(critical, *this << " incompatible cluster versions " <<
- version << " != " << CLUSTER_VERSION);
- leave(l);
- return;
- }
- QPID_LOG_IF(debug, state == PRE_INIT, *this
- << " received initial status from " << member);
- initMap.received(
- member,
- ClusterInitialStatusBody(ProtocolVersion(), version, active, id,
- store, shutdownId, firstConfig)
- );
- if (initMap.transitionToComplete()) initMapCompleted(l);
-}
-
-void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
- try {
- if (map.ready(id, Url(url)))
- memberUpdate(l);
- if (state == CATCHUP && id == self) {
- setReady(l);
- QPID_LOG(notice, *this << " caught up.");
- }
- } catch (const Url::Invalid& e) {
- QPID_LOG(error, "Invalid URL in cluster ready command: " << url);
- }
- // Update management on every ready event to be consistent across cluster.
- setMgmtStatus(l);
- updateMgmtMembership(l);
-}
-
-void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
- // NOTE: deliverEventQueue has been stopped at the update offer by
- // deliveredEvent in case an update is required.
- if (state == LEFT) return;
- MemberId updatee(updateeInt);
- boost::optional<Url> url = map.updateOffer(updater, updatee);
- if (updater == self) {
- assert(state == OFFER);
- if (url) // My offer was first.
- updateStart(updatee, *url, l);
- else { // Another offer was first.
- QPID_LOG(info, *this << " cancelled offer to " << updatee << " unstall");
- setReady(l);
- makeOffer(map.firstJoiner(), l); // Maybe make another offer.
- deliverEventQueue.start(); // Go back to normal processing
- }
- }
- else if (updatee == self && url) {
- assert(state == JOINER);
- state = UPDATEE;
- QPID_LOG(notice, *this << " receiving update from " << updater);
- checkUpdateIn(l);
- }
- else {
- QPID_LOG(info, *this << " unstall, ignore update " << updater
- << " to " << updatee);
- deliverEventQueue.start(); // Not involved in update.
- }
- if (updatee != self && url) {
- QPID_LOG(debug, debugSnapshot());
- if (mAgent) mAgent->clusterUpdate();
- // Updatee will call clusterUpdate when update completes
- }
-}
-
-static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) {
- client::ConnectionSettings cs;
- cs.username = settings.username;
- cs.password = settings.password;
- cs.mechanism = settings.mechanism;
- return cs;
-}
-
-void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
- // An offer was received while handling an error, and converted to a retract.
- // Behavior is very similar to updateOffer.
- if (state == LEFT) return;
- MemberId updatee(updateeInt);
- boost::optional<Url> url = map.updateOffer(updater, updatee);
- if (updater == self) {
- assert(state == OFFER);
- if (url) { // My offer was first.
- if (updateThread)
- updateThread.join(); // Join the previous updateThread to avoid leaks.
- updateThread = Thread(new RetractClient(*url, connectionSettings(settings)));
- }
- setReady(l);
- makeOffer(map.firstJoiner(), l); // Maybe make another offer.
- // Don't unstall the event queue, that was already done in deliveredFrame
- }
- QPID_LOG(debug,*this << " retracted offer " << updater << " to " << updatee);
-}
-
-void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
- // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent.
- if (state == LEFT) return;
- assert(state == OFFER);
- state = UPDATER;
- QPID_LOG(notice, *this << " sending update to " << updatee << " at " << url);
- if (updateThread)
- updateThread.join(); // Join the previous updateThread to avoid leaks.
- updateThread = Thread(
- new UpdateClient(self, updatee, url, broker, map, *expiryPolicy,
- getConnections(l), decoder,
- boost::bind(&Cluster::updateOutDone, this),
- boost::bind(&Cluster::updateOutError, this, _1),
- connectionSettings(settings)));
-}
-
-// Called in network thread
-void Cluster::updateInClosed() {
- Lock l(lock);
- assert(!updateClosed);
- updateClosed = true;
- checkUpdateIn(l);
-}
-
-// Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m) {
- Lock l(lock);
- updatedMap = m;
- checkUpdateIn(l);
-}
-
-void Cluster::updateInRetracted() {
- Lock l(lock);
- updateRetracted = true;
- map.clearStatus();
- checkUpdateIn(l);
-}
-
-bool Cluster::isExpectingUpdate() {
- Lock l(lock);
- return state <= UPDATEE;
-}
-
-// Called in update thread or deliver thread.
-void Cluster::checkUpdateIn(Lock& l) {
- if (state != UPDATEE) return; // Wait till we reach the stall point.
- if (!updateClosed) return; // Wait till update connection closes.
- if (updatedMap) { // We're up to date
- map = *updatedMap;
- failoverExchange->setUrls(getUrls(l));
- mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
- state = CATCHUP;
- memberUpdate(l);
- // NB: don't updateMgmtMembership() here as we are not in the deliver
- // thread. It will be updated on delivery of the "ready" we just mcast.
- broker.setClusterUpdatee(false);
- discarding = false; // OK to set, we're stalled for update.
- QPID_LOG(notice, *this << " update complete, starting catch-up.");
- QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
- if (mAgent) {
- // Update management agent now, after all update activity is complete.
- updateDataExchange->updateManagementAgent(mAgent);
- mAgent->suppress(false); // Enable management output.
- mAgent->clusterUpdate();
- }
- // Restore alternate exchange settings on exchanges.
- broker.getExchanges().eachExchange(
- boost::bind(&broker::Exchange::recoveryComplete, _1,
- boost::ref(broker.getExchanges())));
- enableClusterSafe(); // Enable cluster-safe assertions
- deliverEventQueue.start();
- }
- else if (updateRetracted) { // Update was retracted, request another update
- updateRetracted = false;
- updateClosed = false;
- state = JOINER;
- QPID_LOG(notice, *this << " update retracted, sending new update request.");
- mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
- deliverEventQueue.start();
- }
-}
-
-void Cluster::updateOutDone() {
- Monitor::ScopedLock l(lock);
- updateOutDone(l);
-}
-
-void Cluster::updateOutDone(Lock& l) {
- QPID_LOG(notice, *this << " update sent");
- assert(state == UPDATER);
- state = READY;
- deliverEventQueue.start(); // Start processing events again.
- makeOffer(map.firstJoiner(), l); // Try another offer
-}
-
-void Cluster::updateOutError(const std::exception& e) {
- Monitor::ScopedLock l(lock);
- QPID_LOG(error, *this << " error sending update: " << e.what());
- updateOutDone(l);
-}
-
-void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) {
- QPID_LOG(notice, *this << " cluster shut down by administrator.");
- if (store.hasStore()) store.clean(id);
- leave(l);
-}
-
-ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; }
-
-Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) {
- Lock l(lock);
- QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]");
- switch (methodId) {
- case _qmf::Cluster::METHOD_STOPCLUSTERNODE :
- {
- _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args;
- stringstream stream;
- stream << self;
- if (iargs.i_brokerId == stream.str())
- stopClusterNode(l);
- }
- break;
- case _qmf::Cluster::METHOD_STOPFULLCLUSTER :
- stopFullCluster(l);
- break;
- default:
- return Manageable::STATUS_UNKNOWN_METHOD;
- }
- return Manageable::STATUS_OK;
-}
-
-void Cluster::stopClusterNode(Lock& l) {
- QPID_LOG(notice, *this << " cluster member stopped by administrator.");
- leave(l);
-}
-
-void Cluster::stopFullCluster(Lock& ) {
- QPID_LOG(notice, *this << " shutting down cluster " << name);
- mcast.mcastControl(ClusterShutdownBody(ProtocolVersion(), Uuid(true)), self);
-}
-
-void Cluster::memberUpdate(Lock& l) {
- // Ignore config changes while we are joining.
- if (state < CATCHUP) return;
- QPID_LOG(info, *this << " member update: " << map);
- size_t aliveCount = map.aliveCount();
- assert(map.isAlive(self));
- failoverExchange->updateUrls(getUrls(l));
-
- // Mark store clean if I am the only broker, dirty otherwise.
- if (store.hasStore()) {
- if (aliveCount == 1) {
- if (store.getState() != STORE_STATE_CLEAN_STORE) {
- QPID_LOG(notice, *this << "Sole member of cluster, marking store clean.");
- store.clean(Uuid(true));
- }
- }
- else {
- if (store.getState() != STORE_STATE_DIRTY_STORE) {
- QPID_LOG(notice, "Running in a cluster, marking store dirty.");
- store.dirty();
- }
- }
- }
-
- // If I am the last member standing, set queue policies.
- if (aliveCount == 1 && lastAliveCount > 1 && state >= CATCHUP) {
- QPID_LOG(notice, *this << " last broker standing, update queue policies");
- lastBroker = true;
- broker.getQueues().updateQueueClusterState(true);
- }
- else if (aliveCount > 1 && lastBroker) {
- QPID_LOG(notice, *this << " last broker standing joined by " << aliveCount-1
- << " replicas, updating queue policies.");
- lastBroker = false;
- broker.getQueues().updateQueueClusterState(false);
- }
- lastAliveCount = aliveCount;
-
- // Close connections belonging to members that have left the cluster.
- ConnectionMap::iterator i = connections.begin();
- while (i != connections.end()) {
- ConnectionMap::iterator j = i++;
- MemberId m = j->second->getId().getMember();
- if (m != self && !map.isMember(m)) {
- j->second->close();
- erase(j->second->getId(), l);
- }
- }
-}
-
-// See comment on Cluster::setMgmtStatus
-void Cluster::updateMgmtMembership(Lock& l) {
- if (!mgmtObject) return;
- std::vector<Url> urls = getUrls(l);
- mgmtObject->set_clusterSize(urls.size());
- string urlstr;
- for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) {
- if (i != urls.begin()) urlstr += ";";
- urlstr += i->str();
- }
- std::vector<string> ids = getIds(l);
- string idstr;
- for(std::vector<string>::iterator i = ids.begin(); i != ids.end(); i++ ) {
- if (i != ids.begin()) idstr += ";";
- idstr += *i;
- }
- mgmtObject->set_members(urlstr);
- mgmtObject->set_memberIDs(idstr);
-}
-
-std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
- static const char* STATE[] = {
- "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP",
- "READY", "OFFER", "UPDATER", "LEFT"
- };
- assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
- o << "cluster(" << cluster.self << " " << STATE[cluster.state];
- if (cluster.error.isUnresolved()) o << "/error";
- return o << ")";
-}
-
-MemberId Cluster::getId() const {
- return self; // Immutable, no need to lock.
-}
-
-broker::Broker& Cluster::getBroker() const {
- return broker; // Immutable, no need to lock.
-}
-
-void Cluster::setClusterId(const Uuid& uuid, Lock&) {
- clusterId = uuid;
- if (store.hasStore()) store.setClusterId(uuid);
- if (mgmtObject) {
- stringstream stream;
- stream << self;
- mgmtObject->set_clusterID(clusterId.str());
- mgmtObject->set_memberID(stream.str());
- }
- QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
-}
-
-void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
- expiryPolicy->deliverExpire(id);
-}
-
-void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
- // If we see an errorCheck here (rather than in the ErrorCheck
- // class) then we have processed succesfully past the point of the
- // error.
- if (state >= CATCHUP) // Don't respond pre catchup, we don't know what happened
- error.respondNone(from, type, frameSeq);
-}
-
-void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
- if (state >= CATCHUP) // Pre catchup our timer isn't set up.
- timer->deliverWakeup(name);
-}
-
-void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
- QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name)
- if (state >= CATCHUP) // Pre catchup our timer isn't set up.
- timer->deliverDrop(name);
-}
-
-bool Cluster::isElder() const {
- return elder;
-}
-
-void Cluster::deliverToQueue(const std::string& queue, const std::string& message, Lock& l)
-{
- broker::Queue::shared_ptr q = broker.getQueues().find(queue);
- if (!q) {
- QPID_LOG(critical, *this << " cluster delivery to non-existent queue: " << queue);
- leave(l);
- }
- framing::Buffer buf(const_cast<char*>(message.data()), message.size());
- boost::intrusive_ptr<broker::Message> msg(new broker::Message);
- msg->decodeHeader(buf);
- msg->decodeContent(buf);
- q->deliver(msg);
-}
-
-bool Cluster::deferDeliveryImpl(const std::string& queue,
- const boost::intrusive_ptr<broker::Message>& msg)
-{
- if (isClusterSafe()) return false;
- std::string message;
- message.resize(msg->encodedSize());
- framing::Buffer buf(const_cast<char*>(message.data()), message.size());
- msg->encode(buf);
- mcast.mcastControl(ClusterDeliverToQueueBody(ProtocolVersion(), queue, message), self);
- return true;
-}
-
-}} // namespace qpid::cluster