diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 136 |
1 files changed, 101 insertions, 35 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 5e962e9767..07fdc6fc93 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -83,6 +83,25 @@ * - Connection control events carrying cluster.connection commands. * - Connection control events carrying non-cluster frames: frames sent to the client. * e.g. flow-control frames generated on a timer. + * + * CLUSTER INITIALIZATION OVERVIEW + * + * When a new member joins the CPG group, all members (including the + * new one) multicast their "initial status." The new member is in + * INIT mode until it gets a complete set of initial status messages + * from all cluster members. + * + * The newcomer uses initial status to determine + * - The cluster UUID + * - Am I speaking the correct version of the cluster protocol? + * - Do I need to get an update from an existing active member? + * - Can I recover from my own store? + * + * Initialization happens in the Cluster constructor (plugin + * early-init phase) because it needs to be done before the store + * initializes. In INIT mode sending & receiving from the cluster are + * done single-threaded, bypassing the normal PollableQueues because + * the Poller is not active at this point to service them. */ #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ClusterSettings.h" @@ -97,6 +116,7 @@ #include "qmf/org/apache/qpid/cluster/Package.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SignalHandler.h" @@ -162,9 +182,12 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } - void initialStatus(bool active, bool persistent, const Uuid& clusterId, - uint32_t version, const std::string& url) { - cluster.initialStatus(member, active, persistent, clusterId, version, url, l); + + void initialStatus(uint32_t version, bool active, const Uuid& clusterId, + uint8_t storeState, const Uuid& start, const Uuid& stop) + { + cluster.initialStatus(member, version, active, clusterId, + framing::cluster::StoreState(storeState), start, stop, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } @@ -204,11 +227,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : "Error delivering frames", poller), quorum(boost::bind(&Cluster::leave, this)), - initialized(false), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), state(INIT), initMap(self, settings.size), + store(broker.getDataDir().getPath()), lastSize(0), lastBroker(false), updateRetracted(false), @@ -226,12 +249,17 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : failoverExchange.reset(new FailoverExchange(this)); broker.getExchanges().registerExchange(failoverExchange); - // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange. - broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + // Update exchange is used during updates to replicate messages + // without modifying delivery-properties.exchange. + broker.getExchanges().registerExchange( + boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + // Load my store status before we go into initialization + if (! broker::NullMessageStore::isNullStore(&broker.getStore())) + store.load(); cpg.join(name); - // pump the CPG dispatch manually till we get initialized. - while (!initialized) + // Pump the CPG dispatch manually till we get initialized. + while (state == INIT) cpg.dispatchOne(); } @@ -243,12 +271,24 @@ void Cluster::initialize() { if (settings.quorum) quorum.start(poller); if (myUrl.empty()) myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); - QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); + // Cluster constructor will leave us in either READY or JOINER state. + switch (state) { + case READY: + mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); + break; + case JOINER: + mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); + break; + default: + assert(0); + } + QPID_LOG(notice, *this << (state == READY ? "joined" : "joining") << " cluster " << name << " with url=" << myUrl); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); + // Add finalizer last for exception safety. broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); } @@ -344,11 +384,21 @@ void Cluster::deliver( } void Cluster::deliverEvent(const Event& e) { - deliverEventQueue.push(e); + // During initialization, execute events directly in the same thread. + // Once initialized, push to pollable queue to be processed in another thread. + if (state == INIT) + deliveredEvent(e); + else + deliverEventQueue.push(e); } void Cluster::deliverFrame(const EventFrame& e) { - deliverFrameQueue.push(e); + // During initialization, execute events directly in the same thread. + // Once initialized, push to pollable queue to be processed in another thread. + if (state == INIT) + deliveredFrame(e); + else + deliverFrameQueue.push(e); } const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { @@ -524,12 +574,6 @@ void Cluster::configChange ( const cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - if (state == INIT) { - // FIXME aconway 2009-11-16: persistent restart - // Recover only if we are first in cluster. - broker.setRecovery(nCurrent == 1); - initialized = true; - } QPID_LOG(notice, *this << " membership change: " << AddrList(current, nCurrent) << "(" << AddrList(joined, nJoined, "joined: ") @@ -544,30 +588,42 @@ void Cluster::configChange ( void Cluster::setReady(Lock&) { state = READY; if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); - mcast.release(); + mcast.setReady(); broker.getQueueEvents().enable(); } void Cluster::initMapCompleted(Lock& l) { + // Called on completion of the initial status map. if (state == INIT) { + // We have status for all members so we can make join descisions. elders = initMap.getElders(); + QPID_LOG(debug, *this << " elders: " << elders); if (!elders.empty()) { // I'm not the elder, I don't handle links & replication. broker.getLinks().setPassive(true); broker.getQueueEvents().disable(); + QPID_LOG(info, *this << " not active for links."); } + else { + QPID_LOG(info, this << " active for links."); + } + setClusterId(initMap.getClusterId(), l); + // FIXME aconway 2009-11-20: store id == cluster id. + // Clean up redundant copy of id in InitialStatus + // Use store ID as advertized cluster ID. + // Consistency check on cluster ID vs. locally stored ID. + // throw rathr than assert in StoreStatus. + if (store.hasStore()) store.dirty(clusterId); if (initMap.isUpdateNeeded()) { // Joining established cluster. + broker.setRecovery(false); // Ditch my current store. state = JOINER; - mcast.mcastControl( - ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); } else { // I can go ready. - QPID_LOG(notice, *this << " ready."); discarding = false; setReady(l); - map = ClusterMap(initMap.getMemberUrls()); memberUpdate(l); } + QPID_LOG(debug, *this << "Initialization complete"); } } @@ -587,9 +643,11 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& initMap.configChange(config); if (initMap.isResendNeeded()) { mcast.mcastControl( - // FIXME aconway 2009-11-17: persistent restart, set persistence bit. - ClusterInitialStatusBody(ProtocolVersion(), state > INIT, false, clusterId, - CLUSTER_VERSION, myUrl.str()), self); + ClusterInitialStatusBody( + ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, + store.getState(), store.getStart(), store.getStop() + ), + self); } if (initMap.transitionToComplete()) initMapCompleted(l); @@ -597,6 +655,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& memberUpdate(l); if (elders.empty()) { // We are the oldest, reactive links if necessary + QPID_LOG(info, this << " becoming active for links."); broker.getLinks().setPassive(false); } } @@ -628,9 +687,11 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) makeOffer(id, l); } -void Cluster::initialStatus(const MemberId& member, bool active, bool persistent, - const framing::Uuid& id, uint32_t version, - const std::string& url, Lock& l) +void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active, + const framing::Uuid& id, + framing::cluster::StoreState store, + const framing::Uuid& start, const framing::Uuid& end, + Lock& l) { if (version != CLUSTER_VERSION) { QPID_LOG(critical, *this << " incompatible cluster versions " << @@ -640,9 +701,13 @@ void Cluster::initialStatus(const MemberId& member, bool active, bool persistent } initMap.received( member, - ClusterInitialStatusBody(ProtocolVersion(), active, persistent, id, version, url) + ClusterInitialStatusBody( + ProtocolVersion(), version, active, id, store, start, end) ); - if (initMap.transitionToComplete()) initMapCompleted(l); + if (initMap.transitionToComplete()) { + QPID_LOG(debug, *this << " initial status map complete. "); + initMapCompleted(l); + } } void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { @@ -650,7 +715,7 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { memberUpdate(l); if (state == CATCHUP && id == self) { setReady(l); - QPID_LOG(notice, *this << " caught up, active cluster member."); + QPID_LOG(notice, *this << " caught up."); } } @@ -770,8 +835,7 @@ void Cluster::updateOutDone(Lock& l) { QPID_LOG(notice, *this << " update sent"); assert(state == UPDATER); state = READY; - mcast.release(); - deliverEventQueue.start(); // Start processing events again. + deliverEventQueue.start(); // Start processing events again. makeOffer(map.firstJoiner(), l); // Try another offer } @@ -781,8 +845,10 @@ void Cluster::updateOutError(const std::exception& e) { updateOutDone(l); } -void Cluster ::shutdown(const MemberId& id, Lock& l) { - QPID_LOG(notice, *this << " received shutdown from " << id); +void Cluster ::shutdown(const MemberId& , Lock& l) { + QPID_LOG(notice, *this << " cluster shut down by administrator."); + // FIXME aconway 2009-11-20: incorrect! Need to pass UUID on shutdown command. + if (store.hasStore()) store.clean(Uuid(true)); leave(l); } |
