diff options
| author | Alan Conway <aconway@apache.org> | 2008-10-07 17:27:06 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-10-07 17:27:06 +0000 |
| commit | 41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (patch) | |
| tree | de5e5b5e431bf695b2c44e198ee93d179201a0e2 /cpp/src/qpid/cluster/Cluster.cpp | |
| parent | a653ebe5bdfad1d44a576d2ab23f7e6ea80ba96f (diff) | |
| download | qpid-python-41d33af55b9fbf4c664ccb56accb1a37bd1ef006.tar.gz | |
broker: Fixed incorrect pass-by-reference of Queue::shared_ptr in several files.
cluster: added FailoverExchange - send cluster membership to clients.
client: added FailoverListener - receive cluster updates from failover exchange.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702552 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 36 |
1 files changed, 22 insertions, 14 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 9c503d6d13..e64692bc91 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -19,6 +19,7 @@ #include "Cluster.h" #include "Connection.h" #include "DumpClient.h" +#include "FailoverExchange.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" @@ -109,6 +110,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : // FIXME aconway 2008-09-24: // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined. } + failoverExchange.reset(new FailoverExchange(this)); broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); cpgDispatchHandle.startWatch(poller); cpg.join(name); @@ -331,15 +333,15 @@ void Cluster::configChange ( Mutex::ScopedLock l(lock); QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); - map.configChange(current, nCurrent, left, nLeft, joined, nJoined); - updateMemberStats(l); + bool changed = map.configChange(current, nCurrent, left, nLeft, joined, nJoined); if (state == LEFT) return; if (!map.isAlive(memberId)) { leave(l); return; } - + if(state == INIT) { // First configChange if (map.aliveCount() == 1) { QPID_LOG(info, *this << " first in cluster at " << myUrl); map = ClusterMap(memberId, myUrl, true); + memberUpdate(l); unstall(l); } else { // Joining established group. @@ -348,6 +350,8 @@ void Cluster::configChange ( QPID_LOG(debug, *this << " send dump-request " << myUrl); } } + else if (state >= READY && changed) + memberUpdate(l); } void Cluster::dumpInDone(const ClusterMap& m) { @@ -403,8 +407,9 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { tryMakeOffer(id, l); } -void Cluster::ready(const MemberId& id, const std::string& url, Lock&) { - map.ready(id, Url(url)); +void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { + if (map.ready(id, Url(url))) + memberUpdate(l); } void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) { @@ -454,8 +459,8 @@ void Cluster::checkDumpIn(Lock& l) { if (state == DUMPEE && dumpedMap) { map = *dumpedMap; QPID_LOG(debug, *this << " incoming dump complete. Members: " << map); - unstall(l); mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l); + unstall(l); } } @@ -488,28 +493,31 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string Lock l(lock); QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { - case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break; - case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break; + case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break; + case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break; default: return Manageable::STATUS_UNKNOWN_METHOD; } return Manageable::STATUS_OK; } -void Cluster::stopClusterNode() { +void Cluster::stopClusterNode(Lock&) { QPID_LOG(notice, *this << " stopped by admin"); leave(); } -void Cluster::stopFullCluster() { - Lock l(lock); +void Cluster::stopFullCluster(Lock& l) { QPID_LOG(notice, *this << " shutting down cluster " << name.str()); mcastControl(ClusterShutdownBody(), 0, l); } -void Cluster::updateMemberStats(Lock& l) { +void Cluster::memberUpdate(Lock& l) { + std::vector<Url> vectUrl = getUrls(l); + size_t size = vectUrl.size(); + + failoverExchange->setUrls(vectUrl); + if (mgmtObject) { - std::vector<Url> vectUrl = getUrls(l); - size_t size = vectUrl.size(); + if (lastSize != size && size == 1){ QPID_LOG(info, *this << " last node standing, updating queue policies."); broker.getQueues().updateQueueClusterState(true); |
