summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-07 17:27:06 +0000
committerAlan Conway <aconway@apache.org>2008-10-07 17:27:06 +0000
commit41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (patch)
treede5e5b5e431bf695b2c44e198ee93d179201a0e2 /cpp/src/qpid/cluster/Cluster.cpp
parenta653ebe5bdfad1d44a576d2ab23f7e6ea80ba96f (diff)
downloadqpid-python-41d33af55b9fbf4c664ccb56accb1a37bd1ef006.tar.gz
broker: Fixed incorrect pass-by-reference of Queue::shared_ptr in several files.
cluster: added FailoverExchange - send cluster membership to clients. client: added FailoverListener - receive cluster updates from failover exchange. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702552 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp36
1 files changed, 22 insertions, 14 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 9c503d6d13..e64692bc91 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -19,6 +19,7 @@
#include "Cluster.h"
#include "Connection.h"
#include "DumpClient.h"
+#include "FailoverExchange.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
@@ -109,6 +110,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
// FIXME aconway 2008-09-24:
// if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
}
+ failoverExchange.reset(new FailoverExchange(this));
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
cpgDispatchHandle.startWatch(poller);
cpg.join(name);
@@ -331,15 +333,15 @@ void Cluster::configChange (
Mutex::ScopedLock l(lock);
QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
- map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
- updateMemberStats(l);
+ bool changed = map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
if (state == LEFT) return;
if (!map.isAlive(memberId)) { leave(l); return; }
-
+
if(state == INIT) { // First configChange
if (map.aliveCount() == 1) {
QPID_LOG(info, *this << " first in cluster at " << myUrl);
map = ClusterMap(memberId, myUrl, true);
+ memberUpdate(l);
unstall(l);
}
else { // Joining established group.
@@ -348,6 +350,8 @@ void Cluster::configChange (
QPID_LOG(debug, *this << " send dump-request " << myUrl);
}
}
+ else if (state >= READY && changed)
+ memberUpdate(l);
}
void Cluster::dumpInDone(const ClusterMap& m) {
@@ -403,8 +407,9 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) {
tryMakeOffer(id, l);
}
-void Cluster::ready(const MemberId& id, const std::string& url, Lock&) {
- map.ready(id, Url(url));
+void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
+ if (map.ready(id, Url(url)))
+ memberUpdate(l);
}
void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) {
@@ -454,8 +459,8 @@ void Cluster::checkDumpIn(Lock& l) {
if (state == DUMPEE && dumpedMap) {
map = *dumpedMap;
QPID_LOG(debug, *this << " incoming dump complete. Members: " << map);
- unstall(l);
mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l);
+ unstall(l);
}
}
@@ -488,28 +493,31 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string
Lock l(lock);
QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]");
switch (methodId) {
- case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break;
- case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break;
+ case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break;
+ case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break;
default: return Manageable::STATUS_UNKNOWN_METHOD;
}
return Manageable::STATUS_OK;
}
-void Cluster::stopClusterNode() {
+void Cluster::stopClusterNode(Lock&) {
QPID_LOG(notice, *this << " stopped by admin");
leave();
}
-void Cluster::stopFullCluster() {
- Lock l(lock);
+void Cluster::stopFullCluster(Lock& l) {
QPID_LOG(notice, *this << " shutting down cluster " << name.str());
mcastControl(ClusterShutdownBody(), 0, l);
}
-void Cluster::updateMemberStats(Lock& l) {
+void Cluster::memberUpdate(Lock& l) {
+ std::vector<Url> vectUrl = getUrls(l);
+ size_t size = vectUrl.size();
+
+ failoverExchange->setUrls(vectUrl);
+
if (mgmtObject) {
- std::vector<Url> vectUrl = getUrls(l);
- size_t size = vectUrl.size();
+
if (lastSize != size && size == 1){
QPID_LOG(info, *this << " last node standing, updating queue policies.");
broker.getQueues().updateQueueClusterState(true);