diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 33 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.cpp | 40 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/StoreStatus.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/StoreStatus.h | 1 |
6 files changed, 53 insertions, 28 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 282b639f61..fa53fc5475 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -175,7 +175,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 835547; +const uint32_t Cluster::CLUSTER_VERSION = 884125; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -202,7 +202,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { cluster.errorCheck(member, type, frameSeq, l); } - void shutdown() { cluster.shutdown(member, l); } + void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; @@ -287,7 +287,7 @@ void Cluster::initialize() { default: assert(0); } - QPID_LOG(notice, *this << (state == READY ? "joined" : "joining") << " cluster " << name << " with url=" << myUrl); + QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); @@ -601,6 +601,7 @@ void Cluster::initMapCompleted(Lock& l) { // Called on completion of the initial status map. if (state == INIT) { // We have status for all members so we can make join descisions. + initMap.checkConsistent(); elders = initMap.getElders(); QPID_LOG(debug, *this << " elders: " << elders); if (!elders.empty()) { // I'm not the elder, I don't handle links & replication. @@ -611,17 +612,8 @@ void Cluster::initMapCompleted(Lock& l) { else { QPID_LOG(info, this << " active for links."); } - // Check that cluster ID matches persistent store. - Uuid agreedId = initMap.getClusterId(); - if (store.hasStore()) { - Uuid storeId = store.getClusterId(); - if (storeId && storeId != agreedId) - throw Exception( - QPID_MSG("Persistent cluster-id " << storeId - << " doesn't match cluster " << agreedId)); - store.dirty(agreedId); - } - setClusterId(agreedId, l); + setClusterId(initMap.getClusterId(), l); + if (store.hasStore()) store.dirty(clusterId); if (initMap.isUpdateNeeded()) { // Joining established cluster. broker.setRecovery(false); // Ditch my current store. @@ -822,13 +814,13 @@ void Cluster::checkUpdateIn(Lock&) { mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; discarding = false; // ok to set, we're stalled for update. - QPID_LOG(notice, *this << " update complete, starting catch-up, members: " << map); + QPID_LOG(notice, *this << " update complete, starting catch-up."); deliverEventQueue.start(); } else if (updateRetracted) { // Update was retracted, request another update updateRetracted = false; state = JOINER; - QPID_LOG(notice, *this << " update retracted, sending new update request"); + QPID_LOG(notice, *this << " update retracted, sending new update request."); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); deliverEventQueue.start(); } @@ -853,10 +845,9 @@ void Cluster::updateOutError(const std::exception& e) { updateOutDone(l); } -void Cluster ::shutdown(const MemberId& , Lock& l) { +void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) { QPID_LOG(notice, *this << " cluster shut down by administrator."); - // FIXME aconway 2009-11-20: incorrect! Need to pass UUID on shutdown command. - if (store.hasStore()) store.clean(Uuid(true)); + if (store.hasStore()) store.clean(Uuid(id)); leave(l); } @@ -885,13 +876,13 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s } void Cluster::stopClusterNode(Lock& l) { - QPID_LOG(notice, *this << " stopped by admin"); + QPID_LOG(notice, *this << " cluster member stopped by administrator."); leave(l); } void Cluster::stopFullCluster(Lock& ) { QPID_LOG(notice, *this << " shutting down cluster " << name); - mcast.mcastControl(ClusterShutdownBody(), self); + mcast.mcastControl(ClusterShutdownBody(ProtocolVersion(), Uuid(true)), self); } void Cluster::memberUpdate(Lock& l) { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 0f931bbe29..7872588307 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -160,7 +160,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void messageExpired(const MemberId&, uint64_t, Lock& l); void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); - void shutdown(const MemberId&, Lock&); + void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); // Helper functions ConnectionPtr getConnection(const EventFrame&, Lock&); diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp index 51d6140008..a5618db3e6 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.cpp +++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp @@ -29,6 +29,7 @@ namespace cluster { using namespace std; using namespace boost; using namespace framing::cluster; +using namespace framing; InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_) : self(self_), completed(), resendNeeded(), size(size_) @@ -106,7 +107,6 @@ bool InitialStatusMap::hasStore(const Map::value_type& v) { } bool InitialStatusMap::isUpdateNeeded() { - // FIXME aconway 2009-11-20: consistency checks isComplete or here? assert(isComplete()); // We need an update if there are any active members. if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true; @@ -145,7 +145,43 @@ framing::Uuid InitialStatusMap::getClusterId() { if (i != map.end()) return i->second->getClusterId(); // An active member else - return map.begin()->second->getClusterId(); + return map.begin()->second->getClusterId(); // Youngest newcomer in node-id order } +void InitialStatusMap::checkConsistent() { + assert(isComplete()); + bool persistent = (map.begin()->second->getStoreState() != STORE_STATE_NO_STORE); + Uuid clusterId; + for (Map::iterator i = map.begin(); i != map.end(); ++i) { + // Must not mix transient and persistent members. + if (persistent != (i->second->getStoreState() != STORE_STATE_NO_STORE)) + throw Exception("Mixing transient and persistent brokers in a cluster"); + // Members with non-empty stores must have same cluster-id + switch (i->second->getStoreState()) { + case STORE_STATE_NO_STORE: + case STORE_STATE_EMPTY_STORE: + break; + case STORE_STATE_DIRTY_STORE: + case STORE_STATE_CLEAN_STORE: + if (!clusterId) clusterId = i->second->getClusterId(); + assert(clusterId); + if (clusterId != i->second->getClusterId()) + throw Exception("Cluster-id mismatch, brokers belonged to different clusters."); + } + } + // If this is a newly forming cluster, clean stores must have same shutdown-id + if (find_if(map.begin(), map.end(), &isActive) == map.end()) { + Uuid shutdownId; + for (Map::iterator i = map.begin(); i != map.end(); ++i) { + if (i->second->getStoreState() == STORE_STATE_CLEAN_STORE) { + if (!shutdownId) shutdownId = i->second->getShutdownId(); + assert(shutdownId); + if (shutdownId != i->second->getShutdownId()) + throw Exception("Shutdown-id mismatch, brokers were not shut down together."); + } + } + } +} + + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h index 72963ea2bb..40fd9ee49d 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.h +++ b/cpp/src/qpid/cluster/InitialStatusMap.h @@ -56,13 +56,14 @@ class InitialStatusMap bool isUpdateNeeded(); /**@pre isComplete(). @return Cluster-wide cluster ID. */ framing::Uuid getClusterId(); + /**@pre isComplete(). @throw Exception if there are any inconsistencies. */ + void checkConsistent(); private: typedef std::map<MemberId, boost::optional<Status> > Map; static bool notInitialized(const Map::value_type&); static bool isActive(const Map::value_type&); static bool hasStore(const Map::value_type&); - void check(); Map map; MemberSet firstConfig; MemberId self; diff --git a/cpp/src/qpid/cluster/StoreStatus.cpp b/cpp/src/qpid/cluster/StoreStatus.cpp index 3602ec9188..a7da3baa50 100644 --- a/cpp/src/qpid/cluster/StoreStatus.cpp +++ b/cpp/src/qpid/cluster/StoreStatus.cpp @@ -85,8 +85,6 @@ void StoreStatus::dirty(const Uuid& clusterId_) { } void StoreStatus::clean(const Uuid& shutdownId_) { - assert(clusterId); // FIXME aconway 2009-11-20: throw exception - assert(shutdownId_); state = STORE_STATE_CLEAN_STORE; shutdownId = shutdownId_; save(); diff --git a/cpp/src/qpid/cluster/StoreStatus.h b/cpp/src/qpid/cluster/StoreStatus.h index ead30b8fb8..539f46c10b 100644 --- a/cpp/src/qpid/cluster/StoreStatus.h +++ b/cpp/src/qpid/cluster/StoreStatus.h @@ -50,7 +50,6 @@ class StoreStatus void save(); bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; } - bool isEmpty() { return state != framing::cluster::STORE_STATE_EMPTY_STORE; } private: framing::cluster::StoreState state; |
