diff options
| author | Alan Conway <aconway@apache.org> | 2009-01-16 21:34:46 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-01-16 21:34:46 +0000 |
| commit | 5a0c2ddd943067fc38fde4b60632501fd793810b (patch) | |
| tree | b5d2bb1dbbb0083d35f6425a1447a61fdf8faf62 /cpp/src/qpid/cluster/Cluster.cpp | |
| parent | 53eabf3d2cd58938234017aac5901b327ad267c8 (diff) | |
| download | qpid-python-5a0c2ddd943067fc38fde4b60632501fd793810b.tar.gz | |
cluster refactor: separate out dispatch strategy, implement poller and thread dispatch.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@735151 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 44 |
1 files changed, 9 insertions, 35 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index ce564939b8..18b4a2e69c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -85,6 +85,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_, size_t mcastMax) : broker(b), + mgmtObject(0), poller(b.getPoller()), cpg(*this), name(name_), @@ -92,14 +93,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b myId(cpg.self()), readMax(readMax_), writeEstimate(writeEstimate_), - cpgDispatchHandle( - cpg, - boost::bind(&Cluster::dispatch, this, _1), // read - 0, // write - boost::bind(&Cluster::disconnect, this, _1) // disconnect - ), mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)), - mgmtObject(0), + dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller), state(INIT), lastSize(0), @@ -114,7 +109,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b } broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); failoverExchange.reset(new FailoverExchange(this)); - cpgDispatchHandle.startWatch(poller); + dispatcher.start(); deliverQueue.start(); QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); if (quorum_) quorum.init(); @@ -153,14 +148,13 @@ void Cluster::leave(Lock&) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); - if (!deliverQueue.isStopped()) deliverQueue.stop(); try { cpg.leave(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error leaving process group: " << e.what()); } try { broker.shutdown(); } catch (const std::exception& e) { - QPID_LOG(critical, *this << " error during shutdown: " << e.what()); + QPID_LOG(critical, *this << " error during broker shutdown: " << e.what()); } } } @@ -202,7 +196,8 @@ void Cluster::deliver(const Event& e, Lock&) { // Entry point: called when deliverQueue has events to process. void Cluster::delivered(PollableEventQueue::Queue& events) { try { - for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1)); + for (PollableEventQueue::Queue::iterator i = events.begin(); i != events.end(); ++i) + deliveredEvent(*i, i->getData()); events.clear(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); @@ -210,8 +205,8 @@ void Cluster::delivered(PollableEventQueue::Queue& events) { } } -void Cluster::deliveredEvent(const Event& e) { - Buffer buf(e); +void Cluster::deliveredEvent(const EventHeader& e, const char* data) { + Buffer buf(const_cast<char*>(data), e.getSize()); AMQFrame frame; if (e.isCluster()) { while (frame.decode(buf)) { @@ -270,27 +265,6 @@ ostream& operator<<(ostream& o, const AddrList& a) { return o << a.suffix; } -// Entry point: called by IO to dispatch CPG events. -void Cluster::dispatch(sys::DispatchHandle& h) { - try { - cpg.dispatchAll(); - h.rewatch(); - } catch (const std::exception& e) { - QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what()); - leave(); - } -} - -// Entry point: called if disconnected from CPG. -void Cluster::disconnect(sys::DispatchHandle& ) { - QPID_LOG(critical, *this << " error disconnected from cluster"); - try { - broker.shutdown(); - } catch (const std::exception& e) { - QPID_LOG(error, *this << " error in shutdown: " << e.what()); - } -} - void Cluster::configChange ( cpg_handle_t /*handle*/, cpg_name */*group*/, @@ -358,7 +332,7 @@ void Cluster::brokerShutdown() { if (state != LEFT) { try { cpg.shutdown(); } catch (const std::exception& e) { - QPID_LOG(error, *this << " during shutdown: " << e.what()); + QPID_LOG(error, *this << " shutting down CPG: " << e.what()); } } delete this; |
