summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp33
1 files changed, 12 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 282b639f61..fa53fc5475 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -175,7 +175,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 835547;
+const uint32_t Cluster::CLUSTER_VERSION = 884125;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -202,7 +202,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
cluster.errorCheck(member, type, frameSeq, l);
}
- void shutdown() { cluster.shutdown(member, l); }
+ void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
@@ -287,7 +287,7 @@ void Cluster::initialize() {
default:
assert(0);
}
- QPID_LOG(notice, *this << (state == READY ? "joined" : "joining") << " cluster " << name << " with url=" << myUrl);
+ QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
@@ -601,6 +601,7 @@ void Cluster::initMapCompleted(Lock& l) {
// Called on completion of the initial status map.
if (state == INIT) {
// We have status for all members so we can make join descisions.
+ initMap.checkConsistent();
elders = initMap.getElders();
QPID_LOG(debug, *this << " elders: " << elders);
if (!elders.empty()) { // I'm not the elder, I don't handle links & replication.
@@ -611,17 +612,8 @@ void Cluster::initMapCompleted(Lock& l) {
else {
QPID_LOG(info, this << " active for links.");
}
- // Check that cluster ID matches persistent store.
- Uuid agreedId = initMap.getClusterId();
- if (store.hasStore()) {
- Uuid storeId = store.getClusterId();
- if (storeId && storeId != agreedId)
- throw Exception(
- QPID_MSG("Persistent cluster-id " << storeId
- << " doesn't match cluster " << agreedId));
- store.dirty(agreedId);
- }
- setClusterId(agreedId, l);
+ setClusterId(initMap.getClusterId(), l);
+ if (store.hasStore()) store.dirty(clusterId);
if (initMap.isUpdateNeeded()) { // Joining established cluster.
broker.setRecovery(false); // Ditch my current store.
@@ -822,13 +814,13 @@ void Cluster::checkUpdateIn(Lock&) {
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
discarding = false; // ok to set, we're stalled for update.
- QPID_LOG(notice, *this << " update complete, starting catch-up, members: " << map);
+ QPID_LOG(notice, *this << " update complete, starting catch-up.");
deliverEventQueue.start();
}
else if (updateRetracted) { // Update was retracted, request another update
updateRetracted = false;
state = JOINER;
- QPID_LOG(notice, *this << " update retracted, sending new update request");
+ QPID_LOG(notice, *this << " update retracted, sending new update request.");
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
deliverEventQueue.start();
}
@@ -853,10 +845,9 @@ void Cluster::updateOutError(const std::exception& e) {
updateOutDone(l);
}
-void Cluster ::shutdown(const MemberId& , Lock& l) {
+void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) {
QPID_LOG(notice, *this << " cluster shut down by administrator.");
- // FIXME aconway 2009-11-20: incorrect! Need to pass UUID on shutdown command.
- if (store.hasStore()) store.clean(Uuid(true));
+ if (store.hasStore()) store.clean(Uuid(id));
leave(l);
}
@@ -885,13 +876,13 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s
}
void Cluster::stopClusterNode(Lock& l) {
- QPID_LOG(notice, *this << " stopped by admin");
+ QPID_LOG(notice, *this << " cluster member stopped by administrator.");
leave(l);
}
void Cluster::stopFullCluster(Lock& ) {
QPID_LOG(notice, *this << " shutting down cluster " << name);
- mcast.mcastControl(ClusterShutdownBody(), self);
+ mcast.mcastControl(ClusterShutdownBody(ProtocolVersion(), Uuid(true)), self);
}
void Cluster::memberUpdate(Lock& l) {