diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 48 |
1 files changed, 28 insertions, 20 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 07fdc6fc93..282b639f61 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -103,6 +103,7 @@ * done single-threaded, bypassing the normal PollableQueues because * the Poller is not active at this point to service them. */ +#include "qpid/Exception.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ClusterSettings.h" #include "qpid/cluster/Connection.h" @@ -153,15 +154,16 @@ namespace qpid { namespace cluster { +using namespace qpid; using namespace qpid::framing; using namespace qpid::sys; -using namespace std; using namespace qpid::cluster; -using namespace qpid::framing::cluster; -using qpid::management::ManagementAgent; -using qpid::management::ManagementObject; -using qpid::management::Manageable; -using qpid::management::Args; +using namespace framing::cluster; +using namespace std; +using management::ManagementAgent; +using management::ManagementObject; +using management::Manageable; +using management::Args; namespace _qmf = ::qmf::org::apache::qpid::cluster; /** @@ -184,10 +186,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void initialStatus(uint32_t version, bool active, const Uuid& clusterId, - uint8_t storeState, const Uuid& start, const Uuid& stop) + uint8_t storeState, const Uuid& shutdownId) { cluster.initialStatus(member, version, active, clusterId, - framing::cluster::StoreState(storeState), start, stop, l); + framing::cluster::StoreState(storeState), shutdownId, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } @@ -254,8 +256,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : broker.getExchanges().registerExchange( boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); // Load my store status before we go into initialization - if (! broker::NullMessageStore::isNullStore(&broker.getStore())) + if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); + if (store.getClusterId()) + clusterId = store.getClusterId(); // Use stored ID if there is one. + } cpg.join(name); // Pump the CPG dispatch manually till we get initialized. @@ -606,14 +611,18 @@ void Cluster::initMapCompleted(Lock& l) { else { QPID_LOG(info, this << " active for links."); } + // Check that cluster ID matches persistent store. + Uuid agreedId = initMap.getClusterId(); + if (store.hasStore()) { + Uuid storeId = store.getClusterId(); + if (storeId && storeId != agreedId) + throw Exception( + QPID_MSG("Persistent cluster-id " << storeId + << " doesn't match cluster " << agreedId)); + store.dirty(agreedId); + } + setClusterId(agreedId, l); - setClusterId(initMap.getClusterId(), l); - // FIXME aconway 2009-11-20: store id == cluster id. - // Clean up redundant copy of id in InitialStatus - // Use store ID as advertized cluster ID. - // Consistency check on cluster ID vs. locally stored ID. - // throw rathr than assert in StoreStatus. - if (store.hasStore()) store.dirty(clusterId); if (initMap.isUpdateNeeded()) { // Joining established cluster. broker.setRecovery(false); // Ditch my current store. state = JOINER; @@ -645,7 +654,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& mcast.mcastControl( ClusterInitialStatusBody( ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, - store.getState(), store.getStart(), store.getStop() + store.getState(), store.getShutdownId() ), self); } @@ -690,7 +699,7 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active, const framing::Uuid& id, framing::cluster::StoreState store, - const framing::Uuid& start, const framing::Uuid& end, + const framing::Uuid& shutdownId, Lock& l) { if (version != CLUSTER_VERSION) { @@ -701,8 +710,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ } initMap.received( member, - ClusterInitialStatusBody( - ProtocolVersion(), version, active, id, store, start, end) + ClusterInitialStatusBody(ProtocolVersion(), version, active, id, store, shutdownId) ); if (initMap.transitionToComplete()) { QPID_LOG(debug, *this << " initial status map complete. "); |
