diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/Plugin.cpp | 55 | ||||
| -rw-r--r-- | cpp/src/qpid/Plugin.h | 137 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 59 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 26 |
6 files changed, 180 insertions, 114 deletions
diff --git a/cpp/src/qpid/Plugin.cpp b/cpp/src/qpid/Plugin.cpp index 733d134334..84dc5a0107 100644 --- a/cpp/src/qpid/Plugin.cpp +++ b/cpp/src/qpid/Plugin.cpp @@ -19,36 +19,53 @@ */ #include "Plugin.h" -#include "qpid/Options.h" +#include <qpid/shared_ptr.h> +#include <qpid/Options.h> +#include <qpid/sys/Mutex.h> namespace qpid { -namespace { -Plugin::Plugins& thePlugins() { - // This is a single threaded singleton implementation so - // it is important to be sure that the first use of this - // singleton is when the program is still single threaded - static Plugin::Plugins plugins; - return plugins; -} +Plugin::Target::~Target() {} + +void Plugin::Target::createPlugins() { + typedef std::vector<Plugin::Factory*>::const_iterator Iter; + for (Iter i = Factory::getList().begin(); i != Factory::getList().end(); ++i) { + boost::shared_ptr<Plugin> plugin = (**i).create(*this); + if (plugin) + plugins.push_back(plugin); + } } -Plugin::Plugin() { - // Register myself. - thePlugins().push_back(this); +void Plugin::Target::initializePlugins() { + typedef std::vector<boost::shared_ptr<Plugin> >::iterator Iter; + for (Iter i = plugins.begin(); i != plugins.end(); ++i) + (**i).initialize(*this); } -Plugin::~Plugin() {} +void Plugin::Target::releasePlugins() { plugins.clear(); } + +Plugin::Factory::~Factory() {} -Options* Plugin::getOptions() { return 0; } +Plugin::Factory::Factory() { + const_cast<std::vector<Plugin::Factory*>& >(getList()).push_back(this); +} + +static sys::PODMutex PluginFactoryGetListLock; -const Plugin::Plugins& Plugin::getPlugins() { return thePlugins(); } +const std::vector<Plugin::Factory*>& Plugin::Factory::getList() { + sys::PODMutex::ScopedLock l(PluginFactoryGetListLock); + static std::vector<Plugin::Factory*> list; + return list; +} -void Plugin::addOptions(Options& opts) { - for (Plugins::const_iterator i = getPlugins().begin(); i != getPlugins().end(); ++i) { - if ((*i)->getOptions()) - opts.add(*(*i)->getOptions()); +void Plugin::Factory::addOptions(Options& opts) { + typedef std::vector<Plugin::Factory*>::const_iterator Iter; + for (Iter i = Factory::getList().begin(); i != Factory::getList().end(); ++i) { + if ((**i).getOptions()) + opts.add(*(**i).getOptions()); } } +Plugin::~Plugin() {} + } // namespace qpid diff --git a/cpp/src/qpid/Plugin.h b/cpp/src/qpid/Plugin.h index 3ead770129..5b0bb0c2c1 100644 --- a/cpp/src/qpid/Plugin.h +++ b/cpp/src/qpid/Plugin.h @@ -21,78 +21,123 @@ * */ -#include "qpid/shared_ptr.h" -#include <boost/noncopyable.hpp> +#include <boost/shared_ptr.hpp> #include <vector> -#include <boost/function.hpp> - /**@file Generic plug-in framework. */ namespace qpid { + class Options; /** * Plug-in base class. + * + * A Plugin is created by a Plugin::Factory and attached to a Plugin::Target. */ -class Plugin : boost::noncopyable -{ +class Plugin { public: /** - * Base interface for targets that receive plug-ins. - * - * The Broker is a plug-in target, there might be others - * in future. + * Base class for target objects that receive plug-ins. */ - struct Target { virtual ~Target() {} }; + class Target { + public: + virtual ~Target(); - typedef std::vector<Plugin*> Plugins; - - /** - * Construct registers the plug-in to appear in getPlugins(). - * - * A concrete Plugin is instantiated as a global or static - * member variable in a library so it is registered during static - * initialization when the library is loaded. - */ - Plugin(); - - virtual ~Plugin(); + protected: + /** For each Factory create a plugin attached to this */ + void createPlugins(); - /** - * Configuration options for the plugin. - * Then will be updated during option parsing by the host program. - * - * @return An options group or 0 for no options. Default returns 0. - * Plugin retains ownership of return value. - */ - virtual Options* getOptions(); + /** Initialize all attached plugins */ + void initializePlugins(); + + /** Release the attached plugins. Done automatically in destructor. */ + void releasePlugins(); + + private: + std::vector<boost::shared_ptr<Plugin> > plugins; + }; + + /** Base class for a factory to create Plugins. */ + class Factory { + public: + /** + * Constructor registers the factory so it will be included in getList(). + * + * Derive your Plugin Factory class from Factory and create a + * single global instance in your plug-in library. Loading the + * library will automatically register your factory. + */ + Factory(); + + virtual ~Factory(); + + /** Get the list of Factories */ + static const std::vector<Factory*>& getList(); + + /** For each Factory in Factory::getList() add options to opts. */ + static void addOptions(Options& opts); + + /** + * Configuration options for this factory. + * Then will be updated during option parsing by the host program. + * + * @return An options group or 0 for no options. + */ + virtual Options* getOptions() = 0; + + /** + * Create a Plugin for target. + * Uses option values set by getOptions(). + * Target may not be fully initialized at this point. + * Actions that require a fully-initialized target should be + * done in initialize(). + * + * @returns 0 if the target type is not compatible with this Factory. + */ + virtual boost::shared_ptr<Plugin> create(Target& target) = 0; + }; /** - * Initialize Plugin functionality on a Target. - * Plugins should ignore targets they don't recognize. - * - * Called before the target itself is initialized. + * Template factory implementation, checks target type is correct. */ - virtual void earlyInitialize(Target&) = 0; + template <class TargetType> class FactoryT : public Factory { + Options* getOptions() { return 0; } + + virtual boost::shared_ptr<Plugin> createT(TargetType& target) = 0; + + boost::shared_ptr<Plugin> create(Target& target) { + TargetType* tt = dynamic_cast<TargetType*>(&target); + if (tt) + return createT(*tt); + else + return boost::shared_ptr<Plugin>(); + } + }; + + // Plugin functions. + virtual ~Plugin(); + /** - * Initialize Plugin functionality on a Target. - * Plugins should ignore targets they don't recognize. - * + * Initialize the Plugin. * Called after the target is fully initialized. */ virtual void initialize(Target&) = 0; +}; - /** List of registered Plugin objects. - * Caller must not delete plugin pointers. - */ - static const Plugins& getPlugins(); +/** Template plugin factory */ +template <class TargetType> class PluginT : public Plugin { + + virtual void initializeT(TargetType&) = 0; - /** For each registered plugin, add plugin.getOptions() to opts. */ - static void addOptions(Options& opts); + void initialize(Plugin::Target& target) { + TargetType* tt = dynamic_cast<TargetType*>(&target); + assert(tt); + initializeT(*tt); + } }; - + } // namespace qpid #endif /*!QPID_PLUGIN_H*/ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index f008eb23f7..a3dd93899a 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -166,13 +166,7 @@ Broker::Broker(const Broker::Options& conf) : links.setParent (vhost); } - // Early-Initialize plugins - const Plugin::Plugins& plugins=Plugin::getPlugins(); - for (Plugin::Plugins::const_iterator i = plugins.begin(); - i != plugins.end(); - i++) - (*i)->earlyInitialize(*this); - + createPlugins(); // If no plugin store module registered itself, set up the null store. if (store == 0) setStore (new NullMessageStore (false)); @@ -223,11 +217,7 @@ Broker::Broker(const Broker::Options& conf) : #endif } - // Initialize plugins - for (Plugin::Plugins::const_iterator i = plugins.begin(); - i != plugins.end(); - i++) - (*i)->initialize(*this); + initializePlugins(); } void Broker::declareStandardExchange(const std::string& name, const std::string& type) diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index ceafa389b0..a638f509c6 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -33,47 +33,64 @@ namespace qpid { namespace cluster { using namespace std; +using broker::Broker; -struct ClusterOptions : public Options { +struct OptionValues { string name; string url; - ClusterOptions() : Options("Cluster Options") { + Url getUrl(uint16_t port) const { + if (url.empty()) return Url::getIpAddressesUrl(port); + return Url(url); + } +}; + +// Note we update the values in a separate struct. +// This is to work around boost::program_options differences, +// older versions took a reference to the options, newer +// ones take a copy (or require a shared_ptr) +// +struct ClusterOptions : public Options { + + ClusterOptions(OptionValues* v) : Options("Cluster Options") { addOptions() - ("cluster-name", optValue(name, "NAME"), "Name of cluster to join") - ("cluster-url", optValue(url,"URL"), + ("cluster-name", optValue(v->name, "NAME"), "Name of cluster to join") + ("cluster-url", optValue(v->url,"URL"), "URL of this broker, advertized to the cluster.\n" "Defaults to a URL listing all the local IP addresses\n"); } +}; - Url getUrl(uint16_t port) const { - if (url.empty()) return Url::getIpAddressesUrl(port); - return Url(url); +struct ClusterPlugin : public PluginT<Broker> { + OptionValues values; + boost::optional<Cluster> cluster; + + ClusterPlugin(const OptionValues& v) : values(v) {} + + void initializeT(Broker& broker) { + cluster = boost::in_place(values.name, values.getUrl(broker.getPort()), boost::ref(broker)); + broker.getSessionManager().add(cluster->getObserver()); } }; -struct ClusterPlugin : public Plugin { +struct PluginFactory : public Plugin::FactoryT<Broker> { + OptionValues values; ClusterOptions options; - boost::optional<Cluster> cluster; - Options* getOptions() { return &options; } + PluginFactory() : options(&values) {} - void earlyInitialize(Plugin::Target&) {} + Options* getOptions() { return &options; } - void initialize(Plugin::Target& target) { - broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); + boost::shared_ptr<Plugin> createT(Broker&) { // Only provide to a Broker, and only if the --cluster config is set. - if (broker && !options.name.empty()) { - assert(!cluster); // A process can only belong to one cluster. - cluster = boost::in_place(options.name, - options.getUrl(broker->getPort()), - boost::ref(*broker)); - broker->getSessionManager().add(cluster->getObserver()); - } + if (values.name.empty()) + return boost::shared_ptr<Plugin>(); + else + return make_shared_ptr(new ClusterPlugin(values)); } }; -static ClusterPlugin instance; // Static initialization. +static PluginFactory instance; // Static initialization. }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index ce8aa0dc33..7b8fce4112 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -104,9 +104,8 @@ Cpg::~Cpg() { } void Cpg::shutdown() { - QPID_LOG(debug, "Shutdown CPG handle " << handle); if (handles.get(handle)) { - QPID_LOG(debug, "Finalize CPG handle " << handle); + QPID_LOG(debug, "Finalize CPG handle " << std::hex << handle); handles.put(handle, 0); check(cpg_finalize(handle), "Error in shutdown of CPG"); } diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index e82a6a9102..9d272ee69c 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -53,22 +53,20 @@ class AsynchIOProtocolFactory : public ProtocolFactory { bool isClient); }; -// Static instance to initialise plugin -static class TCPIOPlugin : public Plugin { - void earlyInitialize(Target&) { +struct TCPIOPlugin : public PluginT<broker::Broker> { + void initializeT(broker::Broker& broker) { + const broker::Broker::Options& opts = broker.getOptions(); + ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog)); + QPID_LOG(info, "Listening on TCP port " << protocol->getPort()); + broker.registerProtocolFactory(protocol); } - - void initialize(Target& target) { - broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - // Only provide to a Broker - if (broker) { - const broker::Broker::Options& opts = broker->getOptions(); - ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog)); - QPID_LOG(info, "Listening on TCP port " << protocol->getPort()); - broker->registerProtocolFactory(protocol); - } +}; + +static struct TCPIOPluginFactory : public Plugin::FactoryT<broker::Broker> { + boost::shared_ptr<Plugin> createT(broker::Broker&) { + return make_shared_ptr(new TCPIOPlugin()); } -} tcpPlugin; +} theTCPIOPluginFactory; // Static plugin factory instance. AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) : listeningPort(listener.listen(port, backlog)) |
