summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-03-12 20:11:31 +0000
committerAlan Conway <aconway@apache.org>2010-03-12 20:11:31 +0000
commitef9268528d3147173dfb0d2ef707ee3e4fc4f210 (patch)
tree4d8a9851683812bd04392f57c695a5143c80ca79 /cpp/src/qpid/cluster
parent937fe6e7295efff28cb680642fca28ebf65e7d4e (diff)
downloadqpid-python-ef9268528d3147173dfb0d2ef707ee3e4fc4f210.tar.gz
New cluster member pushes store when joining an active cluster.
Previously a broker with a clean store would not be able to join an active cluster because the shtudown-id did not match. This commit ensures that when a broker joins an active cluster, it always pushes its store regardless of status. Clean/dirty status is only compared when forming an initial cluster. This change required splitting initialization into two phases: PRE_INIT: occurs in the Cluster ctor during early-initialize. This phase determines whether or not to push the store. INIT: occurs after Cluster::initialize and does the remaining initialization chores. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@922412 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp91
-rw-r--r--cpp/src/qpid/cluster/Cluster.h4
-rw-r--r--cpp/src/qpid/cluster/InitialStatusMap.cpp32
-rw-r--r--cpp/src/qpid/cluster/InitialStatusMap.h11
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp21
-rw-r--r--cpp/src/qpid/cluster/Multicaster.h23
-rw-r--r--cpp/src/qpid/cluster/PollableQueue.h20
-rw-r--r--cpp/src/qpid/cluster/StoreStatus.cpp39
8 files changed, 178 insertions, 63 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 92e2b65fe2..f8a875a30c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -16,7 +16,8 @@
*
*/
-/** CLUSTER IMPLEMENTATION OVERVIEW
+/**
+ * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1>
*
* The cluster works on the principle that if all members of the
* cluster receive identical input, they will all produce identical
@@ -41,12 +42,15 @@
*
* The following are the current areas where broker uses timers or timestamps:
*
- * - Producer flow control: broker::SemanticState uses connection::getClusterOrderOutput.
- * a FrameHandler that sends frames to the client via the cluster. Used by broker::SessionState
+ * - Producer flow control: broker::SemanticState uses
+ * connection::getClusterOrderOutput. a FrameHandler that sends
+ * frames to the client via the cluster. Used by broker::SessionState
*
- * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is implemented by cluster::ExpiryPolicy.
+ * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
+ * implemented by cluster::ExpiryPolicy.
*
- * - Connection heartbeat: sends connection controls, not part of session command counting so OK to ignore.
+ * - Connection heartbeat: sends connection controls, not part of
+ * session command counting so OK to ignore.
*
* - LinkRegistry: only cluster elder is ever active for links.
*
@@ -57,7 +61,10 @@
*
* cluster::ExpiryPolicy implements the strategy for message expiry.
*
- * CLUSTER PROTOCOL OVERVIEW
+ * ClusterTimer implements periodic timed events in the cluster context.
+ * Used for periodic management events.
+ *
+ * <h1>CLUSTER PROTOCOL OVERVIEW</h1>
*
* Messages sent to/from CPG are called Events.
*
@@ -84,12 +91,16 @@
* - Connection control events carrying non-cluster frames: frames sent to the client.
* e.g. flow-control frames generated on a timer.
*
- * CLUSTER INITIALIZATION OVERVIEW
+ * <h1>CLUSTER INITIALIZATION OVERVIEW</h1>
+ *
+ * @see InitialStatusMap
*
* When a new member joins the CPG group, all members (including the
* new one) multicast their "initial status." The new member is in
- * INIT mode until it gets a complete set of initial status messages
- * from all cluster members.
+ * PRE_INIT mode until it gets a complete set of initial status
+ * messages from all cluster members. In a newly-forming cluster is
+ * then in INIT mode until the configured cluster-size members have
+ * joined.
*
* The newcomer uses initial status to determine
* - The cluster UUID
@@ -97,11 +108,16 @@
* - Do I need to get an update from an existing active member?
* - Can I recover from my own store?
*
- * Initialization happens in the Cluster constructor (plugin
- * early-init phase) because it needs to be done before the store
- * initializes. In INIT mode sending & receiving from the cluster are
- * done single-threaded, bypassing the normal PollableQueues because
- * the Poller is not active at this point to service them.
+ * Pre-initialization happens in the Cluster constructor (plugin
+ * early-init phase) because it needs to set the recovery flag before
+ * the store initializes. This phase lasts until inital-status is
+ * received for all active members. The PollableQueues and Multicaster
+ * are in "bypass" mode during this phase since the poller has not
+ * started so there are no threads to serve pollable queues.
+ *
+ * The remaining initialization happens in Cluster::initialize() or,
+ * if cluster-size=N is specified, in the deliver thread when an
+ * initial-status control is delivered that brings the total to N.
*/
#include "qpid/Exception.h"
#include "qpid/cluster/Cluster.h"
@@ -244,7 +260,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
- state(INIT),
+ state(PRE_INIT),
initMap(self, settings.size),
store(broker.getDataDir().getPath()),
elder(false),
@@ -274,17 +290,18 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// without modifying delivery-properties.exchange.
broker.getExchanges().registerExchange(
boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
- if (store.getState() == STORE_STATE_DIRTY_STORE)
- broker.setRecovery(false); // Ditch my current store.
if (store.getClusterId())
clusterId = store.getClusterId(); // Use stored ID if there is one.
QPID_LOG(notice, "Cluster store state: " << store)
}
-
cpg.join(name);
+ // pump the CPG dispatch manually till we get past PRE_INIT.
+ while (state == PRE_INIT)
+ cpg.dispatchOne();
}
Cluster::~Cluster() {
@@ -301,9 +318,14 @@ void Cluster::initialize() {
dispatcher.start();
deliverEventQueue.start();
deliverFrameQueue.start();
+ mcast.start();
+
+ // Run initMapCompleted immediately to process the initial configuration.
+ assert(state == INIT);
+ initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context.
// Add finalizer last for exception safety.
- broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
+ broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
}
// Called in connection thread to insert a client connection.
@@ -579,9 +601,27 @@ void Cluster::setReady(Lock&) {
void Cluster::initMapCompleted(Lock& l) {
// Called on completion of the initial status map.
QPID_LOG(debug, *this << " initial status map complete. ");
- if (state == INIT) {
- // We have status for all members so we can make join descisions.
+ if (state == PRE_INIT) {
+ // PRE_INIT means we're still in the earlyInitialize phase, in the constructor.
+ // We decide here whether we want to recover from our store.
+ // We won't recover if we are joining an active cluster or our store is dirty.
+ if (store.hasStore() &&
+ (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE))
+ broker.setRecovery(false); // Ditch my current store.
+ state = INIT;
+ }
+ else if (state == INIT) {
+ // INIT means we are past Cluster::initialize().
+
+ // If we're forming an initial cluster (no active members)
+ // then we wait to reach the configured cluster-size
+ if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) {
+ QPID_LOG(info, *this << initMap.getActualSize()
+ << " members, waiting for at least " << initMap.getRequiredSize());
+ return;
+ }
initMap.checkConsistent();
+
elders = initMap.getElders();
QPID_LOG(debug, *this << " elders: " << elders);
if (elders.empty())
@@ -969,7 +1009,8 @@ void Cluster::memberUpdate(Lock& l) {
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
static const char* STATE[] = {
- "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
+ "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP",
+ "READY", "OFFER", "UPDATER", "LEFT"
};
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
o << "cluster(" << cluster.self << " " << STATE[cluster.state];
@@ -1009,12 +1050,14 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu
}
void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
- timer->deliverWakeup(name);
+ if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+ timer->deliverWakeup(name);
}
void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name)
- timer->deliverDrop(name);
+ if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+ timer->deliverDrop(name);
}
bool Cluster::isElder() const {
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index e280a7e928..4a64ad73d6 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -180,6 +180,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void memberUpdate(Lock&);
void setClusterId(const framing::Uuid&, Lock&);
void erase(const ConnectionId&, Lock&);
+ void requestUpdate(Lock& );
void initMapCompleted(Lock&);
void becomeElder(Lock&);
@@ -251,7 +252,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Local cluster state, cluster map
enum {
- INIT, ///< Establishing inital cluster stattus.
+ PRE_INIT,///< Have not yet received complete initial status map.
+ INIT, ///< Waiting to reach cluster-size.
JOINER, ///< Sent update request, waiting for update offer.
UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete.
CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event.
diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp
index a1a1456618..c8ecc13f2c 100644
--- a/cpp/src/qpid/cluster/InitialStatusMap.cpp
+++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp
@@ -86,8 +86,7 @@ bool InitialStatusMap::notInitialized(const Map::value_type& v) {
}
bool InitialStatusMap::isComplete() const {
- return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end()
- && (map.size() >= size);
+ return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end();
}
bool InitialStatusMap::transitionToComplete() {
@@ -100,7 +99,7 @@ bool InitialStatusMap::isResendNeeded() {
return ret;
}
-bool InitialStatusMap::isActive(const Map::value_type& v) {
+bool InitialStatusMap::isActiveEntry(const Map::value_type& v) {
return v.second && v.second->getActive();
}
@@ -110,10 +109,15 @@ bool InitialStatusMap::hasStore(const Map::value_type& v) {
v.second->getStoreState() == STORE_STATE_DIRTY_STORE);
}
+bool InitialStatusMap::isActive() {
+ assert(isComplete());
+ return (find_if(map.begin(), map.end(), &isActiveEntry) != map.end());
+}
+
bool InitialStatusMap::isUpdateNeeded() {
assert(isComplete());
// We need an update if there are any active members.
- if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true;
+ if (isActive()) return true;
// Otherwise it depends on store status, get my own status:
Map::iterator me = map.find(self);
@@ -154,7 +158,7 @@ MemberSet InitialStatusMap::getElders() const {
Uuid InitialStatusMap::getClusterId() {
assert(isComplete());
assert(!map.empty());
- Map::iterator i = find_if(map.begin(), map.end(), &isActive);
+ Map::iterator i = find_if(map.begin(), map.end(), &isActiveEntry);
if (i != map.end())
return i->second->getClusterId(); // An active member
else
@@ -178,6 +182,7 @@ void InitialStatusMap::checkConsistent() {
Uuid clusterId;
Uuid shutdownId;
+ bool initialCluster = !isActive();
for (Map::iterator i = map.begin(); i != map.end(); ++i) {
assert(i->second);
if (i->second->getActive()) ++active;
@@ -193,8 +198,10 @@ void InitialStatusMap::checkConsistent() {
++clean;
checkId(clusterId, i->second->getClusterId(),
"Cluster-ID mismatch. Stores belong to different clusters.");
- checkId(shutdownId, i->second->getShutdownId(),
- "Shutdown-ID mismatch. Stores were not shut down together");
+ // Only need shutdownId to match if we are in an initially forming cluster.
+ if (initialCluster)
+ checkId(shutdownId, i->second->getShutdownId(),
+ "Shutdown-ID mismatch. Stores were not shut down together");
break;
}
}
@@ -202,10 +209,13 @@ void InitialStatusMap::checkConsistent() {
if (none && (clean+dirty+empty))
throw Exception("Mixing transient and persistent brokers in a cluster");
- // If there are no active members and there are dirty stores there
- // must be at least one clean store.
- if (!active && dirty && !clean)
- throw Exception("Cannot recover, no clean store.");
+ if (map.size() >= size) {
+ // All initial members are present. If there are no active
+ // members and there are dirty stores there must be at least
+ // one clean store.
+ if (!active && dirty && !clean)
+ throw Exception("Cannot recover, no clean store.");
+ }
}
std::string InitialStatusMap::getFirstConfigStr() const {
diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h
index eedc99b0b2..a5a600365e 100644
--- a/cpp/src/qpid/cluster/InitialStatusMap.h
+++ b/cpp/src/qpid/cluster/InitialStatusMap.h
@@ -51,12 +51,18 @@ class InitialStatusMap
/** Process received status */
void received(const MemberId&, const Status& is);
- /**@return true if the map is complete. */
+ /**@return true if the map has an entry for all current cluster members. */
bool isComplete() const;
+
+ size_t getActualSize() const { return map.size(); }
+ size_t getRequiredSize() const { return size; }
+
/**@return true if the map was completed by the last config change or received. */
bool transitionToComplete();
/**@pre isComplete(). @return this node's elders */
MemberSet getElders() const;
+ /**@pre isComplete(). @return True if there are active members of the cluster. */
+ bool isActive();
/**@pre isComplete(). @return True if we need to request an update. */
bool isUpdateNeeded();
/**@pre isComplete(). @return Cluster-wide cluster ID. */
@@ -71,8 +77,9 @@ class InitialStatusMap
private:
typedef std::map<MemberId, boost::optional<Status> > Map;
static bool notInitialized(const Map::value_type&);
- static bool isActive(const Map::value_type&);
+ static bool isActiveEntry(const Map::value_type&);
static bool hasStore(const Map::value_type&);
+
Map map;
MemberSet firstConfig;
MemberId self;
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index 4a8195438f..d57ff76941 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -33,10 +33,8 @@ Multicaster::Multicaster(Cpg& cpg_,
boost::function<void()> onError_) :
onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
- ready(false)
-{
- queue.start();
-}
+ ready(false), bypass(true)
+{}
void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) {
mcast(Event::control(body, id));
@@ -61,10 +59,16 @@ void Multicaster::mcast(const Event& e) {
}
}
QPID_LOG(trace, "MCAST " << e);
- queue.push(e);
+ if (bypass) { // direct, don't queue
+ iovec iov = e.toIovec();
+ // FIXME aconway 2010-03-10: should do limited retry.
+ while (!cpg.mcast(&iov, 1))
+ ;
+ }
+ else
+ queue.push(e);
}
-
Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) {
try {
PollableEventQueue::Batch::const_iterator i = values.begin();
@@ -86,6 +90,11 @@ Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(co
}
}
+void Multicaster::start() {
+ queue.start();
+ bypass = false;
+}
+
void Multicaster::setReady() {
sys::Mutex::ScopedLock l(lock);
ready = true;
diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h
index 2db84a9ce0..f70bd5ca31 100644
--- a/cpp/src/qpid/cluster/Multicaster.h
+++ b/cpp/src/qpid/cluster/Multicaster.h
@@ -41,16 +41,18 @@ class Cpg;
/**
* Multicast to the cluster. Shared, thread safe object.
- *
- * Runs in two modes;
*
- * initializing: Hold connection mcast events. Multicast cluster
- * events directly in the calling thread. This mode is used before
- * joining the cluster where the poller may not yet be active and we
- * want to hold any connection traffic till we join.
+ * holding mode: Hold connection events for later multicast. Cluster
+ * events are never held. Used during PRE_INIT/INIT state when we
+ * want to hold any connection traffic till we are read in the
+ * cluster.
+ *
+ * bypass mode: Multicast cluster events directly in the calling
+ * thread. This mode is used by cluster in PRE_INIT state the poller
+ * is not yet be active.
*
- * ready: normal operation. Queues all mcasts on a pollable queue,
- * multicasts connection and cluster events.
+ * Multicaster is created in bypass+holding mode, they are disabled by
+ * start and setReady respectively.
*/
class Multicaster
{
@@ -65,7 +67,9 @@ class Multicaster
void mcastBuffer(const char*, size_t, const ConnectionId&);
void mcast(const Event& e);
- /** Switch to ready mode. */
+ /** Start the pollable queue, turn off bypass mode. */
+ void start();
+ /** Switch to ready mode, release held messages. */
void setReady();
private:
@@ -81,6 +85,7 @@ class Multicaster
bool ready;
PlainEventQueue holdingQueue;
std::vector<struct ::iovec> ioVector;
+ bool bypass;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h
index 2aed6de5b9..59d0bcd36a 100644
--- a/cpp/src/qpid/cluster/PollableQueue.h
+++ b/cpp/src/qpid/cluster/PollableQueue.h
@@ -31,6 +31,13 @@ namespace cluster {
/**
* More convenient version of PollableQueue that handles iterating
* over the batch and error handling.
+ *
+ * Constructed in "bypass" mode where items are processed directly
+ * rather than put on the queue. This is important for the
+ * PRE_INIT stage when Cluster is pumping CPG dispatch directly
+ * before the poller has started.
+ *
+ * Calling start() starts the pollable queue and disabled bypass mode.
*/
template <class T> class PollableQueue : public sys::PollableQueue<T> {
public:
@@ -41,7 +48,7 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> {
const boost::shared_ptr<sys::Poller>& poller)
: sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1),
poller),
- callback(f), error(err), message(msg)
+ callback(f), error(err), message(msg), bypass(true)
{}
typename sys::PollableQueue<T>::Batch::const_iterator
@@ -62,10 +69,21 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> {
}
}
+ void push(const T& t) {
+ if (bypass) callback(t);
+ else sys::PollableQueue<T>::push(t);
+ }
+
+ void start() {
+ bypass = false;
+ sys::PollableQueue<T>::start();
+ }
+
private:
Callback callback;
ErrorCallback error;
std::string message;
+ bool bypass;
};
diff --git a/cpp/src/qpid/cluster/StoreStatus.cpp b/cpp/src/qpid/cluster/StoreStatus.cpp
index 648fcfbbd5..b44c0e1a9a 100644
--- a/cpp/src/qpid/cluster/StoreStatus.cpp
+++ b/cpp/src/qpid/cluster/StoreStatus.cpp
@@ -21,6 +21,7 @@
#include "StoreStatus.h"
#include "qpid/Exception.h"
#include "qpid/Msg.h"
+#include "qpid/log/Statement.h"
#include <boost/filesystem/path.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/filesystem/operations.hpp>
@@ -54,24 +55,39 @@ Uuid loadUuid(const fs::path& path) {
Uuid ret;
if (exists(path)) {
fs::ifstream i(path);
- throw_exceptions(i);
- i >> ret;
+ try {
+ throw_exceptions(i);
+ i >> ret;
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Cant load UUID from " << path.string() << ": " << e.what());
+ throw;
+ }
}
return ret;
}
void saveUuid(const fs::path& path, const Uuid& uuid) {
fs::ofstream o(path);
- throw_exceptions(o);
- o << uuid;
+ try {
+ throw_exceptions(o);
+ o << uuid;
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Cant save UUID to " << path.string() << ": " << e.what());
+ throw;
+ }
}
framing::SequenceNumber loadSeqNum(const fs::path& path) {
uint32_t n = 0;
if (exists(path)) {
fs::ifstream i(path);
- throw_exceptions(i);
- i >> n;
+ try {
+ throw_exceptions(i);
+ i >> n;
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Cant load sequence number from " << path.string() << ": " << e.what());
+ throw;
+ }
}
return framing::SequenceNumber(n);
}
@@ -105,9 +121,14 @@ void StoreStatus::save() {
create_directory(dir);
saveUuid(dir/CLUSTER_ID_FILE, clusterId);
saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId);
- fs::ofstream o(dir/CONFIG_SEQ_FILE);
- throw_exceptions(o);
- o << configSeq.getValue();
+ try {
+ fs::ofstream o(dir/CONFIG_SEQ_FILE);
+ throw_exceptions(o);
+ o << configSeq.getValue();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Cant save sequence number to " << (dir/CONFIG_SEQ_FILE).string() << ": " << e.what());
+ throw;
+ }
}
catch (const std::exception&e) {
throw Exception(QPID_MSG("Cannot save cluster store status: " << e.what()));