From e65b0086a2924ff04640b1350393a816249d01b3 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 17 Jul 2008 00:03:50 +0000 Subject: Cluster: shadow connections, fix lifecycle & valgrind issues. - tests/ForkedBroker: improved broker forking, exec full qpidd. - Plugin::addFinalizer - more flexible way to shutdown plugins. - Reworked cluster extension points using boost::function. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@677471 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/ClusterPlugin.cpp | 74 +++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 27 deletions(-) (limited to 'cpp/src/qpid/cluster/ClusterPlugin.cpp') diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index c4b67de141..a2c66e3790 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -15,8 +15,8 @@ * limitations under the License. * */ -#include +#include "ConnectionInterceptor.h" #include "qpid/broker/Broker.h" @@ -25,61 +25,81 @@ #include "qpid/Options.h" #include "qpid/shared_ptr.h" -#include #include - namespace qpid { namespace cluster { using namespace std; -struct ClusterOptions : public Options { +struct ClusterValues { string name; string url; - ClusterOptions() : Options("Cluster Options") { + Url getUrl(uint16_t port) const { + if (url.empty()) return Url::getIpAddressesUrl(port); + return Url(url); + } +}; + +/** Note separating options from values to work around boost version differences. + * Old boost takes a reference to options objects, but new boost makes a copy. + * New boost allows a shared_ptr but that's not compatible with old boost. + */ +struct ClusterOptions : public Options { + ClusterValues& values; + + ClusterOptions(ClusterValues& v) : Options("Cluster Options"), values(v) { addOptions() - ("cluster-name", optValue(name, "NAME"), "Name of cluster to join") - ("cluster-url", optValue(url,"URL"), + ("cluster-name", optValue(values.name, "NAME"), "Name of cluster to join") + ("cluster-url", optValue(values.url,"URL"), "URL of this broker, advertized to the cluster.\n" "Defaults to a URL listing all the local IP addresses\n") ; } - - Url getUrl(uint16_t port) const { - if (url.empty()) return Url::getIpAddressesUrl(port); - return Url(url); - } }; struct ClusterPlugin : public Plugin { - typedef PluginHandlerChain ConnectionChain; + ClusterValues values; ClusterOptions options; - boost::optional cluster; + boost::intrusive_ptr cluster; + + ClusterPlugin() : options(values) {} + + Options* getOptions() { return &options; } - template void init(Plugin::Target& t) { - Chain* c = dynamic_cast(&t); - if (c) cluster->initialize(*c); + void init(broker::Broker& b) { + if (values.name.empty()) return; // Only if --cluster-name option was specified. + if (cluster) throw Exception("Cluster plugin cannot be initialized twice in one process."); + cluster = new Cluster(values.name, values.getUrl(b.getPort()), b); + b.addFinalizer(boost::bind(&ClusterPlugin::shutdown, this)); + } + + template void init(T& t) { + if (cluster) cluster->initialize(t); + } + + template bool init(Plugin::Target& target) { + T* t = dynamic_cast(&target); + if (t) init(*t); + return t; } void earlyInitialize(Plugin::Target&) {} void initialize(Plugin::Target& target) { - broker::Broker* broker = dynamic_cast(&target); - if (broker && !options.name.empty()) { - if (cluster) throw Exception("Cluster plugin cannot be initialized twice in a process."); - cluster = boost::in_place(options.name, - options.getUrl(broker->getPort()), - boost::ref(*broker)); - return; - } - if (!cluster) return; // Ignore chain handlers if we didn't init a cluster. - init(target); + if (init(target)) return; + if (!cluster) return; // Remaining plugins only valid if cluster initialized. + if (init(target)) return; } + + void shutdown() { cluster = 0; } }; static ClusterPlugin instance; // Static initialization. + +// For test purposes. +boost::intrusive_ptr getGlobalCluster() { return instance.cluster; } }} // namespace qpid::cluster -- cgit v1.2.1