diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 60 |
1 files changed, 44 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 1d81a50e1c..3a0d11b5d1 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -28,6 +28,7 @@ #include "qpid/log/Statement.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" + #include <boost/bind.hpp> #include <boost/cast.hpp> #include <algorithm> @@ -57,25 +58,32 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : broker(&b), + poller(b.getPoller()), cpg(*this), name(name_), url(url_), - self(cpg.self()) + self(cpg.self()), + cpgDispatchHandle(cpg, + boost::bind(&Cluster::dispatch, this, _1), // read + 0, // write + boost::bind(&Cluster::disconnect, this, _1)) // disconnect { broker->addFinalizer(boost::bind(&Cluster::leave, this)); QPID_LOG(trace, "Joining cluster: " << name_); cpg.join(name); notify(); - dispatcher=Thread(*this); - // Wait till we show up in the cluster map. - { - Mutex::ScopedLock l(lock); - while (empty()) - lock.wait(); - } + + // FIXME aconway 2008-08-11: can we remove this loop? + // Dispatch till we show up in the cluster map. + while (empty()) + cpg.dispatchOne(); + + // Start dispatching from the poller. + cpgDispatchHandle.startWatch(poller); } -Cluster::~Cluster() {} +Cluster::~Cluster() { +} // local connection initializes plugins void Cluster::initialize(broker::Connection& c) { @@ -87,14 +95,19 @@ void Cluster::initialize(broker::Connection& c) { void Cluster::leave() { Mutex::ScopedLock l(lock); if (!broker) return; // Already left. - assert(Thread::current().id() != dispatcher.id()); // Must not be called in the dispatch thread. + // At this point the poller has already been shut down so + // no dispatches can occur thru the cpgDispatchHandle. + // + // FIXME aconway 2008-08-11: assert this is the cae. + QPID_LOG(debug, "Leaving cluster " << *this); cpg.leave(name); - // The dispatch thread sets broker=0 when the final config-change - // is delivered. - while(broker) lock.wait(); + // broker= is set to 0 when the final config-change is delivered. + while(broker) { + Mutex::ScopedUnlock u(lock); + cpg.dispatchAll(); + } cpg.shutdown(); - dispatcher.join(); } template <class T> void decodePtr(Buffer& buf, T*& ptr) { @@ -134,6 +147,13 @@ Cluster::MemberList Cluster::getMembers() const { return result; } +// ################ HERE - leaking shadow connections. +// FIXME aconway 2008-08-11: revisit memory management for shadow +// connections, what if the Connection is closed other than via +// disconnect? Dangling pointer in shadow map. Use ptr_map for shadow +// map, add deleted state to ConnectionInterceptor? Interceptors need +// to know about map? Check how Connections can be deleted. + ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) { ShadowConnectionId id(member, remotePtr); ShadowConnectionMap::iterator i = shadowConnectionMap.find(id); @@ -233,8 +253,16 @@ void Cluster::configChange( lock.notifyAll(); // Threads waiting for membership changes. } -void Cluster::run() { - cpg.dispatchBlocking(); +void Cluster::dispatch(sys::DispatchHandle& h) { + cpg.dispatchAll(); + h.rewatch(); +} + +void Cluster::disconnect(sys::DispatchHandle& h) { + h.stopWatch(); + // FIXME aconway 2008-08-11: error handling if we are disconnected. + // Kill the broker? + assert(0); } }} // namespace qpid::cluster |
