summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-25 12:30:14 +0000
committerAlan Conway <aconway@apache.org>2008-09-25 12:30:14 +0000
commit48f511ecb4a772f2cf6048f9b5ddbf9a4e3f9587 (patch)
tree7b75738562cf97befd775868670c336995ba0cd1 /cpp/src/qpid/cluster/Cluster.cpp
parentcd78f5c69d70b43e5bf82fa9125eb876bc3bbc42 (diff)
downloadqpid-python-48f511ecb4a772f2cf6048f9b5ddbf9a4e3f9587.tar.gz
Enabled management, add cluster shutdown command.
Remove dead Handler methods in Cluster. Fixed SessionException handling in broker, was throwing some SessionExceptions as "unknown exception" git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698945 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp119
1 files changed, 39 insertions, 80 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 93625af948..7edf9f9392 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -26,6 +26,7 @@
#include "qpid/framing/ClusterDumpRequestBody.h"
#include "qpid/framing/ClusterUpdateBody.h"
#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
@@ -51,7 +52,7 @@ using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
-namespace _qmf = qmf::org::apache::qpid::cluster;
+namespace qmf = qmf::org::apache::qpid::cluster;
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker(b),
@@ -66,6 +67,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
+ mgmtObject(0),
handler(&joiningHandler),
joiningHandler(*this),
memberHandler(*this),
@@ -73,30 +75,25 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
- _qmf::Package packageInit(agent);
- mgmtObject = new _qmf::Cluster (agent, this, &broker,name.str(),url.str());
+ qmf::Package packageInit(agent);
+ mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),url.str());
agent->addObject (mgmtObject);
mgmtObject->set_status("JOINING");
-
+
+ // FIXME aconway 2008-09-24:
// if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
}
QPID_LOG(notice, self << " joining cluster " << name.str());
- broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
+ broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
cpgDispatchHandle.startWatch(poller);
cpg.join(name);
}
Cluster::~Cluster() {}
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(lock);
- handler->insert(c);
-}
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { handler->insert(c); }
-void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(lock);
- handler->catchUpClosed(c);
-}
+void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { handler->catchUpClosed(c); }
void Cluster::erase(ConnectionId id) {
Mutex::ScopedLock l(lock);
@@ -239,10 +236,8 @@ void Cluster::dispatch(sys::DispatchHandle& h) {
}
void Cluster::disconnect(sys::DispatchHandle& ) {
- // FIXME aconway 2008-09-11: this should be logged as critical,
- // when we provide admin option to shut down cluster and let
- // members leave cleanly.
- stopClusterNode();
+ QPID_LOG(critical, self << " unexpectedly disconnected from cluster, shutting down");
+ broker.shutdown();
}
void Cluster::configChange(
@@ -265,27 +260,8 @@ void Cluster::configChange(
map.left(left, nLeft);
handler->configChange(current, nCurrent, left, nLeft, joined, nJoined);
-
- // FIXME aconway 2008-09-17: management update.
- //update mgnt stats
- updateMemberStats();
}
-void Cluster::update(const MemberId& id, const framing::FieldTable& members, uint64_t dumper) {
- Mutex::ScopedLock l(lock);
- handler->update(id, members, dumper);
-}
-
-void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) {
- Mutex::ScopedLock l(lock);
- handler->dumpRequest(dumpee, urlStr);
-}
-
-void Cluster::ready(const MemberId& member, const std::string& url) {
- Mutex::ScopedLock l(lock);
- handler->ready(member, url);
- // FIXME aconway 2008-09-17: management update.
-}
broker::Broker& Cluster::getBroker(){ return broker; }
@@ -295,12 +271,11 @@ void Cluster::stall() {
// Stop processing connection events. We still process config changes
// and cluster controls in deliver()
connectionEventQueue.stop();
+ if (mgmtObject!=0) mgmtObject->set_status("STALLED");
// FIXME aconway 2008-09-11: Flow control, we should slow down or
// stop reading from local connections while stalled to avoid an
// unbounded queue.
- // if (mgmtObject!=0)
- // mgmtObject->set_status("STALLED");
}
void Cluster::ready() {
@@ -314,8 +289,7 @@ void Cluster::unstall() {
QPID_LOG(debug, self << " un-stalling");
handler = &memberHandler; // Member mode.
connectionEventQueue.start(poller);
- // if (mgmtObject!=0)
- // mgmtObject->set_status("ACTIVE");
+ if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
}
// Called from Broker::~Broker when broker is shut down. At this
@@ -323,61 +297,46 @@ void Cluster::unstall() {
// invoked. We must ensure that CPG has also shut down so no CPG
// callbacks will be invoked.
//
-void Cluster::shutdown() {
+void Cluster::brokerShutdown() {
QPID_LOG(notice, self << " shutting down.");
try { cpg.shutdown(); }
catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); }
delete this;
}
-ManagementObject* Cluster::GetManagementObject(void) const {
- return (ManagementObject*) mgmtObject;
-}
+ManagementObject* Cluster::GetManagementObject(void) const { return mgmtObject; }
-Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&) {
- Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) {
QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
-
- switch (methodId)
- {
- case _qmf::Cluster::METHOD_STOPCLUSTERNODE:
- stopClusterNode();
- break;
- case _qmf::Cluster::METHOD_STOPFULLCLUSTER:
- stopFullCluster();
- break;
+ switch (methodId) {
+ case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break;
+ case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break;
+ default: return Manageable::STATUS_UNKNOWN_METHOD;
}
-
- return status;
+ return Manageable::STATUS_OK;
}
-void Cluster::stopClusterNode(void)
-{
- // FIXME aconway 2008-09-18: mgmt
- QPID_LOG(notice, self << " disconnected from cluster " << name.str());
- broker.shutdown();
+void Cluster::stopClusterNode(void) {
+ QPID_LOG(notice, self << " stopped by admin");
+ leave();
}
-void Cluster::stopFullCluster(void)
-{
- // FIXME aconway 2008-09-17: TODO
+void Cluster::stopFullCluster(void) {
+ QPID_LOG(notice, self << " sending shutdown to cluster.");
+ mcastControl(ClusterShutdownBody(), 0);
}
-void Cluster::updateMemberStats(void)
-{
- //update mgnt stats
- // FIXME aconway 2008-09-18:
-// if (mgmtObject!=0){
-// mgmtObject->set_clusterSize(size());
-// std::vector<Url> vectUrl = getUrls();
-// string urlstr;
-// for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
-// if (iter != vectUrl.begin()) urlstr += ";";
-// urlstr += iter->str();
-// }
-// mgmtObject->set_members(urlstr);
-// }
-
+void Cluster::updateMemberStats(void) {
+ if (mgmtObject) {
+ mgmtObject->set_clusterSize(size());
+ std::vector<Url> vectUrl = getUrls();
+ string urlstr;
+ for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
+ if (iter != vectUrl.begin()) urlstr += "\n";
+ urlstr += iter->str();
+ }
+ mgmtObject->set_members(urlstr);
+ }
}
}} // namespace qpid::cluster