summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-07-08 22:58:37 +0000
committerAlan Conway <aconway@apache.org>2008-07-08 22:58:37 +0000
commit8c3baf496f9424249e2a666d79f0e3b38ba8d8fc (patch)
tree5fd950f023cacb47cf3cc9dc11aed91c94f380f8 /cpp/src/qpid/cluster
parent391608a73f18a1797ab0c358f0a94364dc888eb2 (diff)
downloadqpid-python-8c3baf496f9424249e2a666d79f0e3b38ba8d8fc.tar.gz
HandlerChain: plug-in handler chain extension points. Replaces Handler<T>::Chain.
Updated Sessoin & Connection handler chains and Cluster. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@675017 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp33
-rw-r--r--cpp/src/qpid/cluster/Cluster.h10
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp13
3 files changed, 21 insertions, 35 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index d97a840f82..4ea77e7fbf 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -23,6 +23,7 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
+#include "qpid/memory.h"
#include <boost/bind.hpp>
#include <boost/scoped_array.hpp>
#include <algorithm>
@@ -36,25 +37,12 @@ using namespace qpid::sys;
using namespace std;
using broker::Connection;
-namespace {
-
-// FIXME aconway 2008-07-01: sending every frame to cluster,
-// serializing all processing in cluster deliver thread.
-// This will not perform at all, but provides a correct starting point.
-//
-// TODO:
-// - Fake "Connection" for cluster: owns shadow sessions.
-// - Maintain shadow sessions.
-// - Apply foreign frames to shadow sessions.
-//
-
-
// Beginning of inbound chain: send to cluster.
-struct ClusterSendHandler : public FrameHandler {
- Connection& connection;
+struct ClusterSendHandler : public HandlerChain<FrameHandler>::Handler {
+ Cluster::ConnectionChain& connection;
Cluster& cluster;
- ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {}
+ ClusterSendHandler(Cluster::ConnectionChain& conn, Cluster& clust) : connection(conn), cluster(clust) {}
void handle(AMQFrame& f) {
// FIXME aconway 2008-01-29: Refcount Connections to ensure
@@ -63,16 +51,8 @@ struct ClusterSendHandler : public FrameHandler {
}
};
-struct ConnectionObserver : public broker::ConnectionManager::Observer {
- Cluster& cluster;
- ConnectionObserver(Cluster& c) : cluster(c) {}
-
- void created(Connection& c) {
- // FIXME aconway 2008-06-16: clean up chaining and observers.
- ClusterSendHandler* sender=new ClusterSendHandler(c, cluster);
- c.getInChain().insert(sender);
- }
-};
+void Cluster::initialize(Cluster::ConnectionChain& cc) {
+ cc.push(ConnectionChain::HandlerAutoPtr(new ClusterSendHandler(cc, *this)));
}
ostream& operator <<(ostream& out, const Cluster& cluster) {
@@ -95,7 +75,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
cpg(*this),
name(name_),
url(url_),
- observer(new ConnectionObserver(*this)),
self(cpg.self())
{
QPID_LOG(trace, "Joining cluster: " << name_);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 031baf914a..84b5ed072c 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -22,6 +22,7 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/ShadowConnectionOutputHandler.h"
+#include "qpid/HandlerChain.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Runnable.h"
@@ -47,6 +48,8 @@ namespace cluster {
class Cluster : private sys::Runnable, private Cpg::Handler
{
public:
+ typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain;
+
/** Details of a cluster member */
struct Member {
Member(const Url& url_=Url()) : url(url_) {}
@@ -62,11 +65,11 @@ class Cluster : private sys::Runnable, private Cpg::Handler
*/
Cluster(const std::string& name, const Url& url, broker::Broker&);
+ // Add cluster handlers to broker chains.
+ void initialize(ConnectionChain&);
+
virtual ~Cluster();
- // FIXME aconway 2008-01-29:
- boost::intrusive_ptr<broker::ConnectionManager::Observer> getObserver() { return observer; }
-
/** Get the current cluster membership. */
MemberList getMembers() const;
@@ -124,7 +127,6 @@ class Cluster : private sys::Runnable, private Cpg::Handler
MemberMap members;
sys::Thread dispatcher;
boost::function<void()> callback;
- boost::intrusive_ptr<broker::ConnectionManager::Observer> observer;
Id self;
ShadowConnectionMap shadowConnectionMap;
ShadowConnectionOutputHandler shadowOut;
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 6d3dca84be..c4b67de141 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -54,24 +54,29 @@ struct ClusterOptions : public Options {
};
struct ClusterPlugin : public Plugin {
+ typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain;
ClusterOptions options;
boost::optional<Cluster> cluster;
- Options* getOptions() { return &options; }
+ template <class Chain> void init(Plugin::Target& t) {
+ Chain* c = dynamic_cast<Chain*>(&t);
+ if (c) cluster->initialize(*c);
+ }
void earlyInitialize(Plugin::Target&) {}
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- // Only provide to a Broker, and only if the --cluster config is set.
if (broker && !options.name.empty()) {
- assert(!cluster); // A process can only belong to one cluster.
+ if (cluster) throw Exception("Cluster plugin cannot be initialized twice in a process.");
cluster = boost::in_place(options.name,
options.getUrl(broker->getPort()),
boost::ref(*broker));
- broker->getConnectionManager().add(cluster->getObserver());
+ return;
}
+ if (!cluster) return; // Ignore chain handlers if we didn't init a cluster.
+ init<ConnectionChain>(target);
}
};