diff options
| author | Alan Conway <aconway@apache.org> | 2007-07-19 21:52:24 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-07-19 21:52:24 +0000 |
| commit | cb566519d58ded6704507fa5530bf901e620edf6 (patch) | |
| tree | ab4b29ddd0ad2b5e9015647e379bede84163b13e /cpp/src/qpid/broker | |
| parent | 3f900af77d5f781431dc25e307974e0fc27aa561 (diff) | |
| download | qpid-python-cb566519d58ded6704507fa5530bf901e620edf6.tar.gz | |
* Summary:
- Connect cluster handlers into broker handler chains.
- Progress on wiring replication.
* src/tests/cluster.mk: Temporarily disabled Cluster test.
* src/tests/Cluster.h, cpp, Cluster_child.cpp: Updated to use UUIDs.
* src/qpidd.cpp:
- Load optional libs (cluster)
- Include plugin config in options.parse.
* src/qpid/cluster/SessionManager.h:
- Create sessions, update handler chains (as HandlerUpdater)
- Handle frames from cluster.
* src/qpid/cluster/ClusterPlugin.h, .cpp:
- renamed from ClusterPluginProvider
- Create and connect Cluster and SessionManager.
- Register SessionManager as HandlerUpdater.
* src/qpid/cluster/Cluster.h, .cpp: Refactor as SessionFrameHandler.
* src/qpid/broker/Connection.cpp: Apply HandlerUpdaters.
* src/qpid/broker/Broker.h, .cpp:
- Initialize plugins
- Apply HandlerUpdaters
* src/qpid/Plugin.h, .cpp: Simplified plugin framework.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557788 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 25 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 34 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 5 |
3 files changed, 43 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 9c8e98ec9a..86342b3c43 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -20,7 +20,6 @@ */ #include "Broker.h" - #include "Connection.h" #include "DirectExchange.h" #include "FanOutExchange.h" @@ -40,11 +39,14 @@ #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/TimeoutHandler.h" +#include <boost/bind.hpp> + #include <iostream> #include <memory> using qpid::sys::Acceptor; using qpid::framing::HandlerUpdater; +using qpid::framing::FrameHandler; namespace qpid { namespace broker { @@ -98,6 +100,12 @@ Broker::Broker(const Broker::Options& conf) : store->recover(recoverer); } + // Initialize plugins + const Plugin::Plugins& plugins=Plugin::getPlugins(); + for (Plugin::Plugins::const_iterator i = plugins.begin(); + i != plugins.end(); + i++) + (*i)->initialize(*this); } @@ -149,13 +157,14 @@ Acceptor& Broker::getAcceptor() const { return *acceptor; } -void Broker::use(const shared_ptr<Plugin>& plugin) { - shared_ptr<HandlerUpdater> updater= - dynamic_pointer_cast<HandlerUpdater>(plugin); - if (updater) { - QPID_LOG(critical, "HandlerUpdater plugins not implemented"); - // FIXME aconway 2007-06-28: hook into Connections. - } +void Broker::add(const shared_ptr<HandlerUpdater>& updater) { + QPID_LOG(debug, "Broker added HandlerUpdater"); + handlerUpdaters.push_back(updater); +} + +void Broker::update(FrameHandler::Chains& chains) { + for_each(handlerUpdaters.begin(), handlerUpdaters.end(), + boost::bind(&HandlerUpdater::update, _1, chains)); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index a27bce1751..9f57a45e0c 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -23,19 +23,23 @@ */ #include "ConnectionFactory.h" -#include "qpid/Url.h" -#include "qpid/Plugin.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Acceptor.h" -#include "MessageStore.h" -#include "ExchangeRegistry.h" #include "ConnectionToken.h" #include "DirectExchange.h" #include "DtxManager.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/ProtocolInitiation.h" +#include "ExchangeRegistry.h" +#include "MessageStore.h" #include "QueueRegistry.h" #include "qpid/Options.h" +#include "qpid/Plugin.h" +#include "qpid/Url.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/HandlerUpdater.h" +#include "qpid/framing/OutputHandler.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/sys/Acceptor.h" +#include "qpid/sys/Runnable.h" + +#include <vector> namespace qpid { @@ -48,7 +52,7 @@ namespace broker { /** * A broker instance. */ -class Broker : public sys::Runnable, public PluginUser +class Broker : public sys::Runnable, public Plugin::Target { public: struct Options : public qpid::Options { @@ -88,26 +92,32 @@ class Broker : public sys::Runnable, public PluginUser /** Shut down the broker */ virtual void shutdown(); - /** Use a plugin */ - void use(const shared_ptr<Plugin>& plugin); + /** Register a handler updater. */ + void add(const shared_ptr<framing::HandlerUpdater>&); + + /** Apply all handler updaters to a handler chain pair. */ + void update(framing::FrameHandler::Chains&); MessageStore& getStore() { return *store; } QueueRegistry& getQueues() { return queues; } ExchangeRegistry& getExchanges() { return exchanges; } uint64_t getStagingThreshold() { return stagingThreshold; } DtxManager& getDtxManager() { return dtxManager; } - + private: sys::Acceptor& getAcceptor() const; Options config; sys::Acceptor::shared_ptr acceptor; const std::auto_ptr<MessageStore> store; + typedef std::vector<shared_ptr<framing::HandlerUpdater> > HandlerUpdaters; + QueueRegistry queues; ExchangeRegistry exchanges; uint64_t stagingThreshold; ConnectionFactory factory; DtxManager dtxManager; + HandlerUpdaters handlerUpdaters; static MessageStore* createStore(const Options& config); }; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 978228a364..7a987f28d2 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -104,7 +104,10 @@ void Connection::closeChannel(uint16_t id) { FrameHandler::Chains& Connection::getChannel(ChannelId id) { ChannelMap::iterator i = channels.find(id); if (i == channels.end()) { - FrameHandler::Chains chains(new SemanticHandler(id, *this), new OutputHandlerFrameHandler(*out)); + FrameHandler::Chains chains( + new SemanticHandler(id, *this), + new OutputHandlerFrameHandler(*out)); + broker.update(chains); i = channels.insert(ChannelMap::value_type(id, chains)).first; } return i->second; |
