summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-19 21:52:24 +0000
committerAlan Conway <aconway@apache.org>2007-07-19 21:52:24 +0000
commitcb566519d58ded6704507fa5530bf901e620edf6 (patch)
treeab4b29ddd0ad2b5e9015647e379bede84163b13e /cpp/src/qpid/broker
parent3f900af77d5f781431dc25e307974e0fc27aa561 (diff)
downloadqpid-python-cb566519d58ded6704507fa5530bf901e620edf6.tar.gz
* Summary:
- Connect cluster handlers into broker handler chains. - Progress on wiring replication. * src/tests/cluster.mk: Temporarily disabled Cluster test. * src/tests/Cluster.h, cpp, Cluster_child.cpp: Updated to use UUIDs. * src/qpidd.cpp: - Load optional libs (cluster) - Include plugin config in options.parse. * src/qpid/cluster/SessionManager.h: - Create sessions, update handler chains (as HandlerUpdater) - Handle frames from cluster. * src/qpid/cluster/ClusterPlugin.h, .cpp: - renamed from ClusterPluginProvider - Create and connect Cluster and SessionManager. - Register SessionManager as HandlerUpdater. * src/qpid/cluster/Cluster.h, .cpp: Refactor as SessionFrameHandler. * src/qpid/broker/Connection.cpp: Apply HandlerUpdaters. * src/qpid/broker/Broker.h, .cpp: - Initialize plugins - Apply HandlerUpdaters * src/qpid/Plugin.h, .cpp: Simplified plugin framework. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557788 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp25
-rw-r--r--cpp/src/qpid/broker/Broker.h34
-rw-r--r--cpp/src/qpid/broker/Connection.cpp5
3 files changed, 43 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 9c8e98ec9a..86342b3c43 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -20,7 +20,6 @@
*/
#include "Broker.h"
-
#include "Connection.h"
#include "DirectExchange.h"
#include "FanOutExchange.h"
@@ -40,11 +39,14 @@
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/TimeoutHandler.h"
+#include <boost/bind.hpp>
+
#include <iostream>
#include <memory>
using qpid::sys::Acceptor;
using qpid::framing::HandlerUpdater;
+using qpid::framing::FrameHandler;
namespace qpid {
namespace broker {
@@ -98,6 +100,12 @@ Broker::Broker(const Broker::Options& conf) :
store->recover(recoverer);
}
+ // Initialize plugins
+ const Plugin::Plugins& plugins=Plugin::getPlugins();
+ for (Plugin::Plugins::const_iterator i = plugins.begin();
+ i != plugins.end();
+ i++)
+ (*i)->initialize(*this);
}
@@ -149,13 +157,14 @@ Acceptor& Broker::getAcceptor() const {
return *acceptor;
}
-void Broker::use(const shared_ptr<Plugin>& plugin) {
- shared_ptr<HandlerUpdater> updater=
- dynamic_pointer_cast<HandlerUpdater>(plugin);
- if (updater) {
- QPID_LOG(critical, "HandlerUpdater plugins not implemented");
- // FIXME aconway 2007-06-28: hook into Connections.
- }
+void Broker::add(const shared_ptr<HandlerUpdater>& updater) {
+ QPID_LOG(debug, "Broker added HandlerUpdater");
+ handlerUpdaters.push_back(updater);
+}
+
+void Broker::update(FrameHandler::Chains& chains) {
+ for_each(handlerUpdaters.begin(), handlerUpdaters.end(),
+ boost::bind(&HandlerUpdater::update, _1, chains));
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index a27bce1751..9f57a45e0c 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -23,19 +23,23 @@
*/
#include "ConnectionFactory.h"
-#include "qpid/Url.h"
-#include "qpid/Plugin.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Acceptor.h"
-#include "MessageStore.h"
-#include "ExchangeRegistry.h"
#include "ConnectionToken.h"
#include "DirectExchange.h"
#include "DtxManager.h"
-#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
+#include "ExchangeRegistry.h"
+#include "MessageStore.h"
#include "QueueRegistry.h"
#include "qpid/Options.h"
+#include "qpid/Plugin.h"
+#include "qpid/Url.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/HandlerUpdater.h"
+#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/sys/Acceptor.h"
+#include "qpid/sys/Runnable.h"
+
+#include <vector>
namespace qpid {
@@ -48,7 +52,7 @@ namespace broker {
/**
* A broker instance.
*/
-class Broker : public sys::Runnable, public PluginUser
+class Broker : public sys::Runnable, public Plugin::Target
{
public:
struct Options : public qpid::Options {
@@ -88,26 +92,32 @@ class Broker : public sys::Runnable, public PluginUser
/** Shut down the broker */
virtual void shutdown();
- /** Use a plugin */
- void use(const shared_ptr<Plugin>& plugin);
+ /** Register a handler updater. */
+ void add(const shared_ptr<framing::HandlerUpdater>&);
+
+ /** Apply all handler updaters to a handler chain pair. */
+ void update(framing::FrameHandler::Chains&);
MessageStore& getStore() { return *store; }
QueueRegistry& getQueues() { return queues; }
ExchangeRegistry& getExchanges() { return exchanges; }
uint64_t getStagingThreshold() { return stagingThreshold; }
DtxManager& getDtxManager() { return dtxManager; }
-
+
private:
sys::Acceptor& getAcceptor() const;
Options config;
sys::Acceptor::shared_ptr acceptor;
const std::auto_ptr<MessageStore> store;
+ typedef std::vector<shared_ptr<framing::HandlerUpdater> > HandlerUpdaters;
+
QueueRegistry queues;
ExchangeRegistry exchanges;
uint64_t stagingThreshold;
ConnectionFactory factory;
DtxManager dtxManager;
+ HandlerUpdaters handlerUpdaters;
static MessageStore* createStore(const Options& config);
};
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 978228a364..7a987f28d2 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -104,7 +104,10 @@ void Connection::closeChannel(uint16_t id) {
FrameHandler::Chains& Connection::getChannel(ChannelId id) {
ChannelMap::iterator i = channels.find(id);
if (i == channels.end()) {
- FrameHandler::Chains chains(new SemanticHandler(id, *this), new OutputHandlerFrameHandler(*out));
+ FrameHandler::Chains chains(
+ new SemanticHandler(id, *this),
+ new OutputHandlerFrameHandler(*out));
+ broker.update(chains);
i = channels.insert(ChannelMap::value_type(id, chains)).first;
}
return i->second;