diff options
| author | Alan Conway <aconway@apache.org> | 2007-06-29 17:59:00 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-06-29 17:59:00 +0000 |
| commit | fda6dadde945a9c73c97b73dc79e93368b743348 (patch) | |
| tree | d7755539ae485efdfbc46298cd1ef6632515159e /cpp/src/qpid/broker | |
| parent | 79cd6c772da003ddc917eff362f9adaa99e28b49 (diff) | |
| download | qpid-python-fda6dadde945a9c73c97b73dc79e93368b743348.tar.gz | |
* Summary:
- Improved plugin framework and HandlerUpdater interface.
- Cluster handlers for traffic to/from cluster.
- Cluster HandlerUpdater configures channel chains for cluster.
- Cluster PluginProvider registers cluster objects with broker.
* src/qpid/framing/AMQFrame.h: Made data members public. Handlers
need to be able to modify frame data, getters/setters are just a
nuisance here.
* src/tests/Cluster.cpp: Updated for cluster changes, using
handlers instead of friendship to hook test into Cluster code.
* src/qpid/framing/amqp_types.h: Added CHANNEL_MAX and
CHANNEL_HIGH_BIT constants.
* src/qpid/framing/HandlerUpdater.h: Renamed ChannelInitializer,
broke dependency on broker channel types.
* src/qpid/framing/Handler.h: Added constructors and nextHandler()
* src/qpid/framing/AMQFrame.h (class AMQFrame): Inlined getChannel()
* src/qpid/cluster/ClusterPluginProvider.cpp: Provider for cluster
plugins.
* src/qpid/cluster/Cluster.cpp: Use ChannelManager. Factor out
plugin details to ClusterPluginProvider.
* src/qpid/cluster/ChannelManager.h: Insert cluster handlers
into channel chains, route frames between cluster and channels.
* src/qpid/broker/BrokerAdapter.cpp (startOk): use CHANNEL_MAX
constant.
* src/qpid/broker/Broker.cpp:
- Refactored for new plugin framework.
- Added getUrl().
* src/qpid/Url.h: Added constructor from Address.
* src/qpid/Plugin.h: Generalized plugin framework, broke
dependency on Broker interfaces. We may want to use plug-ins for
clients also at some point.
* src/tests/run_test: Fix bug when VALGRIND is not set.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 23 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 36 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerSingleton.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerSingleton.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ChannelInitializer.h | 43 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Plugin.cpp | 67 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Plugin.h | 80 |
8 files changed, 42 insertions, 219 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 4354f6d38a..65252f5415 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -22,6 +22,7 @@ #include "Broker.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/HandlerUpdater.h" #include "DirectExchange.h" #include "TopicExchange.h" #include "FanOutExchange.h" @@ -41,6 +42,7 @@ #include <memory> using qpid::sys::Acceptor; +using qpid::framing::HandlerUpdater; namespace qpid { namespace broker { @@ -100,16 +102,17 @@ Broker::Broker(const Broker::Options& conf) : } -Broker::shared_ptr Broker::create(int16_t port) +shared_ptr<Broker> Broker::create(int16_t port) { Options config; config.port=port; return create(config); } -Broker::shared_ptr Broker::create(const Options& config) { - return Broker::shared_ptr(new Broker(config)); -} +shared_ptr<Broker> Broker::create(const Options& opts) +{ + return shared_ptr<Broker>(new Broker(opts)); +} MessageStore* Broker::createStore(const Options& config) { if (config.store.empty()) @@ -134,6 +137,10 @@ Broker::~Broker() { int16_t Broker::getPort() const { return getAcceptor().getPort(); } +std::string Broker::getUrl() const { + return Url(TcpAddress(getAcceptor().getHost(), getPort())).str(); +} + Acceptor& Broker::getAcceptor() const { if (!acceptor) const_cast<Acceptor::shared_ptr&>(acceptor) = @@ -144,6 +151,14 @@ Acceptor& Broker::getAcceptor() const { return *acceptor; } +void Broker::use(const shared_ptr<Plugin>& plugin) { + shared_ptr<HandlerUpdater> updater= + dynamic_pointer_cast<HandlerUpdater>(plugin); + if (updater) { + QPID_LOG(critical, "HandlerUpdater plugins not implemented"); + // FIXME aconway 2007-06-28: hook into Connections. + } +} }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 4be3d9761e..58ba8589b0 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -24,9 +24,9 @@ #include "ConnectionFactory.h" #include "qpid/Url.h" +#include "qpid/Plugin.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Acceptor.h" -#include "qpid/SharedObject.h" #include "MessageStore.h" #include "AutoDelete.h" #include "ExchangeRegistry.h" @@ -39,14 +39,17 @@ #include "qpid/Options.h" namespace qpid { + +namespace framing { +class HandlerUpdater; +} + namespace broker { -class ChannelInitializer; /** * A broker instance. */ -class Broker : public sys::Runnable, - public SharedObject<Broker> +class Broker : public sys::Runnable, public PluginUser { public: struct Options : public qpid::Options { @@ -62,16 +65,9 @@ class Broker : public sys::Runnable, virtual ~Broker(); - /** - * Create a broker. - * @param port Port to listen on or 0 to pick a port dynamically. - */ - static shared_ptr create(int16_t port = TcpAddress::DEFAULT_PORT); - - /** - * Create a broker with the options in config. - */ - static shared_ptr create(const Options& config); + Broker(const Options& configuration); + static shared_ptr<Broker> create(const Options& configuration); + static shared_ptr<Broker> create(int16_t port = TcpAddress::DEFAULT_PORT); /** * Return listening port. If called before bind this is @@ -80,19 +76,22 @@ class Broker : public sys::Runnable, * 0. */ virtual int16_t getPort() const; - + + /** Return the broker's URL. */ + virtual std::string getUrl() const; + /** * Run the broker. Implements Runnable::run() so the broker * can be run in a separate thread. */ virtual void run(); - /** Plug-in a channel initializer. */ - void plugin(const qpid::shared_ptr<ChannelInitializer>&); - /** Shut down the broker */ virtual void shutdown(); + /** Use a plugin */ + void use(const shared_ptr<Plugin>& plugin); + MessageStore& getStore() { return *store; } QueueRegistry& getQueues() { return queues; } ExchangeRegistry& getExchanges() { return exchanges; } @@ -102,7 +101,6 @@ class Broker : public sys::Runnable, DtxManager& getDtxManager() { return dtxManager; } private: - Broker(const Options& configuration); sys::Acceptor& getAcceptor() const; Options config; diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index c31f4d197d..b7a61ababe 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -58,7 +58,7 @@ void BrokerAdapter::ConnectionHandlerImpl::startOk( const string& /*response*/, const string& /*locale*/) { client.tune( - 100, connection.getFrameMax(), connection.getHeartbeat()); + CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat()); } void BrokerAdapter::ConnectionHandlerImpl::secureOk( diff --git a/cpp/src/qpid/broker/BrokerSingleton.cpp b/cpp/src/qpid/broker/BrokerSingleton.cpp index 4571764850..77200dd760 100644 --- a/cpp/src/qpid/broker/BrokerSingleton.cpp +++ b/cpp/src/qpid/broker/BrokerSingleton.cpp @@ -24,13 +24,13 @@ namespace broker { BrokerSingleton::BrokerSingleton() { if (broker.get() == 0) broker = Broker::create(); - Broker::shared_ptr::operator=(broker); + shared_ptr<Broker>::operator=(broker); } BrokerSingleton::~BrokerSingleton() { broker->shutdown(); } -Broker::shared_ptr BrokerSingleton::broker; +shared_ptr<Broker> BrokerSingleton::broker; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/BrokerSingleton.h b/cpp/src/qpid/broker/BrokerSingleton.h index 139e02a5fd..14b932df36 100644 --- a/cpp/src/qpid/broker/BrokerSingleton.h +++ b/cpp/src/qpid/broker/BrokerSingleton.h @@ -32,17 +32,17 @@ namespace broker { * Useful for unit tests that want to share a broker between multiple * tests to reduce overhead of starting/stopping a broker for every test. * - * Tests that need a new broker can call Broker::create directly. + * Tests that need a new broker can create it directly. * * THREAD UNSAFE. */ -class BrokerSingleton : public Broker::shared_ptr +class BrokerSingleton : public shared_ptr<Broker> { public: BrokerSingleton(); ~BrokerSingleton(); private: - static Broker::shared_ptr broker; + static shared_ptr<Broker> broker; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ChannelInitializer.h b/cpp/src/qpid/broker/ChannelInitializer.h deleted file mode 100644 index 84dea7a3d7..0000000000 --- a/cpp/src/qpid/broker/ChannelInitializer.h +++ /dev/null @@ -1,43 +0,0 @@ -#ifndef QPID_BROKER_CHANNELINITIALIZER_H -#define QPID_BROKER_CHANNELINITIALIZER_H - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <boost/noncopyable.hpp> - -namespace qpid { -namespace broker { - -/** - * A ChannelInitializer is called each time a new Channel is created. - */ -class ChannelInitializer : boost::noncopyable -{ - public: - virtual ~ChannelInitializer() {} - - /** Called for each new channel */ - virtual initialize(Channe&) = 0; -}; - -}} // namespace qpid::broker - - - -#endif /*!QPID_BROKER_CHANNELINITIALIZER_H*/ diff --git a/cpp/src/qpid/broker/Plugin.cpp b/cpp/src/qpid/broker/Plugin.cpp deleted file mode 100644 index f018c4e152..0000000000 --- a/cpp/src/qpid/broker/Plugin.cpp +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "Plugin.h" - -namespace qpid { -namespace broker { - -std::vector<Plugin*> Plugin::plugins; - -Plugin::Plugin() { - // Register myself. - plugins.push_back(this); -} - -Plugin::~Plugin() {} - -Options* Plugin::getOptions() { return 0; } - -void Plugin::start(Broker&) {} - -void Plugin::finish(Broker&) {} - -const std::vector<Plugin*>& Plugin::getPlugins() { return plugins; } - -#ifdef USE_APR_PLATFORM - -#include "qpid/sys/apr/APRBase.h" -#include "qpid/sys/apr/APRPool.h" -#include <apr_dso.h> - -void Plugin::dlopen(const std::string& name) { - apr_dso_handle_t* handle; - CHECK_APR_SUCCESS( - apr_dso_load(&handle, name.c_str(), sys::APRPool::get())); -} - -#else // Posix - -#include <dlfcn.h> - -void Plugin::dlopen(const std::string& name) { - dlerror(); - dlopen(name.c_str(), RTLD_NOW); - const char* error = dlerror(); - if (error) { - THROW_QPID_ERROR(INTERNAL_ERROR, error); - } -} -#endif // USE_APR_PLATFORM - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Plugin.h b/cpp/src/qpid/broker/Plugin.h deleted file mode 100644 index 8c794b86a0..0000000000 --- a/cpp/src/qpid/broker/Plugin.h +++ /dev/null @@ -1,80 +0,0 @@ -#ifndef QPID_BROKER_PLUGIN_H -#define QPID_BROKER_PLUGIN_H - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/Options.h" -#include "qpid/broker/Broker.h" -#include <boost/noncopyable.hpp> -#include <vector> - -namespace qpid { -namespace broker { - -/** - * Inherit broker plug-ins from this class, override the virtual - * functions and create a global (or class static member) instance in - * a shared library. When the library is loaded your plug-in will be - * registered. - */ -class Plugin : boost::noncopyable -{ - public: - /** Constructor registers the plugin to appear in getPlugins(). - * Note: Plugin subclasses should only be constructed during - * static initialization, i.e. they should only be declared - * as global or static member variables. - */ - Plugin(); - - virtual ~Plugin(); - - /** - * Override if your plugin has configuration options. - * They will be included in options parsing prior to broker - * creation setup. - *@return An options group or 0. Default returns 0. - */ - virtual Options* getOptions(); - - /** Called immediately after broker creation to allow plug-ins - * to do whatever they do to the broker, e.g. add handler chain - * manipulators. - */ - virtual void start(Broker& b); - - /** Called just before broker shutdown. Default does nothing */ - virtual void finish(Broker& b); - - /** Get the list of registered plug-ins. */ - static const std::vector<Plugin*>& getPlugins(); - - /** Load a shared library (that contains plugins presumably!) */ - static void dlopen(const std::string& libname); - - private: - static std::vector<Plugin*> plugins; -}; - -}} // namespace qpid::broker - - - - -#endif /*!QPID_BROKER_PLUGIN_H*/ |
