From d40d874132bc5011a76bd883fdf9d2507a2f8149 Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Mon, 26 Jan 2009 23:17:29 +0000 Subject: Added qpid-cluster utility plus model changes to support it. Fixed a segfault during cluster member shutdown. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737935 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/Cluster.cpp | 62 ++++++++++++++++++++++-------- cpp/src/qpid/cluster/Cluster.h | 3 ++ cpp/src/qpid/cluster/ClusterMap.cpp | 11 ++++++ cpp/src/qpid/cluster/ClusterMap.h | 1 + cpp/src/qpid/cluster/ConnectionMap.h | 5 +++ cpp/src/qpid/cluster/management-schema.xml | 10 +++-- 6 files changed, 74 insertions(+), 18 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 7b40328f1c..0e1c049a9c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -42,6 +42,7 @@ #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include "qmf/org/apache/qpid/cluster/Package.h" +#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" #include #include @@ -61,7 +62,7 @@ using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; -namespace qmf = qmf::org::apache::qpid::cluster; +namespace _qmf = ::qmf::org::apache::qpid::cluster; /**@file Threading notes: @@ -102,11 +103,11 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b lastSize(0), lastBroker(false) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); - if (agent != 0){ - qmf::Package packageInit(agent); - mgmtObject = new qmf::Cluster (agent, this, &broker,name,myUrl.str()); - agent->addObject (mgmtObject); + mAgent = ManagementAgent::Singleton::getInstance(); + if (mAgent != 0){ + _qmf::Package packageInit(mAgent); + mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str()); + mAgent->addObject (mgmtObject); mgmtObject->set_status("JOINING"); } broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); @@ -132,6 +133,15 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } +std::vector Cluster::getIds() const { + Lock l(lock); + return getIds(l); +} + +std::vector Cluster::getIds(Lock&) const { + return map.memberIds(); +} + std::vector Cluster::getUrls() const { Lock l(lock); return getUrls(l); @@ -150,11 +160,11 @@ void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); - if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); try { cpg.leave(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error leaving process group: " << e.what()); } + connections.clear(); try { broker.shutdown(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error during broker shutdown: " << e.what()); @@ -173,7 +183,7 @@ boost::intrusive_ptr Cluster::getConnection(const ConnectionId& conn return cp; } -void Cluster::deliver( +void Cluster::deliver( cpg_handle_t /*handle*/, cpg_name* /*group*/, uint32_t nodeid, @@ -467,16 +477,27 @@ void Cluster ::shutdown(const MemberId& id, Lock& l) { ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; } -Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) { +Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) { Lock l(lock); QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { - case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break; - case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break; - default: return Manageable::STATUS_UNKNOWN_METHOD; + case _qmf::Cluster::METHOD_STOPCLUSTERNODE : + { + _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; + stringstream stream; + stream << myId; + if (iargs.i_brokerId == stream.str()) + stopClusterNode(l); + } + break; + case _qmf::Cluster::METHOD_STOPFULLCLUSTER : + stopFullCluster(l); + break; + default: + return Manageable::STATUS_UNKNOWN_METHOD; } return Manageable::STATUS_OK; -} +} void Cluster::stopClusterNode(Lock& l) { QPID_LOG(notice, *this << " stopped by admin"); @@ -491,6 +512,7 @@ void Cluster::stopFullCluster(Lock& ) { void Cluster::memberUpdate(Lock& l) { QPID_LOG(info, *this << " member update: " << map); std::vector urls = getUrls(l); + std::vector ids = getIds(l); size_t size = urls.size(); failoverExchange->setUrls(urls); @@ -512,10 +534,16 @@ void Cluster::memberUpdate(Lock& l) { mgmtObject->set_clusterSize(size); string urlstr; for(std::vector::iterator iter = urls.begin(); iter != urls.end(); iter++ ) { - if (iter != urls.begin()) urlstr += "\n"; + if (iter != urls.begin()) urlstr += ";"; urlstr += iter->str(); } + string idstr; + for(std::vector::iterator iter = ids.begin(); iter != ids.end(); iter++ ) { + if (iter != ids.begin()) idstr += ";"; + idstr += (*iter); + } mgmtObject->set_members(urlstr); + mgmtObject->set_memberIDs(idstr); } // Close connections belonging to members that have now been excluded @@ -545,8 +573,12 @@ void Cluster::checkQuorum() { void Cluster::setClusterId(const Uuid& uuid) { clusterId = uuid; - if (mgmtObject) + if (mgmtObject) { + stringstream stream; + stream << myId; mgmtObject->set_clusterID(clusterId.str()); + mgmtObject->set_memberID(stream.str()); + } QPID_LOG(debug, *this << " cluster-id = " << clusterId); } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 8d235c7caf..ecd63a866e 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -80,6 +80,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void erase(ConnectionId); // URLs of current cluster members - called in connection threads. + std::vector getIds() const; std::vector getUrls() const; boost::shared_ptr getFailoverExchange() const { return failoverExchange; } @@ -111,6 +112,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { // a Lock to call the unlocked functions. void leave(Lock&); + std::vector getIds(Lock&) const; std::vector getUrls(Lock&) const; // Make an offer if we can - called in deliver thread. @@ -185,6 +187,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; ClusterMap::Set myElders; + qpid::management::ManagementAgent* mAgent; // Thread safe members Multicaster mcast; diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index cc9ea29093..b00699c903 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -107,6 +107,17 @@ MemberId ClusterMap::firstNewbie() const { return newbies.empty() ? MemberId() : newbies.begin()->first; } +std::vector ClusterMap::memberIds() const { + std::vector ids; + for (Map::const_iterator iter = members.begin(); + iter != members.end(); iter++) { + std::stringstream stream; + stream << iter->first; + ids.push_back(stream.str()); + } + return ids; +} + std::vector ClusterMap::memberUrls() const { std::vector urls(members.size()); std::transform(members.begin(), members.end(), urls.begin(), diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index 507fee9a72..1893d0e796 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -75,6 +75,7 @@ class ClusterMap { size_t aliveCount() const { return alive.size(); } size_t memberCount() const { return members.size(); } + std::vector memberIds() const; std::vector memberUrls() const; Set getAlive() const; diff --git a/cpp/src/qpid/cluster/ConnectionMap.h b/cpp/src/qpid/cluster/ConnectionMap.h index f1862e2e75..c355074e75 100644 --- a/cpp/src/qpid/cluster/ConnectionMap.h +++ b/cpp/src/qpid/cluster/ConnectionMap.h @@ -75,6 +75,11 @@ class ConnectionMap } } + void clear() { + ScopedLock l(lock); + map.clear(); + } + size_t size() const { return map.size(); } private: typedef std::map Map; diff --git a/cpp/src/qpid/cluster/management-schema.xml b/cpp/src/qpid/cluster/management-schema.xml index da19387cc6..a6292e9113 100644 --- a/cpp/src/qpid/cluster/management-schema.xml +++ b/cpp/src/qpid/cluster/management-schema.xml @@ -40,13 +40,17 @@ If access rights are omitted for a property, they are assumed to be RO. - + + - + + - + + + -- cgit v1.2.1