diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 33 |
1 files changed, 6 insertions, 27 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index d97a840f82..4ea77e7fbf 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -23,6 +23,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" +#include "qpid/memory.h" #include <boost/bind.hpp> #include <boost/scoped_array.hpp> #include <algorithm> @@ -36,25 +37,12 @@ using namespace qpid::sys; using namespace std; using broker::Connection; -namespace { - -// FIXME aconway 2008-07-01: sending every frame to cluster, -// serializing all processing in cluster deliver thread. -// This will not perform at all, but provides a correct starting point. -// -// TODO: -// - Fake "Connection" for cluster: owns shadow sessions. -// - Maintain shadow sessions. -// - Apply foreign frames to shadow sessions. -// - - // Beginning of inbound chain: send to cluster. -struct ClusterSendHandler : public FrameHandler { - Connection& connection; +struct ClusterSendHandler : public HandlerChain<FrameHandler>::Handler { + Cluster::ConnectionChain& connection; Cluster& cluster; - ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {} + ClusterSendHandler(Cluster::ConnectionChain& conn, Cluster& clust) : connection(conn), cluster(clust) {} void handle(AMQFrame& f) { // FIXME aconway 2008-01-29: Refcount Connections to ensure @@ -63,16 +51,8 @@ struct ClusterSendHandler : public FrameHandler { } }; -struct ConnectionObserver : public broker::ConnectionManager::Observer { - Cluster& cluster; - ConnectionObserver(Cluster& c) : cluster(c) {} - - void created(Connection& c) { - // FIXME aconway 2008-06-16: clean up chaining and observers. - ClusterSendHandler* sender=new ClusterSendHandler(c, cluster); - c.getInChain().insert(sender); - } -}; +void Cluster::initialize(Cluster::ConnectionChain& cc) { + cc.push(ConnectionChain::HandlerAutoPtr(new ClusterSendHandler(cc, *this))); } ostream& operator <<(ostream& out, const Cluster& cluster) { @@ -95,7 +75,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : cpg(*this), name(name_), url(url_), - observer(new ConnectionObserver(*this)), self(cpg.self()) { QPID_LOG(trace, "Joining cluster: " << name_); |
