diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 67 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 2 |
2 files changed, 46 insertions, 23 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f3b81c386e..3f1d79d382 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -316,7 +316,6 @@ void Cluster::initialize() { broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2); broker.setExpiryPolicy(expiryPolicy); - dispatcher.start(); deliverEventQueue.bypassOff(); deliverEventQueue.start(); deliverFrameQueue.bypassOff(); @@ -329,7 +328,6 @@ void Cluster::initialize() { _qmf::Package packageInit(mAgent); mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str()); mAgent->addObject (mgmtObject); - mgmtObject->set_status("JOINING"); } // Run initMapCompleted immediately to process the initial configuration @@ -340,6 +338,8 @@ void Cluster::initialize() { // Add finalizer last for exception safety. broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); + // Start dispatching CPG events. + dispatcher.start(); } // Called in connection thread to insert a client connection. @@ -595,15 +595,26 @@ void Cluster::configChange ( void Cluster::setReady(Lock&) { state = READY; - if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); mcast.setReady(); broker.getQueueEvents().enable(); enableClusterSafe(); // Enable cluster-safe assertions. } +// Set the management status from the Cluster::state. +// +// NOTE: Management updates are sent based on property changes. In +// order to keep consistency across the cluster, we touch the local +// management status property even if it is locally unchanged for any +// event that could have cause a cluster property change on any cluster member. +void Cluster::setMgmtStatus(Lock&) { + if (mgmtObject) + mgmtObject->set_status(state >= CATCHUP ? "ACTIVE" : "JOINING"); +} + void Cluster::initMapCompleted(Lock& l) { - // Called on completion of the initial status map. + // Called on completion of the initial status map. QPID_LOG(debug, *this << " initial status map complete. "); + setMgmtStatus(l); if (state == PRE_INIT) { // PRE_INIT means we're still in the earlyInitialize phase, in the constructor. // We decide here whether we want to recover from our store. @@ -649,6 +660,7 @@ void Cluster::initMapCompleted(Lock& l) { discarding = false; setReady(l); memberUpdate(l); + updateMgmtMembership(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); QPID_LOG(notice, *this << " joined cluster " << name); } @@ -695,6 +707,8 @@ void Cluster::configChange(const MemberId&, memberUpdate(l); if (elders.empty()) becomeElder(l); } + + updateMgmtMembership(l); // Update on every config change for consistency } void Cluster::becomeElder(Lock&) { @@ -788,6 +802,9 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { } catch (const Url::Invalid& e) { QPID_LOG(error, "Invalid URL in cluster ready command: " << url); } + // Update management on every ready event to be consistent across cluster. + setMgmtStatus(l); + updateMgmtMembership(l); } void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { @@ -900,6 +917,8 @@ void Cluster::checkUpdateIn(Lock& l) { mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; memberUpdate(l); + // NB: don't updateMgmtMembership() here as we are not in the deliver + // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); if (mAgent) mAgent->suppress(false); // Enable management output. discarding = false; // ok to set, we're stalled for update. @@ -981,11 +1000,9 @@ void Cluster::memberUpdate(Lock& l) { // Ignore config changes while we are joining. if (state < CATCHUP) return; QPID_LOG(info, *this << " member update: " << map); - std::vector<Url> urls = getUrls(l); - std::vector<string> ids = getIds(l); size_t aliveCount = map.aliveCount(); assert(map.isAlive(self)); - failoverExchange->updateUrls(urls); + failoverExchange->updateUrls(getUrls(l)); // Mark store clean if I am the only broker, dirty otherwise. if (store.hasStore()) { @@ -1017,22 +1034,6 @@ void Cluster::memberUpdate(Lock& l) { } lastAliveCount = aliveCount; - if (mgmtObject) { - mgmtObject->set_clusterSize(urls.size()); - string urlstr; - for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) { - if (iter != urls.begin()) urlstr += ";"; - urlstr += iter->str(); - } - string idstr; - for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) { - if (iter != ids.begin()) idstr += ";"; - idstr += (*iter); - } - mgmtObject->set_members(urlstr); - mgmtObject->set_memberIDs(idstr); - } - // Close connections belonging to members that have left the cluster. ConnectionMap::iterator i = connections.begin(); while (i != connections.end()) { @@ -1045,6 +1046,26 @@ void Cluster::memberUpdate(Lock& l) { } } +// See comment on Cluster::setMgmtStatus +void Cluster::updateMgmtMembership(Lock& l) { + if (!mgmtObject) return; + std::vector<Url> urls = getUrls(l); + mgmtObject->set_clusterSize(urls.size()); + string urlstr; + for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) { + if (i != urls.begin()) urlstr += ";"; + urlstr += i->str(); + } + std::vector<string> ids = getIds(l); + string idstr; + for(std::vector<string>::iterator i = ids.begin(); i != ids.end(); i++ ) { + if (i != ids.begin()) idstr += ";"; + idstr += *i; + } + mgmtObject->set_members(urlstr); + mgmtObject->set_memberIDs(idstr); +} + std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { static const char* STATE[] = { "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP", diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 5668d04996..1f8fd447a3 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -196,6 +196,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { void requestUpdate(Lock& ); void initMapCompleted(Lock&); void becomeElder(Lock&); + void setMgmtStatus(Lock&); + void updateMgmtMembership(Lock&); // == Called in CPG dispatch thread void deliver( // CPG deliver callback. |