diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 33 |
1 files changed, 12 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 282b639f61..fa53fc5475 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -175,7 +175,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 835547; +const uint32_t Cluster::CLUSTER_VERSION = 884125; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -202,7 +202,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { cluster.errorCheck(member, type, frameSeq, l); } - void shutdown() { cluster.shutdown(member, l); } + void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; @@ -287,7 +287,7 @@ void Cluster::initialize() { default: assert(0); } - QPID_LOG(notice, *this << (state == READY ? "joined" : "joining") << " cluster " << name << " with url=" << myUrl); + QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); @@ -601,6 +601,7 @@ void Cluster::initMapCompleted(Lock& l) { // Called on completion of the initial status map. if (state == INIT) { // We have status for all members so we can make join descisions. + initMap.checkConsistent(); elders = initMap.getElders(); QPID_LOG(debug, *this << " elders: " << elders); if (!elders.empty()) { // I'm not the elder, I don't handle links & replication. @@ -611,17 +612,8 @@ void Cluster::initMapCompleted(Lock& l) { else { QPID_LOG(info, this << " active for links."); } - // Check that cluster ID matches persistent store. - Uuid agreedId = initMap.getClusterId(); - if (store.hasStore()) { - Uuid storeId = store.getClusterId(); - if (storeId && storeId != agreedId) - throw Exception( - QPID_MSG("Persistent cluster-id " << storeId - << " doesn't match cluster " << agreedId)); - store.dirty(agreedId); - } - setClusterId(agreedId, l); + setClusterId(initMap.getClusterId(), l); + if (store.hasStore()) store.dirty(clusterId); if (initMap.isUpdateNeeded()) { // Joining established cluster. broker.setRecovery(false); // Ditch my current store. @@ -822,13 +814,13 @@ void Cluster::checkUpdateIn(Lock&) { mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; discarding = false; // ok to set, we're stalled for update. - QPID_LOG(notice, *this << " update complete, starting catch-up, members: " << map); + QPID_LOG(notice, *this << " update complete, starting catch-up."); deliverEventQueue.start(); } else if (updateRetracted) { // Update was retracted, request another update updateRetracted = false; state = JOINER; - QPID_LOG(notice, *this << " update retracted, sending new update request"); + QPID_LOG(notice, *this << " update retracted, sending new update request."); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); deliverEventQueue.start(); } @@ -853,10 +845,9 @@ void Cluster::updateOutError(const std::exception& e) { updateOutDone(l); } -void Cluster ::shutdown(const MemberId& , Lock& l) { +void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) { QPID_LOG(notice, *this << " cluster shut down by administrator."); - // FIXME aconway 2009-11-20: incorrect! Need to pass UUID on shutdown command. - if (store.hasStore()) store.clean(Uuid(true)); + if (store.hasStore()) store.clean(Uuid(id)); leave(l); } @@ -885,13 +876,13 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s } void Cluster::stopClusterNode(Lock& l) { - QPID_LOG(notice, *this << " stopped by admin"); + QPID_LOG(notice, *this << " cluster member stopped by administrator."); leave(l); } void Cluster::stopFullCluster(Lock& ) { QPID_LOG(notice, *this << " shutting down cluster " << name); - mcast.mcastControl(ClusterShutdownBody(), self); + mcast.mcastControl(ClusterShutdownBody(ProtocolVersion(), Uuid(true)), self); } void Cluster::memberUpdate(Lock& l) { |
