diff options
| author | Alan Conway <aconway@apache.org> | 2008-09-25 12:30:14 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-09-25 12:30:14 +0000 |
| commit | 48f511ecb4a772f2cf6048f9b5ddbf9a4e3f9587 (patch) | |
| tree | 7b75738562cf97befd775868670c336995ba0cd1 /cpp/src/qpid/cluster/Cluster.cpp | |
| parent | cd78f5c69d70b43e5bf82fa9125eb876bc3bbc42 (diff) | |
| download | qpid-python-48f511ecb4a772f2cf6048f9b5ddbf9a4e3f9587.tar.gz | |
Enabled management, add cluster shutdown command.
Remove dead Handler methods in Cluster.
Fixed SessionException handling in broker, was throwing some SessionExceptions as "unknown exception"
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698945 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 119 |
1 files changed, 39 insertions, 80 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 93625af948..7edf9f9392 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -26,6 +26,7 @@ #include "qpid/framing/ClusterDumpRequestBody.h" #include "qpid/framing/ClusterUpdateBody.h" #include "qpid/framing/ClusterReadyBody.h" +#include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" @@ -51,7 +52,7 @@ using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; -namespace _qmf = qmf::org::apache::qpid::cluster; +namespace qmf = qmf::org::apache::qpid::cluster; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : broker(b), @@ -66,6 +67,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : boost::bind(&Cluster::disconnect, this, _1) // disconnect ), connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))), + mgmtObject(0), handler(&joiningHandler), joiningHandler(*this), memberHandler(*this), @@ -73,30 +75,25 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0){ - _qmf::Package packageInit(agent); - mgmtObject = new _qmf::Cluster (agent, this, &broker,name.str(),url.str()); + qmf::Package packageInit(agent); + mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),url.str()); agent->addObject (mgmtObject); mgmtObject->set_status("JOINING"); - + + // FIXME aconway 2008-09-24: // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined. } QPID_LOG(notice, self << " joining cluster " << name.str()); - broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); + broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); cpgDispatchHandle.startWatch(poller); cpg.join(name); } Cluster::~Cluster() {} -void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { - Mutex::ScopedLock l(lock); - handler->insert(c); -} +void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { handler->insert(c); } -void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { - Mutex::ScopedLock l(lock); - handler->catchUpClosed(c); -} +void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { handler->catchUpClosed(c); } void Cluster::erase(ConnectionId id) { Mutex::ScopedLock l(lock); @@ -239,10 +236,8 @@ void Cluster::dispatch(sys::DispatchHandle& h) { } void Cluster::disconnect(sys::DispatchHandle& ) { - // FIXME aconway 2008-09-11: this should be logged as critical, - // when we provide admin option to shut down cluster and let - // members leave cleanly. - stopClusterNode(); + QPID_LOG(critical, self << " unexpectedly disconnected from cluster, shutting down"); + broker.shutdown(); } void Cluster::configChange( @@ -265,27 +260,8 @@ void Cluster::configChange( map.left(left, nLeft); handler->configChange(current, nCurrent, left, nLeft, joined, nJoined); - - // FIXME aconway 2008-09-17: management update. - //update mgnt stats - updateMemberStats(); } -void Cluster::update(const MemberId& id, const framing::FieldTable& members, uint64_t dumper) { - Mutex::ScopedLock l(lock); - handler->update(id, members, dumper); -} - -void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) { - Mutex::ScopedLock l(lock); - handler->dumpRequest(dumpee, urlStr); -} - -void Cluster::ready(const MemberId& member, const std::string& url) { - Mutex::ScopedLock l(lock); - handler->ready(member, url); - // FIXME aconway 2008-09-17: management update. -} broker::Broker& Cluster::getBroker(){ return broker; } @@ -295,12 +271,11 @@ void Cluster::stall() { // Stop processing connection events. We still process config changes // and cluster controls in deliver() connectionEventQueue.stop(); + if (mgmtObject!=0) mgmtObject->set_status("STALLED"); // FIXME aconway 2008-09-11: Flow control, we should slow down or // stop reading from local connections while stalled to avoid an // unbounded queue. - // if (mgmtObject!=0) - // mgmtObject->set_status("STALLED"); } void Cluster::ready() { @@ -314,8 +289,7 @@ void Cluster::unstall() { QPID_LOG(debug, self << " un-stalling"); handler = &memberHandler; // Member mode. connectionEventQueue.start(poller); - // if (mgmtObject!=0) - // mgmtObject->set_status("ACTIVE"); + if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); } // Called from Broker::~Broker when broker is shut down. At this @@ -323,61 +297,46 @@ void Cluster::unstall() { // invoked. We must ensure that CPG has also shut down so no CPG // callbacks will be invoked. // -void Cluster::shutdown() { +void Cluster::brokerShutdown() { QPID_LOG(notice, self << " shutting down."); try { cpg.shutdown(); } catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); } delete this; } -ManagementObject* Cluster::GetManagementObject(void) const { - return (ManagementObject*) mgmtObject; -} +ManagementObject* Cluster::GetManagementObject(void) const { return mgmtObject; } -Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&) { - Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; +Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) { QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); - - switch (methodId) - { - case _qmf::Cluster::METHOD_STOPCLUSTERNODE: - stopClusterNode(); - break; - case _qmf::Cluster::METHOD_STOPFULLCLUSTER: - stopFullCluster(); - break; + switch (methodId) { + case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break; + case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break; + default: return Manageable::STATUS_UNKNOWN_METHOD; } - - return status; + return Manageable::STATUS_OK; } -void Cluster::stopClusterNode(void) -{ - // FIXME aconway 2008-09-18: mgmt - QPID_LOG(notice, self << " disconnected from cluster " << name.str()); - broker.shutdown(); +void Cluster::stopClusterNode(void) { + QPID_LOG(notice, self << " stopped by admin"); + leave(); } -void Cluster::stopFullCluster(void) -{ - // FIXME aconway 2008-09-17: TODO +void Cluster::stopFullCluster(void) { + QPID_LOG(notice, self << " sending shutdown to cluster."); + mcastControl(ClusterShutdownBody(), 0); } -void Cluster::updateMemberStats(void) -{ - //update mgnt stats - // FIXME aconway 2008-09-18: -// if (mgmtObject!=0){ -// mgmtObject->set_clusterSize(size()); -// std::vector<Url> vectUrl = getUrls(); -// string urlstr; -// for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) { -// if (iter != vectUrl.begin()) urlstr += ";"; -// urlstr += iter->str(); -// } -// mgmtObject->set_members(urlstr); -// } - +void Cluster::updateMemberStats(void) { + if (mgmtObject) { + mgmtObject->set_clusterSize(size()); + std::vector<Url> vectUrl = getUrls(); + string urlstr; + for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) { + if (iter != vectUrl.begin()) urlstr += "\n"; + urlstr += iter->str(); + } + mgmtObject->set_members(urlstr); + } } }} // namespace qpid::cluster |
