summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-07-23 01:48:59 +0000
committerAlan Conway <aconway@apache.org>2010-07-23 01:48:59 +0000
commit0e500f1263b0d4fcee94c948b15afca719cf8616 (patch)
treec575e633ec7f3d20436ddf3c5a6a822034d8d52a /cpp
parenta621976b2626148ad00b4909198d9c9d6ae9426d (diff)
downloadqpid-python-0e500f1263b0d4fcee94c948b15afca719cf8616.tar.gz
Race condition in cluster+management, inconsistent errors like:
"confirmed < (2097+0) but only sent < (2096+0)" Management messages are generated if a managed object's properties have changed since the last update. Properties of the cluster object (members and status) were sometimes being changed outside the delivery context, which could create inconsistencies in the cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966933 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp67
-rw-r--r--cpp/src/qpid/cluster/Cluster.h2
2 files changed, 46 insertions, 23 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f3b81c386e..3f1d79d382 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -316,7 +316,6 @@ void Cluster::initialize() {
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2);
broker.setExpiryPolicy(expiryPolicy);
- dispatcher.start();
deliverEventQueue.bypassOff();
deliverEventQueue.start();
deliverFrameQueue.bypassOff();
@@ -329,7 +328,6 @@ void Cluster::initialize() {
_qmf::Package packageInit(mAgent);
mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str());
mAgent->addObject (mgmtObject);
- mgmtObject->set_status("JOINING");
}
// Run initMapCompleted immediately to process the initial configuration
@@ -340,6 +338,8 @@ void Cluster::initialize() {
// Add finalizer last for exception safety.
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
+ // Start dispatching CPG events.
+ dispatcher.start();
}
// Called in connection thread to insert a client connection.
@@ -595,15 +595,26 @@ void Cluster::configChange (
void Cluster::setReady(Lock&) {
state = READY;
- if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
mcast.setReady();
broker.getQueueEvents().enable();
enableClusterSafe(); // Enable cluster-safe assertions.
}
+// Set the management status from the Cluster::state.
+//
+// NOTE: Management updates are sent based on property changes. In
+// order to keep consistency across the cluster, we touch the local
+// management status property even if it is locally unchanged for any
+// event that could have caused a cluster property change on any cluster member.
+void Cluster::setMgmtStatus(Lock&) {
+ if (mgmtObject)
+ mgmtObject->set_status(state >= CATCHUP ? "ACTIVE" : "JOINING");
+}
+
void Cluster::initMapCompleted(Lock& l) {
- // Called on completion of the initial status map.
+ // Called on completion of the initial status map.
QPID_LOG(debug, *this << " initial status map complete. ");
+ setMgmtStatus(l);
if (state == PRE_INIT) {
// PRE_INIT means we're still in the earlyInitialize phase, in the constructor.
// We decide here whether we want to recover from our store.
@@ -649,6 +660,7 @@ void Cluster::initMapCompleted(Lock& l) {
discarding = false;
setReady(l);
memberUpdate(l);
+ updateMgmtMembership(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
QPID_LOG(notice, *this << " joined cluster " << name);
}
@@ -695,6 +707,8 @@ void Cluster::configChange(const MemberId&,
memberUpdate(l);
if (elders.empty()) becomeElder(l);
}
+
+ updateMgmtMembership(l); // Update on every config change for consistency
}
void Cluster::becomeElder(Lock&) {
@@ -788,6 +802,9 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
} catch (const Url::Invalid& e) {
QPID_LOG(error, "Invalid URL in cluster ready command: " << url);
}
+ // Update management on every ready event to be consistent across cluster.
+ setMgmtStatus(l);
+ updateMgmtMembership(l);
}
void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
@@ -900,6 +917,8 @@ void Cluster::checkUpdateIn(Lock& l) {
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
memberUpdate(l);
+ // NB: don't updateMgmtMembership() here as we are not in the deliver
+ // thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
if (mAgent) mAgent->suppress(false); // Enable management output.
discarding = false; // ok to set, we're stalled for update.
@@ -981,11 +1000,9 @@ void Cluster::memberUpdate(Lock& l) {
// Ignore config changes while we are joining.
if (state < CATCHUP) return;
QPID_LOG(info, *this << " member update: " << map);
- std::vector<Url> urls = getUrls(l);
- std::vector<string> ids = getIds(l);
size_t aliveCount = map.aliveCount();
assert(map.isAlive(self));
- failoverExchange->updateUrls(urls);
+ failoverExchange->updateUrls(getUrls(l));
// Mark store clean if I am the only broker, dirty otherwise.
if (store.hasStore()) {
@@ -1017,22 +1034,6 @@ void Cluster::memberUpdate(Lock& l) {
}
lastAliveCount = aliveCount;
- if (mgmtObject) {
- mgmtObject->set_clusterSize(urls.size());
- string urlstr;
- for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
- if (iter != urls.begin()) urlstr += ";";
- urlstr += iter->str();
- }
- string idstr;
- for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) {
- if (iter != ids.begin()) idstr += ";";
- idstr += (*iter);
- }
- mgmtObject->set_members(urlstr);
- mgmtObject->set_memberIDs(idstr);
- }
-
// Close connections belonging to members that have left the cluster.
ConnectionMap::iterator i = connections.begin();
while (i != connections.end()) {
@@ -1045,6 +1046,26 @@ void Cluster::memberUpdate(Lock& l) {
}
}
+// See comment on Cluster::setMgmtStatus
+void Cluster::updateMgmtMembership(Lock& l) {
+ if (!mgmtObject) return;
+ std::vector<Url> urls = getUrls(l);
+ mgmtObject->set_clusterSize(urls.size());
+ string urlstr;
+ for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) {
+ if (i != urls.begin()) urlstr += ";";
+ urlstr += i->str();
+ }
+ std::vector<string> ids = getIds(l);
+ string idstr;
+ for(std::vector<string>::iterator i = ids.begin(); i != ids.end(); i++ ) {
+ if (i != ids.begin()) idstr += ";";
+ idstr += *i;
+ }
+ mgmtObject->set_members(urlstr);
+ mgmtObject->set_memberIDs(idstr);
+}
+
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
static const char* STATE[] = {
"PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP",
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 5668d04996..1f8fd447a3 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -196,6 +196,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void requestUpdate(Lock& );
void initMapCompleted(Lock&);
void becomeElder(Lock&);
+ void setMgmtStatus(Lock&);
+ void updateMgmtMembership(Lock&);
// == Called in CPG dispatch thread
void deliver( // CPG deliver callback.