diff options
author | Alan Conway <aconway@apache.org> | 2010-07-23 01:48:59 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-07-23 01:48:59 +0000 |
commit | 0e500f1263b0d4fcee94c948b15afca719cf8616 (patch) | |
tree | c575e633ec7f3d20436ddf3c5a6a822034d8d52a /cpp | |
parent | a621976b2626148ad00b4909198d9c9d6ae9426d (diff) | |
download | qpid-python-0e500f1263b0d4fcee94c948b15afca719cf8616.tar.gz |
Race condition in cluster+management, inconsistent errors like:
"confirmed < (2097+0) but only sent < (2096+0)"
Management messages are generated if a managed objects properties have
changed since the last update. Properties of the cluster object
(members and status) were sometimes being changed outside the delivery
context which could create inconsistencies in the cluster.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966933 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 67 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 2 |
2 files changed, 46 insertions, 23 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f3b81c386e..3f1d79d382 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -316,7 +316,6 @@ void Cluster::initialize() { broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2); broker.setExpiryPolicy(expiryPolicy); - dispatcher.start(); deliverEventQueue.bypassOff(); deliverEventQueue.start(); deliverFrameQueue.bypassOff(); @@ -329,7 +328,6 @@ void Cluster::initialize() { _qmf::Package packageInit(mAgent); mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str()); mAgent->addObject (mgmtObject); - mgmtObject->set_status("JOINING"); } // Run initMapCompleted immediately to process the initial configuration @@ -340,6 +338,8 @@ void Cluster::initialize() { // Add finalizer last for exception safety. broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); + // Start dispatching CPG events. + dispatcher.start(); } // Called in connection thread to insert a client connection. @@ -595,15 +595,26 @@ void Cluster::configChange ( void Cluster::setReady(Lock&) { state = READY; - if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); mcast.setReady(); broker.getQueueEvents().enable(); enableClusterSafe(); // Enable cluster-safe assertions. } +// Set the management status from the Cluster::state. +// +// NOTE: Management updates are sent based on property changes. In +// order to keep consistency across the cluster, we touch the local +// management status property even if it is locally unchanged for any +// event that could have cause a cluster property change on any cluster member. +void Cluster::setMgmtStatus(Lock&) { + if (mgmtObject) + mgmtObject->set_status(state >= CATCHUP ? "ACTIVE" : "JOINING"); +} + void Cluster::initMapCompleted(Lock& l) { - // Called on completion of the initial status map. + // Called on completion of the initial status map. QPID_LOG(debug, *this << " initial status map complete. "); + setMgmtStatus(l); if (state == PRE_INIT) { // PRE_INIT means we're still in the earlyInitialize phase, in the constructor. // We decide here whether we want to recover from our store. @@ -649,6 +660,7 @@ void Cluster::initMapCompleted(Lock& l) { discarding = false; setReady(l); memberUpdate(l); + updateMgmtMembership(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); QPID_LOG(notice, *this << " joined cluster " << name); } @@ -695,6 +707,8 @@ void Cluster::configChange(const MemberId&, memberUpdate(l); if (elders.empty()) becomeElder(l); } + + updateMgmtMembership(l); // Update on every config change for consistency } void Cluster::becomeElder(Lock&) { @@ -788,6 +802,9 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { } catch (const Url::Invalid& e) { QPID_LOG(error, "Invalid URL in cluster ready command: " << url); } + // Update management on every ready event to be consistent across cluster. + setMgmtStatus(l); + updateMgmtMembership(l); } void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { @@ -900,6 +917,8 @@ void Cluster::checkUpdateIn(Lock& l) { mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; memberUpdate(l); + // NB: don't updateMgmtMembership() here as we are not in the deliver + // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); if (mAgent) mAgent->suppress(false); // Enable management output. discarding = false; // ok to set, we're stalled for update. @@ -981,11 +1000,9 @@ void Cluster::memberUpdate(Lock& l) { // Ignore config changes while we are joining. if (state < CATCHUP) return; QPID_LOG(info, *this << " member update: " << map); - std::vector<Url> urls = getUrls(l); - std::vector<string> ids = getIds(l); size_t aliveCount = map.aliveCount(); assert(map.isAlive(self)); - failoverExchange->updateUrls(urls); + failoverExchange->updateUrls(getUrls(l)); // Mark store clean if I am the only broker, dirty otherwise. if (store.hasStore()) { @@ -1017,22 +1034,6 @@ void Cluster::memberUpdate(Lock& l) { } lastAliveCount = aliveCount; - if (mgmtObject) { - mgmtObject->set_clusterSize(urls.size()); - string urlstr; - for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) { - if (iter != urls.begin()) urlstr += ";"; - urlstr += iter->str(); - } - string idstr; - for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) { - if (iter != ids.begin()) idstr += ";"; - idstr += (*iter); - } - mgmtObject->set_members(urlstr); - mgmtObject->set_memberIDs(idstr); - } - // Close connections belonging to members that have left the cluster. ConnectionMap::iterator i = connections.begin(); while (i != connections.end()) { @@ -1045,6 +1046,26 @@ void Cluster::memberUpdate(Lock& l) { } } +// See comment on Cluster::setMgmtStatus +void Cluster::updateMgmtMembership(Lock& l) { + if (!mgmtObject) return; + std::vector<Url> urls = getUrls(l); + mgmtObject->set_clusterSize(urls.size()); + string urlstr; + for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) { + if (i != urls.begin()) urlstr += ";"; + urlstr += i->str(); + } + std::vector<string> ids = getIds(l); + string idstr; + for(std::vector<string>::iterator i = ids.begin(); i != ids.end(); i++ ) { + if (i != ids.begin()) idstr += ";"; + idstr += *i; + } + mgmtObject->set_members(urlstr); + mgmtObject->set_memberIDs(idstr); +} + std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { static const char* STATE[] = { "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP", diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 5668d04996..1f8fd447a3 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -196,6 +196,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { void requestUpdate(Lock& ); void initMapCompleted(Lock&); void becomeElder(Lock&); + void setMgmtStatus(Lock&); + void updateMgmtMembership(Lock&); // == Called in CPG dispatch thread void deliver( // CPG deliver callback. |