diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 91 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.cpp | 32 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.h | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Multicaster.h | 23 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/PollableQueue.h | 20 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/StoreStatus.cpp | 39 |
8 files changed, 178 insertions, 63 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 92e2b65fe2..f8a875a30c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -16,7 +16,8 @@ * */ -/** CLUSTER IMPLEMENTATION OVERVIEW +/** + * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1> * * The cluster works on the principle that if all members of the * cluster receive identical input, they will all produce identical @@ -41,12 +42,15 @@ * * The following are the current areas where broker uses timers or timestamps: * - * - Producer flow control: broker::SemanticState uses connection::getClusterOrderOutput. - * a FrameHandler that sends frames to the client via the cluster. Used by broker::SessionState + * - Producer flow control: broker::SemanticState uses + * connection::getClusterOrderOutput. a FrameHandler that sends + * frames to the client via the cluster. Used by broker::SessionState * - * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is implemented by cluster::ExpiryPolicy. + * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is + * implemented by cluster::ExpiryPolicy. * - * - Connection heartbeat: sends connection controls, not part of session command counting so OK to ignore. + * - Connection heartbeat: sends connection controls, not part of + * session command counting so OK to ignore. * * - LinkRegistry: only cluster elder is ever active for links. * @@ -57,7 +61,10 @@ * * cluster::ExpiryPolicy implements the strategy for message expiry. * - * CLUSTER PROTOCOL OVERVIEW + * ClusterTimer implements periodic timed events in the cluster context. + * Used for periodic management events. + * + * <h1>CLUSTER PROTOCOL OVERVIEW</h1> * * Messages sent to/from CPG are called Events. * @@ -84,12 +91,16 @@ * - Connection control events carrying non-cluster frames: frames sent to the client. * e.g. flow-control frames generated on a timer. 
* - * CLUSTER INITIALIZATION OVERVIEW + * <h1>CLUSTER INITIALIZATION OVERVIEW</h1> + * + * @see InitialStatusMap * * When a new member joins the CPG group, all members (including the * new one) multicast their "initial status." The new member is in - * INIT mode until it gets a complete set of initial status messages - * from all cluster members. + * PRE_INIT mode until it gets a complete set of initial status + * messages from all cluster members. In a newly-forming cluster is + * then in INIT mode until the configured cluster-size members have + * joined. * * The newcomer uses initial status to determine * - The cluster UUID @@ -97,11 +108,16 @@ * - Do I need to get an update from an existing active member? * - Can I recover from my own store? * - * Initialization happens in the Cluster constructor (plugin - * early-init phase) because it needs to be done before the store - * initializes. In INIT mode sending & receiving from the cluster are - * done single-threaded, bypassing the normal PollableQueues because - * the Poller is not active at this point to service them. + * Pre-initialization happens in the Cluster constructor (plugin + * early-init phase) because it needs to set the recovery flag before + * the store initializes. This phase lasts until inital-status is + * received for all active members. The PollableQueues and Multicaster + * are in "bypass" mode during this phase since the poller has not + * started so there are no threads to serve pollable queues. + * + * The remaining initialization happens in Cluster::initialize() or, + * if cluster-size=N is specified, in the deliver thread when an + * initial-status control is delivered that brings the total to N. 
*/ #include "qpid/Exception.h" #include "qpid/cluster/Cluster.h" @@ -244,7 +260,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : quorum(boost::bind(&Cluster::leave, this)), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), - state(INIT), + state(PRE_INIT), initMap(self, settings.size), store(broker.getDataDir().getPath()), elder(false), @@ -274,17 +290,18 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // without modifying delivery-properties.exchange. broker.getExchanges().registerExchange( boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); - if (store.getState() == STORE_STATE_DIRTY_STORE) - broker.setRecovery(false); // Ditch my current store. if (store.getClusterId()) clusterId = store.getClusterId(); // Use stored ID if there is one. QPID_LOG(notice, "Cluster store state: " << store) } - cpg.join(name); + // pump the CPG dispatch manually till we get past PRE_INIT. + while (state == PRE_INIT) + cpg.dispatchOne(); } Cluster::~Cluster() { @@ -301,9 +318,14 @@ void Cluster::initialize() { dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); + mcast.start(); + + // Run initMapCompleted immediately to process the initial configuration. + assert(state == INIT); + initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context. // Add finalizer last for exception safety. - broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); + broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); } // Called in connection thread to insert a client connection. @@ -579,9 +601,27 @@ void Cluster::setReady(Lock&) { void Cluster::initMapCompleted(Lock& l) { // Called on completion of the initial status map. QPID_LOG(debug, *this << " initial status map complete. 
"); - if (state == INIT) { - // We have status for all members so we can make join descisions. + if (state == PRE_INIT) { + // PRE_INIT means we're still in the earlyInitialize phase, in the constructor. + // We decide here whether we want to recover from our store. + // We won't recover if we are joining an active cluster or our store is dirty. + if (store.hasStore() && + (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE)) + broker.setRecovery(false); // Ditch my current store. + state = INIT; + } + else if (state == INIT) { + // INIT means we are past Cluster::initialize(). + + // If we're forming an initial cluster (no active members) + // then we wait to reach the configured cluster-size + if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) { + QPID_LOG(info, *this << initMap.getActualSize() + << " members, waiting for at least " << initMap.getRequiredSize()); + return; + } initMap.checkConsistent(); + elders = initMap.getElders(); QPID_LOG(debug, *this << " elders: " << elders); if (elders.empty()) @@ -969,7 +1009,8 @@ void Cluster::memberUpdate(Lock& l) { std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { static const char* STATE[] = { - "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" + "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP", + "READY", "OFFER", "UPDATER", "LEFT" }; assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); o << "cluster(" << cluster.self << " " << STATE[cluster.state]; @@ -1009,12 +1050,14 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu } void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { - timer->deliverWakeup(name); + if (state >= CATCHUP) // Pre catchup our timer isn't set up. 
+ timer->deliverWakeup(name); } void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name) - timer->deliverDrop(name); + if (state >= CATCHUP) // Pre catchup our timer isn't set up. + timer->deliverDrop(name); } bool Cluster::isElder() const { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index e280a7e928..4a64ad73d6 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -180,6 +180,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void memberUpdate(Lock&); void setClusterId(const framing::Uuid&, Lock&); void erase(const ConnectionId&, Lock&); + void requestUpdate(Lock& ); void initMapCompleted(Lock&); void becomeElder(Lock&); @@ -251,7 +252,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Local cluster state, cluster map enum { - INIT, ///< Establishing inital cluster stattus. + PRE_INIT,///< Have not yet received complete initial status map. + INIT, ///< Waiting to reach cluster-size. JOINER, ///< Sent update request, waiting for update offer. UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete. CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event. 
diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp index a1a1456618..c8ecc13f2c 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.cpp +++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp @@ -86,8 +86,7 @@ bool InitialStatusMap::notInitialized(const Map::value_type& v) { } bool InitialStatusMap::isComplete() const { - return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end() - && (map.size() >= size); + return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end(); } bool InitialStatusMap::transitionToComplete() { @@ -100,7 +99,7 @@ bool InitialStatusMap::isResendNeeded() { return ret; } -bool InitialStatusMap::isActive(const Map::value_type& v) { +bool InitialStatusMap::isActiveEntry(const Map::value_type& v) { return v.second && v.second->getActive(); } @@ -110,10 +109,15 @@ bool InitialStatusMap::hasStore(const Map::value_type& v) { v.second->getStoreState() == STORE_STATE_DIRTY_STORE); } +bool InitialStatusMap::isActive() { + assert(isComplete()); + return (find_if(map.begin(), map.end(), &isActiveEntry) != map.end()); +} + bool InitialStatusMap::isUpdateNeeded() { assert(isComplete()); // We need an update if there are any active members. 
- if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true; + if (isActive()) return true; // Otherwise it depends on store status, get my own status: Map::iterator me = map.find(self); @@ -154,7 +158,7 @@ MemberSet InitialStatusMap::getElders() const { Uuid InitialStatusMap::getClusterId() { assert(isComplete()); assert(!map.empty()); - Map::iterator i = find_if(map.begin(), map.end(), &isActive); + Map::iterator i = find_if(map.begin(), map.end(), &isActiveEntry); if (i != map.end()) return i->second->getClusterId(); // An active member else @@ -178,6 +182,7 @@ void InitialStatusMap::checkConsistent() { Uuid clusterId; Uuid shutdownId; + bool initialCluster = !isActive(); for (Map::iterator i = map.begin(); i != map.end(); ++i) { assert(i->second); if (i->second->getActive()) ++active; @@ -193,8 +198,10 @@ void InitialStatusMap::checkConsistent() { ++clean; checkId(clusterId, i->second->getClusterId(), "Cluster-ID mismatch. Stores belong to different clusters."); - checkId(shutdownId, i->second->getShutdownId(), - "Shutdown-ID mismatch. Stores were not shut down together"); + // Only need shutdownId to match if we are in an initially forming cluster. + if (initialCluster) + checkId(shutdownId, i->second->getShutdownId(), + "Shutdown-ID mismatch. Stores were not shut down together"); break; } } @@ -202,10 +209,13 @@ void InitialStatusMap::checkConsistent() { if (none && (clean+dirty+empty)) throw Exception("Mixing transient and persistent brokers in a cluster"); - // If there are no active members and there are dirty stores there - // must be at least one clean store. - if (!active && dirty && !clean) - throw Exception("Cannot recover, no clean store."); + if (map.size() >= size) { + // All initial members are present. If there are no active + // members and there are dirty stores there must be at least + // one clean store. 
+ if (!active && dirty && !clean) + throw Exception("Cannot recover, no clean store."); + } } std::string InitialStatusMap::getFirstConfigStr() const { diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h index eedc99b0b2..a5a600365e 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.h +++ b/cpp/src/qpid/cluster/InitialStatusMap.h @@ -51,12 +51,18 @@ class InitialStatusMap /** Process received status */ void received(const MemberId&, const Status& is); - /**@return true if the map is complete. */ + /**@return true if the map has an entry for all current cluster members. */ bool isComplete() const; + + size_t getActualSize() const { return map.size(); } + size_t getRequiredSize() const { return size; } + /**@return true if the map was completed by the last config change or received. */ bool transitionToComplete(); /**@pre isComplete(). @return this node's elders */ MemberSet getElders() const; + /**@pre isComplete(). @return True if there are active members of the cluster. */ + bool isActive(); /**@pre isComplete(). @return True if we need to request an update. */ bool isUpdateNeeded(); /**@pre isComplete(). @return Cluster-wide cluster ID. 
*/ @@ -71,8 +77,9 @@ class InitialStatusMap private: typedef std::map<MemberId, boost::optional<Status> > Map; static bool notInitialized(const Map::value_type&); - static bool isActive(const Map::value_type&); + static bool isActiveEntry(const Map::value_type&); static bool hasStore(const Map::value_type&); + Map map; MemberSet firstConfig; MemberId self; diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 4a8195438f..d57ff76941 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -33,10 +33,8 @@ Multicaster::Multicaster(Cpg& cpg_, boost::function<void()> onError_) : onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - ready(false) -{ - queue.start(); -} + ready(false), bypass(true) +{} void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) { mcast(Event::control(body, id)); @@ -61,10 +59,16 @@ void Multicaster::mcast(const Event& e) { } } QPID_LOG(trace, "MCAST " << e); - queue.push(e); + if (bypass) { // direct, don't queue + iovec iov = e.toIovec(); + // FIXME aconway 2010-03-10: should do limited retry. + while (!cpg.mcast(&iov, 1)) + ; + } + else + queue.push(e); } - Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) { try { PollableEventQueue::Batch::const_iterator i = values.begin(); @@ -86,6 +90,11 @@ Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(co } } +void Multicaster::start() { + queue.start(); + bypass = false; +} + void Multicaster::setReady() { sys::Mutex::ScopedLock l(lock); ready = true; diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h index 2db84a9ce0..f70bd5ca31 100644 --- a/cpp/src/qpid/cluster/Multicaster.h +++ b/cpp/src/qpid/cluster/Multicaster.h @@ -41,16 +41,18 @@ class Cpg; /** * Multicast to the cluster. Shared, thread safe object. 
- * - * Runs in two modes; * - * initializing: Hold connection mcast events. Multicast cluster - * events directly in the calling thread. This mode is used before - * joining the cluster where the poller may not yet be active and we - * want to hold any connection traffic till we join. + * holding mode: Hold connection events for later multicast. Cluster + * events are never held. Used during PRE_INIT/INIT state when we + * want to hold any connection traffic till we are read in the + * cluster. + * + * bypass mode: Multicast cluster events directly in the calling + * thread. This mode is used by cluster in PRE_INIT state the poller + * is not yet be active. * - * ready: normal operation. Queues all mcasts on a pollable queue, - * multicasts connection and cluster events. + * Multicaster is created in bypass+holding mode, they are disabled by + * start and setReady respectively. */ class Multicaster { @@ -65,7 +67,9 @@ class Multicaster void mcastBuffer(const char*, size_t, const ConnectionId&); void mcast(const Event& e); - /** Switch to ready mode. */ + /** Start the pollable queue, turn off bypass mode. */ + void start(); + /** Switch to ready mode, release held messages. */ void setReady(); private: @@ -81,6 +85,7 @@ class Multicaster bool ready; PlainEventQueue holdingQueue; std::vector<struct ::iovec> ioVector; + bool bypass; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h index 2aed6de5b9..59d0bcd36a 100644 --- a/cpp/src/qpid/cluster/PollableQueue.h +++ b/cpp/src/qpid/cluster/PollableQueue.h @@ -31,6 +31,13 @@ namespace cluster { /** * More convenient version of PollableQueue that handles iterating * over the batch and error handling. + * + * Constructed in "bypass" mode where items are processed directly + * rather than put on the queue. This is important for the + * PRE_INIT stage when Cluster is pumping CPG dispatch directly + * before the poller has started. 
+ * + * Calling start() starts the pollable queue and disabled bypass mode. */ template <class T> class PollableQueue : public sys::PollableQueue<T> { public: @@ -41,7 +48,7 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> { const boost::shared_ptr<sys::Poller>& poller) : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), poller), - callback(f), error(err), message(msg) + callback(f), error(err), message(msg), bypass(true) {} typename sys::PollableQueue<T>::Batch::const_iterator @@ -62,10 +69,21 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> { } } + void push(const T& t) { + if (bypass) callback(t); + else sys::PollableQueue<T>::push(t); + } + + void start() { + bypass = false; + sys::PollableQueue<T>::start(); + } + private: Callback callback; ErrorCallback error; std::string message; + bool bypass; }; diff --git a/cpp/src/qpid/cluster/StoreStatus.cpp b/cpp/src/qpid/cluster/StoreStatus.cpp index 648fcfbbd5..b44c0e1a9a 100644 --- a/cpp/src/qpid/cluster/StoreStatus.cpp +++ b/cpp/src/qpid/cluster/StoreStatus.cpp @@ -21,6 +21,7 @@ #include "StoreStatus.h" #include "qpid/Exception.h" #include "qpid/Msg.h" +#include "qpid/log/Statement.h" #include <boost/filesystem/path.hpp> #include <boost/filesystem/fstream.hpp> #include <boost/filesystem/operations.hpp> @@ -54,24 +55,39 @@ Uuid loadUuid(const fs::path& path) { Uuid ret; if (exists(path)) { fs::ifstream i(path); - throw_exceptions(i); - i >> ret; + try { + throw_exceptions(i); + i >> ret; + } catch (const std::exception& e) { + QPID_LOG(error, "Cant load UUID from " << path.string() << ": " << e.what()); + throw; + } } return ret; } void saveUuid(const fs::path& path, const Uuid& uuid) { fs::ofstream o(path); - throw_exceptions(o); - o << uuid; + try { + throw_exceptions(o); + o << uuid; + } catch (const std::exception& e) { + QPID_LOG(error, "Cant save UUID to " << path.string() << ": " << e.what()); + throw; + } } framing::SequenceNumber 
loadSeqNum(const fs::path& path) { uint32_t n = 0; if (exists(path)) { fs::ifstream i(path); - throw_exceptions(i); - i >> n; + try { + throw_exceptions(i); + i >> n; + } catch (const std::exception& e) { + QPID_LOG(error, "Cant load sequence number from " << path.string() << ": " << e.what()); + throw; + } } return framing::SequenceNumber(n); } @@ -105,9 +121,14 @@ void StoreStatus::save() { create_directory(dir); saveUuid(dir/CLUSTER_ID_FILE, clusterId); saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId); - fs::ofstream o(dir/CONFIG_SEQ_FILE); - throw_exceptions(o); - o << configSeq.getValue(); + try { + fs::ofstream o(dir/CONFIG_SEQ_FILE); + throw_exceptions(o); + o << configSeq.getValue(); + } catch (const std::exception& e) { + QPID_LOG(error, "Cant save sequence number to " << (dir/CONFIG_SEQ_FILE).string() << ": " << e.what()); + throw; + } } catch (const std::exception&e) { throw Exception(QPID_MSG("Cannot save cluster store status: " << e.what())); |
