diff options
| author | Alan Conway <aconway@apache.org> | 2008-07-08 22:58:37 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-07-08 22:58:37 +0000 |
| commit | 8c3baf496f9424249e2a666d79f0e3b38ba8d8fc (patch) | |
| tree | 5fd950f023cacb47cf3cc9dc11aed91c94f380f8 /cpp/src/qpid/cluster | |
| parent | 391608a73f18a1797ab0c358f0a94364dc888eb2 (diff) | |
| download | qpid-python-8c3baf496f9424249e2a666d79f0e3b38ba8d8fc.tar.gz | |
HandlerChain: plug-in handler chain extension points. Replaces Handler<T>::Chain.
Updated Sessoin & Connection handler chains and Cluster.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@675017 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 33 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 13 |
3 files changed, 21 insertions, 35 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index d97a840f82..4ea77e7fbf 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -23,6 +23,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" +#include "qpid/memory.h" #include <boost/bind.hpp> #include <boost/scoped_array.hpp> #include <algorithm> @@ -36,25 +37,12 @@ using namespace qpid::sys; using namespace std; using broker::Connection; -namespace { - -// FIXME aconway 2008-07-01: sending every frame to cluster, -// serializing all processing in cluster deliver thread. -// This will not perform at all, but provides a correct starting point. -// -// TODO: -// - Fake "Connection" for cluster: owns shadow sessions. -// - Maintain shadow sessions. -// - Apply foreign frames to shadow sessions. -// - - // Beginning of inbound chain: send to cluster. -struct ClusterSendHandler : public FrameHandler { - Connection& connection; +struct ClusterSendHandler : public HandlerChain<FrameHandler>::Handler { + Cluster::ConnectionChain& connection; Cluster& cluster; - ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {} + ClusterSendHandler(Cluster::ConnectionChain& conn, Cluster& clust) : connection(conn), cluster(clust) {} void handle(AMQFrame& f) { // FIXME aconway 2008-01-29: Refcount Connections to ensure @@ -63,16 +51,8 @@ struct ClusterSendHandler : public FrameHandler { } }; -struct ConnectionObserver : public broker::ConnectionManager::Observer { - Cluster& cluster; - ConnectionObserver(Cluster& c) : cluster(c) {} - - void created(Connection& c) { - // FIXME aconway 2008-06-16: clean up chaining and observers. - ClusterSendHandler* sender=new ClusterSendHandler(c, cluster); - c.getInChain().insert(sender); - } -}; +void Cluster::initialize(Cluster::ConnectionChain& cc) { + cc.push(ConnectionChain::HandlerAutoPtr(new ClusterSendHandler(cc, *this))); } ostream& operator <<(ostream& out, const Cluster& cluster) { @@ -95,7 +75,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : cpg(*this), name(name_), url(url_), - observer(new ConnectionObserver(*this)), self(cpg.self()) { QPID_LOG(trace, "Joining cluster: " << name_); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 031baf914a..84b5ed072c 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -22,6 +22,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/ShadowConnectionOutputHandler.h" +#include "qpid/HandlerChain.h" #include "qpid/broker/Broker.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Runnable.h" @@ -47,6 +48,8 @@ namespace cluster { class Cluster : private sys::Runnable, private Cpg::Handler { public: + typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain; + /** Details of a cluster member */ struct Member { Member(const Url& url_=Url()) : url(url_) {} @@ -62,11 +65,11 @@ class Cluster : private sys::Runnable, private Cpg::Handler */ Cluster(const std::string& name, const Url& url, broker::Broker&); + // Add cluster handlers to broker chains. + void initialize(ConnectionChain&); + virtual ~Cluster(); - // FIXME aconway 2008-01-29: - boost::intrusive_ptr<broker::ConnectionManager::Observer> getObserver() { return observer; } - /** Get the current cluster membership. */ MemberList getMembers() const; @@ -124,7 +127,6 @@ class Cluster : private sys::Runnable, private Cpg::Handler MemberMap members; sys::Thread dispatcher; boost::function<void()> callback; - boost::intrusive_ptr<broker::ConnectionManager::Observer> observer; Id self; ShadowConnectionMap shadowConnectionMap; ShadowConnectionOutputHandler shadowOut; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 6d3dca84be..c4b67de141 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -54,24 +54,29 @@ struct ClusterOptions : public Options { }; struct ClusterPlugin : public Plugin { + typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain; ClusterOptions options; boost::optional<Cluster> cluster; - Options* getOptions() { return &options; } + template <class Chain> void init(Plugin::Target& t) { + Chain* c = dynamic_cast<Chain*>(&t); + if (c) cluster->initialize(*c); + } void earlyInitialize(Plugin::Target&) {} void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - // Only provide to a Broker, and only if the --cluster config is set. if (broker && !options.name.empty()) { - assert(!cluster); // A process can only belong to one cluster. + if (cluster) throw Exception("Cluster plugin cannot be initialized twice in a process."); cluster = boost::in_place(options.name, options.getUrl(broker->getPort()), boost::ref(*broker)); - broker->getConnectionManager().add(cluster->getObserver()); + return; } + if (!cluster) return; // Ignore chain handlers if we didn't init a cluster. + init<ConnectionChain>(target); } }; |
