summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-01-16 21:34:46 +0000
committerAlan Conway <aconway@apache.org>2009-01-16 21:34:46 +0000
commit5a0c2ddd943067fc38fde4b60632501fd793810b (patch)
treeb5d2bb1dbbb0083d35f6425a1447a61fdf8faf62 /cpp/src/qpid/cluster/Cluster.cpp
parent53eabf3d2cd58938234017aac5901b327ad267c8 (diff)
downloadqpid-python-5a0c2ddd943067fc38fde4b60632501fd793810b.tar.gz
cluster refactor: separate out dispatch strategy, implement poller and thread dispatch.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@735151 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp44
1 files changed, 9 insertions, 35 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index ce564939b8..18b4a2e69c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -85,6 +85,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_, size_t mcastMax) :
broker(b),
+ mgmtObject(0),
poller(b.getPoller()),
cpg(*this),
name(name_),
@@ -92,14 +93,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
myId(cpg.self()),
readMax(readMax_),
writeEstimate(writeEstimate_),
- cpgDispatchHandle(
- cpg,
- boost::bind(&Cluster::dispatch, this, _1), // read
- 0, // write
- boost::bind(&Cluster::disconnect, this, _1) // disconnect
- ),
mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)),
- mgmtObject(0),
+ dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
state(INIT),
lastSize(0),
@@ -114,7 +109,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
}
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
failoverExchange.reset(new FailoverExchange(this));
- cpgDispatchHandle.startWatch(poller);
+ dispatcher.start();
deliverQueue.start();
QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
if (quorum_) quorum.init();
@@ -153,14 +148,13 @@ void Cluster::leave(Lock&) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
- if (!deliverQueue.isStopped()) deliverQueue.stop();
try { cpg.leave(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error leaving process group: " << e.what());
}
try { broker.shutdown(); }
catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error during shutdown: " << e.what());
+ QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
}
}
}
@@ -202,7 +196,8 @@ void Cluster::deliver(const Event& e, Lock&) {
// Entry point: called when deliverQueue has events to process.
void Cluster::delivered(PollableEventQueue::Queue& events) {
try {
- for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1));
+ for (PollableEventQueue::Queue::iterator i = events.begin(); i != events.end(); ++i)
+ deliveredEvent(*i, i->getData());
events.clear();
} catch (const std::exception& e) {
QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
@@ -210,8 +205,8 @@ void Cluster::delivered(PollableEventQueue::Queue& events) {
}
}
-void Cluster::deliveredEvent(const Event& e) {
- Buffer buf(e);
+void Cluster::deliveredEvent(const EventHeader& e, const char* data) {
+ Buffer buf(const_cast<char*>(data), e.getSize());
AMQFrame frame;
if (e.isCluster()) {
while (frame.decode(buf)) {
@@ -270,27 +265,6 @@ ostream& operator<<(ostream& o, const AddrList& a) {
return o << a.suffix;
}
-// Entry point: called by IO to dispatch CPG events.
-void Cluster::dispatch(sys::DispatchHandle& h) {
- try {
- cpg.dispatchAll();
- h.rewatch();
- } catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what());
- leave();
- }
-}
-
-// Entry point: called if disconnected from CPG.
-void Cluster::disconnect(sys::DispatchHandle& ) {
- QPID_LOG(critical, *this << " error disconnected from cluster");
- try {
- broker.shutdown();
- } catch (const std::exception& e) {
- QPID_LOG(error, *this << " error in shutdown: " << e.what());
- }
-}
-
void Cluster::configChange (
cpg_handle_t /*handle*/,
cpg_name */*group*/,
@@ -358,7 +332,7 @@ void Cluster::brokerShutdown() {
if (state != LEFT) {
try { cpg.shutdown(); }
catch (const std::exception& e) {
- QPID_LOG(error, *this << " during shutdown: " << e.what());
+ QPID_LOG(error, *this << " shutting down CPG: " << e.what());
}
}
delete this;