summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-11-24 20:07:24 +0000
committerAlan Conway <aconway@apache.org>2009-11-24 20:07:24 +0000
commit0fb7ff9cfbfd01e9093c2c6021a5915696d2a089 (patch)
tree1d2db335592be80a9aa9f8f404d2c1682afeb485 /cpp/src/qpid/cluster/Cluster.cpp
parent1ee447563d208b39e962537a47f14aea741777b0 (diff)
downloadqpid-python-0fb7ff9cfbfd01e9093c2c6021a5915696d2a089.tar.gz
Support for restarting a persistent cluster.
Option --cluster-size=N: members wait for N members before recovering store. Stores marked as clean/dirty. Automatically recover from clean store on restart. Stores marked with UUID to detect errors. Not yet implemented: consistency checks, manual recovery from all dirty stores. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@883842 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp136
1 files changed, 101 insertions, 35 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 5e962e9767..07fdc6fc93 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -83,6 +83,25 @@
* - Connection control events carrying cluster.connection commands.
* - Connection control events carrying non-cluster frames: frames sent to the client.
* e.g. flow-control frames generated on a timer.
+ *
+ * CLUSTER INITIALIZATION OVERVIEW
+ *
+ * When a new member joins the CPG group, all members (including the
+ * new one) multicast their "initial status." The new member is in
+ * INIT mode until it gets a complete set of initial status messages
+ * from all cluster members.
+ *
+ * The newcomer uses initial status to determine
+ * - The cluster UUID
+ * - Am I speaking the correct version of the cluster protocol?
+ * - Do I need to get an update from an existing active member?
+ * - Can I recover from my own store?
+ *
+ * Initialization happens in the Cluster constructor (plugin
+ * early-init phase) because it needs to be done before the store
+ * initializes. In INIT mode sending & receiving from the cluster are
+ * done single-threaded, bypassing the normal PollableQueues because
+ * the Poller is not active at this point to service them.
*/
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ClusterSettings.h"
@@ -97,6 +116,7 @@
#include "qmf/org/apache/qpid/cluster/Package.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
+#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SignalHandler.h"
@@ -162,9 +182,12 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {}
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
- void initialStatus(bool active, bool persistent, const Uuid& clusterId,
- uint32_t version, const std::string& url) {
- cluster.initialStatus(member, active, persistent, clusterId, version, url, l);
+
+ void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
+ uint8_t storeState, const Uuid& start, const Uuid& stop)
+ {
+ cluster.initialStatus(member, version, active, clusterId,
+ framing::cluster::StoreState(storeState), start, stop, l);
}
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
@@ -204,11 +227,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
"Error delivering frames",
poller),
quorum(boost::bind(&Cluster::leave, this)),
- initialized(false),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
state(INIT),
initMap(self, settings.size),
+ store(broker.getDataDir().getPath()),
lastSize(0),
lastBroker(false),
updateRetracted(false),
@@ -226,12 +249,17 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
failoverExchange.reset(new FailoverExchange(this));
broker.getExchanges().registerExchange(failoverExchange);
- // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange.
- broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+ // Update exchange is used during updates to replicate messages
+ // without modifying delivery-properties.exchange.
+ broker.getExchanges().registerExchange(
+ boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+ // Load my store status before we go into initialization
+ if (! broker::NullMessageStore::isNullStore(&broker.getStore()))
+ store.load();
cpg.join(name);
- // pump the CPG dispatch manually till we get initialized.
- while (!initialized)
+ // Pump the CPG dispatch manually till we get initialized.
+ while (state == INIT)
cpg.dispatchOne();
}
@@ -243,12 +271,24 @@ void Cluster::initialize() {
if (settings.quorum) quorum.start(poller);
if (myUrl.empty())
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
- QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
+ // Cluster constructor will leave us in either READY or JOINER state.
+ switch (state) {
+ case READY:
+ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
+ break;
+ case JOINER:
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+ break;
+ default:
+ assert(0);
+ }
+ QPID_LOG(notice, *this << (state == READY ? "joined" : "joining") << " cluster " << name << " with url=" << myUrl);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
deliverEventQueue.start();
deliverFrameQueue.start();
+
// Add finalizer last for exception safety.
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
}
@@ -344,11 +384,21 @@ void Cluster::deliver(
}
void Cluster::deliverEvent(const Event& e) {
- deliverEventQueue.push(e);
+ // During initialization, execute events directly in the same thread.
+ // Once initialized, push to pollable queue to be processed in another thread.
+ if (state == INIT)
+ deliveredEvent(e);
+ else
+ deliverEventQueue.push(e);
}
void Cluster::deliverFrame(const EventFrame& e) {
- deliverFrameQueue.push(e);
+ // During initialization, execute events directly in the same thread.
+ // Once initialized, push to pollable queue to be processed in another thread.
+ if (state == INIT)
+ deliveredFrame(e);
+ else
+ deliverFrameQueue.push(e);
}
const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
@@ -524,12 +574,6 @@ void Cluster::configChange (
const cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- if (state == INIT) {
- // FIXME aconway 2009-11-16: persistent restart
- // Recover only if we are first in cluster.
- broker.setRecovery(nCurrent == 1);
- initialized = true;
- }
QPID_LOG(notice, *this << " membership change: "
<< AddrList(current, nCurrent) << "("
<< AddrList(joined, nJoined, "joined: ")
@@ -544,30 +588,42 @@ void Cluster::configChange (
void Cluster::setReady(Lock&) {
state = READY;
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
- mcast.release();
+ mcast.setReady();
broker.getQueueEvents().enable();
}
void Cluster::initMapCompleted(Lock& l) {
+ // Called on completion of the initial status map.
if (state == INIT) {
+ // We have status for all members so we can make join descisions.
elders = initMap.getElders();
+ QPID_LOG(debug, *this << " elders: " << elders);
if (!elders.empty()) { // I'm not the elder, I don't handle links & replication.
broker.getLinks().setPassive(true);
broker.getQueueEvents().disable();
+ QPID_LOG(info, *this << " not active for links.");
}
+ else {
+ QPID_LOG(info, this << " active for links.");
+ }
+
setClusterId(initMap.getClusterId(), l);
+ // FIXME aconway 2009-11-20: store id == cluster id.
+ // Clean up redundant copy of id in InitialStatus
+ // Use store ID as advertized cluster ID.
+ // Consistency check on cluster ID vs. locally stored ID.
+ // throw rathr than assert in StoreStatus.
+ if (store.hasStore()) store.dirty(clusterId);
if (initMap.isUpdateNeeded()) { // Joining established cluster.
+ broker.setRecovery(false); // Ditch my current store.
state = JOINER;
- mcast.mcastControl(
- ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
}
else { // I can go ready.
- QPID_LOG(notice, *this << " ready.");
discarding = false;
setReady(l);
- map = ClusterMap(initMap.getMemberUrls());
memberUpdate(l);
}
+ QPID_LOG(debug, *this << "Initialization complete");
}
}
@@ -587,9 +643,11 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
initMap.configChange(config);
if (initMap.isResendNeeded()) {
mcast.mcastControl(
- // FIXME aconway 2009-11-17: persistent restart, set persistence bit.
- ClusterInitialStatusBody(ProtocolVersion(), state > INIT, false, clusterId,
- CLUSTER_VERSION, myUrl.str()), self);
+ ClusterInitialStatusBody(
+ ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
+ store.getState(), store.getStart(), store.getStop()
+ ),
+ self);
}
if (initMap.transitionToComplete()) initMapCompleted(l);
@@ -597,6 +655,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
memberUpdate(l);
if (elders.empty()) {
// We are the oldest, reactive links if necessary
+ QPID_LOG(info, this << " becoming active for links.");
broker.getLinks().setPassive(false);
}
}
@@ -628,9 +687,11 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l)
makeOffer(id, l);
}
-void Cluster::initialStatus(const MemberId& member, bool active, bool persistent,
- const framing::Uuid& id, uint32_t version,
- const std::string& url, Lock& l)
+void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
+ const framing::Uuid& id,
+ framing::cluster::StoreState store,
+ const framing::Uuid& start, const framing::Uuid& end,
+ Lock& l)
{
if (version != CLUSTER_VERSION) {
QPID_LOG(critical, *this << " incompatible cluster versions " <<
@@ -640,9 +701,13 @@ void Cluster::initialStatus(const MemberId& member, bool active, bool persistent
}
initMap.received(
member,
- ClusterInitialStatusBody(ProtocolVersion(), active, persistent, id, version, url)
+ ClusterInitialStatusBody(
+ ProtocolVersion(), version, active, id, store, start, end)
);
- if (initMap.transitionToComplete()) initMapCompleted(l);
+ if (initMap.transitionToComplete()) {
+ QPID_LOG(debug, *this << " initial status map complete. ");
+ initMapCompleted(l);
+ }
}
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
@@ -650,7 +715,7 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
memberUpdate(l);
if (state == CATCHUP && id == self) {
setReady(l);
- QPID_LOG(notice, *this << " caught up, active cluster member.");
+ QPID_LOG(notice, *this << " caught up.");
}
}
@@ -770,8 +835,7 @@ void Cluster::updateOutDone(Lock& l) {
QPID_LOG(notice, *this << " update sent");
assert(state == UPDATER);
state = READY;
- mcast.release();
- deliverEventQueue.start(); // Start processing events again.
+ deliverEventQueue.start(); // Start processing events again.
makeOffer(map.firstJoiner(), l); // Try another offer
}
@@ -781,8 +845,10 @@ void Cluster::updateOutError(const std::exception& e) {
updateOutDone(l);
}
-void Cluster ::shutdown(const MemberId& id, Lock& l) {
- QPID_LOG(notice, *this << " received shutdown from " << id);
+void Cluster ::shutdown(const MemberId& , Lock& l) {
+ QPID_LOG(notice, *this << " cluster shut down by administrator.");
+ // FIXME aconway 2009-11-20: incorrect! Need to pass UUID on shutdown command.
+ if (store.hasStore()) store.clean(Uuid(true));
leave(l);
}