summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp60
1 files changed, 44 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 1d81a50e1c..3a0d11b5d1 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -28,6 +28,7 @@
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
+
#include <boost/bind.hpp>
#include <boost/cast.hpp>
#include <algorithm>
@@ -57,25 +58,32 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker(&b),
+ poller(b.getPoller()),
cpg(*this),
name(name_),
url(url_),
- self(cpg.self())
+ self(cpg.self()),
+ cpgDispatchHandle(cpg,
+ boost::bind(&Cluster::dispatch, this, _1), // read
+ 0, // write
+ boost::bind(&Cluster::disconnect, this, _1)) // disconnect
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
QPID_LOG(trace, "Joining cluster: " << name_);
cpg.join(name);
notify();
- dispatcher=Thread(*this);
- // Wait till we show up in the cluster map.
- {
- Mutex::ScopedLock l(lock);
- while (empty())
- lock.wait();
- }
+
+ // FIXME aconway 2008-08-11: can we remove this loop?
+ // Dispatch till we show up in the cluster map.
+ while (empty())
+ cpg.dispatchOne();
+
+ // Start dispatching from the poller.
+ cpgDispatchHandle.startWatch(poller);
}
-Cluster::~Cluster() {}
+Cluster::~Cluster() {
+}
// local connection initializes plugins
void Cluster::initialize(broker::Connection& c) {
@@ -87,14 +95,19 @@ void Cluster::initialize(broker::Connection& c) {
void Cluster::leave() {
Mutex::ScopedLock l(lock);
if (!broker) return; // Already left.
- assert(Thread::current().id() != dispatcher.id()); // Must not be called in the dispatch thread.
+ // At this point the poller has already been shut down so
+ // no dispatches can occur thru the cpgDispatchHandle.
+ //
+ // FIXME aconway 2008-08-11: assert this is the cae.
+
QPID_LOG(debug, "Leaving cluster " << *this);
cpg.leave(name);
- // The dispatch thread sets broker=0 when the final config-change
- // is delivered.
- while(broker) lock.wait();
+ // broker= is set to 0 when the final config-change is delivered.
+ while(broker) {
+ Mutex::ScopedUnlock u(lock);
+ cpg.dispatchAll();
+ }
cpg.shutdown();
- dispatcher.join();
}
template <class T> void decodePtr(Buffer& buf, T*& ptr) {
@@ -134,6 +147,13 @@ Cluster::MemberList Cluster::getMembers() const {
return result;
}
+// ################ HERE - leaking shadow connections.
+// FIXME aconway 2008-08-11: revisit memory management for shadow
+// connections, what if the Connection is closed other than via
+// disconnect? Dangling pointer in shadow map. Use ptr_map for shadow
+// map, add deleted state to ConnectionInterceptor? Interceptors need
+// to know about map? Check how Connections can be deleted.
+
ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
ShadowConnectionId id(member, remotePtr);
ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
@@ -233,8 +253,16 @@ void Cluster::configChange(
lock.notifyAll(); // Threads waiting for membership changes.
}
-void Cluster::run() {
- cpg.dispatchBlocking();
+void Cluster::dispatch(sys::DispatchHandle& h) {
+ cpg.dispatchAll();
+ h.rewatch();
+}
+
+void Cluster::disconnect(sys::DispatchHandle& h) {
+ h.stopWatch();
+ // FIXME aconway 2008-08-11: error handling if we are disconnected.
+ // Kill the broker?
+ assert(0);
}
}} // namespace qpid::cluster