summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp67
1 files changed, 44 insertions, 23 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f3b81c386e..3f1d79d382 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -316,7 +316,6 @@ void Cluster::initialize() {
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2);
broker.setExpiryPolicy(expiryPolicy);
- dispatcher.start();
deliverEventQueue.bypassOff();
deliverEventQueue.start();
deliverFrameQueue.bypassOff();
@@ -329,7 +328,6 @@ void Cluster::initialize() {
_qmf::Package packageInit(mAgent);
mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str());
mAgent->addObject (mgmtObject);
- mgmtObject->set_status("JOINING");
}
// Run initMapCompleted immediately to process the initial configuration
@@ -340,6 +338,8 @@ void Cluster::initialize() {
// Add finalizer last for exception safety.
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
+ // Start dispatching CPG events.
+ dispatcher.start();
}
// Called in connection thread to insert a client connection.
@@ -595,15 +595,26 @@ void Cluster::configChange (
void Cluster::setReady(Lock&) {
state = READY;
- if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
mcast.setReady();
broker.getQueueEvents().enable();
enableClusterSafe(); // Enable cluster-safe assertions.
}
+// Set the management status from the Cluster::state.
+//
+// NOTE: Management updates are sent based on property changes. In
+// order to keep consistency across the cluster, we touch the local
+// management status property even if it is locally unchanged for any
+// event that could have cause a cluster property change on any cluster member.
+void Cluster::setMgmtStatus(Lock&) {
+ if (mgmtObject)
+ mgmtObject->set_status(state >= CATCHUP ? "ACTIVE" : "JOINING");
+}
+
void Cluster::initMapCompleted(Lock& l) {
- // Called on completion of the initial status map.
+ // Called on completion of the initial status map.
QPID_LOG(debug, *this << " initial status map complete. ");
+ setMgmtStatus(l);
if (state == PRE_INIT) {
// PRE_INIT means we're still in the earlyInitialize phase, in the constructor.
// We decide here whether we want to recover from our store.
@@ -649,6 +660,7 @@ void Cluster::initMapCompleted(Lock& l) {
discarding = false;
setReady(l);
memberUpdate(l);
+ updateMgmtMembership(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
QPID_LOG(notice, *this << " joined cluster " << name);
}
@@ -695,6 +707,8 @@ void Cluster::configChange(const MemberId&,
memberUpdate(l);
if (elders.empty()) becomeElder(l);
}
+
+ updateMgmtMembership(l); // Update on every config change for consistency
}
void Cluster::becomeElder(Lock&) {
@@ -788,6 +802,9 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
} catch (const Url::Invalid& e) {
QPID_LOG(error, "Invalid URL in cluster ready command: " << url);
}
+ // Update management on every ready event to be consistent across cluster.
+ setMgmtStatus(l);
+ updateMgmtMembership(l);
}
void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
@@ -900,6 +917,8 @@ void Cluster::checkUpdateIn(Lock& l) {
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
memberUpdate(l);
+ // NB: don't updateMgmtMembership() here as we are not in the deliver
+ // thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
if (mAgent) mAgent->suppress(false); // Enable management output.
discarding = false; // ok to set, we're stalled for update.
@@ -981,11 +1000,9 @@ void Cluster::memberUpdate(Lock& l) {
// Ignore config changes while we are joining.
if (state < CATCHUP) return;
QPID_LOG(info, *this << " member update: " << map);
- std::vector<Url> urls = getUrls(l);
- std::vector<string> ids = getIds(l);
size_t aliveCount = map.aliveCount();
assert(map.isAlive(self));
- failoverExchange->updateUrls(urls);
+ failoverExchange->updateUrls(getUrls(l));
// Mark store clean if I am the only broker, dirty otherwise.
if (store.hasStore()) {
@@ -1017,22 +1034,6 @@ void Cluster::memberUpdate(Lock& l) {
}
lastAliveCount = aliveCount;
- if (mgmtObject) {
- mgmtObject->set_clusterSize(urls.size());
- string urlstr;
- for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
- if (iter != urls.begin()) urlstr += ";";
- urlstr += iter->str();
- }
- string idstr;
- for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) {
- if (iter != ids.begin()) idstr += ";";
- idstr += (*iter);
- }
- mgmtObject->set_members(urlstr);
- mgmtObject->set_memberIDs(idstr);
- }
-
// Close connections belonging to members that have left the cluster.
ConnectionMap::iterator i = connections.begin();
while (i != connections.end()) {
@@ -1045,6 +1046,26 @@ void Cluster::memberUpdate(Lock& l) {
}
}
+// See comment on Cluster::setMgmtStatus
+void Cluster::updateMgmtMembership(Lock& l) {
+ if (!mgmtObject) return;
+ std::vector<Url> urls = getUrls(l);
+ mgmtObject->set_clusterSize(urls.size());
+ string urlstr;
+ for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) {
+ if (i != urls.begin()) urlstr += ";";
+ urlstr += i->str();
+ }
+ std::vector<string> ids = getIds(l);
+ string idstr;
+ for(std::vector<string>::iterator i = ids.begin(); i != ids.end(); i++ ) {
+ if (i != ids.begin()) idstr += ";";
+ idstr += *i;
+ }
+ mgmtObject->set_members(urlstr);
+ mgmtObject->set_memberIDs(idstr);
+}
+
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
static const char* STATE[] = {
"PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP",