From 0e500f1263b0d4fcee94c948b15afca719cf8616 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 23 Jul 2010 01:48:59 +0000 Subject: Race condition in cluster+management, inconsistent errors like: "confirmed < (2097+0) but only sent < (2096+0)" Management messages are generated if a managed object's properties have changed since the last update. Properties of the cluster object (members and status) were sometimes being changed outside the delivery context, which could create inconsistencies in the cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966933 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/Cluster.cpp | 67 ++++++++++++++++++++++++++-------------- cpp/src/qpid/cluster/Cluster.h | 2 ++ 2 files changed, 46 insertions(+), 23 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f3b81c386e..3f1d79d382 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -316,7 +316,6 @@ void Cluster::initialize() { broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2); broker.setExpiryPolicy(expiryPolicy); - dispatcher.start(); deliverEventQueue.bypassOff(); deliverEventQueue.start(); deliverFrameQueue.bypassOff(); @@ -329,7 +328,6 @@ void Cluster::initialize() { _qmf::Package packageInit(mAgent); mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str()); mAgent->addObject (mgmtObject); - mgmtObject->set_status("JOINING"); } // Run initMapCompleted immediately to process the initial configuration @@ -340,6 +338,8 @@ void Cluster::initialize() { // Add finalizer last for exception safety. broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); + // Start dispatching CPG events. + dispatcher.start(); } // Called in connection thread to insert a client connection. 
@@ -595,15 +595,26 @@ void Cluster::configChange ( void Cluster::setReady(Lock&) { state = READY; - if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); mcast.setReady(); broker.getQueueEvents().enable(); enableClusterSafe(); // Enable cluster-safe assertions. } +// Set the management status from the Cluster::state. +// +// NOTE: Management updates are sent based on property changes. In +// order to keep consistency across the cluster, we touch the local +// management status property even if it is locally unchanged for any +// event that could have caused a cluster property change on any cluster member. +void Cluster::setMgmtStatus(Lock&) { + if (mgmtObject) + mgmtObject->set_status(state >= CATCHUP ? "ACTIVE" : "JOINING"); +} + void Cluster::initMapCompleted(Lock& l) { - // Called on completion of the initial status map. + // Called on completion of the initial status map. QPID_LOG(debug, *this << " initial status map complete. "); + setMgmtStatus(l); if (state == PRE_INIT) { // PRE_INIT means we're still in the earlyInitialize phase, in the constructor. // We decide here whether we want to recover from our store. @@ -649,6 +660,7 @@ void Cluster::initMapCompleted(Lock& l) { discarding = false; setReady(l); memberUpdate(l); + updateMgmtMembership(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); QPID_LOG(notice, *this << " joined cluster " << name); } @@ -695,6 +707,8 @@ void Cluster::configChange(const MemberId&, memberUpdate(l); if (elders.empty()) becomeElder(l); } + + updateMgmtMembership(l); // Update on every config change for consistency } void Cluster::becomeElder(Lock&) { @@ -788,6 +802,9 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { } catch (const Url::Invalid& e) { QPID_LOG(error, "Invalid URL in cluster ready command: " << url); } + // Update management on every ready event to be consistent across cluster. 
+ setMgmtStatus(l); + updateMgmtMembership(l); } void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { @@ -900,6 +917,8 @@ void Cluster::checkUpdateIn(Lock& l) { mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; memberUpdate(l); + // NB: don't updateMgmtMembership() here as we are not in the deliver + // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); if (mAgent) mAgent->suppress(false); // Enable management output. discarding = false; // ok to set, we're stalled for update. @@ -981,11 +1000,9 @@ void Cluster::memberUpdate(Lock& l) { // Ignore config changes while we are joining. if (state < CATCHUP) return; QPID_LOG(info, *this << " member update: " << map); - std::vector<Url> urls = getUrls(l); - std::vector<string> ids = getIds(l); size_t aliveCount = map.aliveCount(); assert(map.isAlive(self)); - failoverExchange->updateUrls(urls); + failoverExchange->updateUrls(getUrls(l)); // Mark store clean if I am the only broker, dirty otherwise. if (store.hasStore()) { @@ -1017,22 +1034,6 @@ void Cluster::memberUpdate(Lock& l) { } lastAliveCount = aliveCount; - if (mgmtObject) { - mgmtObject->set_clusterSize(urls.size()); - string urlstr; - for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) { - if (iter != urls.begin()) urlstr += ";"; - urlstr += iter->str(); - } - string idstr; - for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) { - if (iter != ids.begin()) idstr += ";"; - idstr += (*iter); - } - mgmtObject->set_members(urlstr); - mgmtObject->set_memberIDs(idstr); - } - // Close connections belonging to members that have left the cluster. 
ConnectionMap::iterator i = connections.begin(); while (i != connections.end()) { @@ -1045,6 +1046,26 @@ void Cluster::memberUpdate(Lock& l) { } } +// See comment on Cluster::setMgmtStatus +void Cluster::updateMgmtMembership(Lock& l) { + if (!mgmtObject) return; + std::vector<Url> urls = getUrls(l); + mgmtObject->set_clusterSize(urls.size()); + string urlstr; + for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) { + if (i != urls.begin()) urlstr += ";"; + urlstr += i->str(); + } + std::vector<string> ids = getIds(l); + string idstr; + for(std::vector<string>::iterator i = ids.begin(); i != ids.end(); i++ ) { + if (i != ids.begin()) idstr += ";"; + idstr += *i; + } + mgmtObject->set_members(urlstr); + mgmtObject->set_memberIDs(idstr); +} + std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { static const char* STATE[] = { "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP", diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 5668d04996..1f8fd447a3 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -196,6 +196,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { void requestUpdate(Lock& ); void initMapCompleted(Lock&); void becomeElder(Lock&); + void setMgmtStatus(Lock&); + void updateMgmtMembership(Lock&); // == Called in CPG dispatch thread void deliver( // CPG deliver callback. -- cgit v1.2.1