diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 59 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 3 |
2 files changed, 39 insertions, 23 deletions
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index ceafa389b0..a638f509c6 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -33,47 +33,64 @@ namespace qpid { namespace cluster { using namespace std; +using broker::Broker; -struct ClusterOptions : public Options { +struct OptionValues { string name; string url; - ClusterOptions() : Options("Cluster Options") { + Url getUrl(uint16_t port) const { + if (url.empty()) return Url::getIpAddressesUrl(port); + return Url(url); + } +}; + +// Note we update the values in a separate struct. +// This is to work around boost::program_options differences, +// older versions took a reference to the options, newer +// ones take a copy (or require a shared_ptr) +// +struct ClusterOptions : public Options { + + ClusterOptions(OptionValues* v) : Options("Cluster Options") { addOptions() - ("cluster-name", optValue(name, "NAME"), "Name of cluster to join") - ("cluster-url", optValue(url,"URL"), + ("cluster-name", optValue(v->name, "NAME"), "Name of cluster to join") + ("cluster-url", optValue(v->url,"URL"), "URL of this broker, advertized to the cluster.\n" "Defaults to a URL listing all the local IP addresses\n"); } +}; - Url getUrl(uint16_t port) const { - if (url.empty()) return Url::getIpAddressesUrl(port); - return Url(url); +struct ClusterPlugin : public PluginT<Broker> { + OptionValues values; + boost::optional<Cluster> cluster; + + ClusterPlugin(const OptionValues& v) : values(v) {} + + void initializeT(Broker& broker) { + cluster = boost::in_place(values.name, values.getUrl(broker.getPort()), boost::ref(broker)); + broker.getSessionManager().add(cluster->getObserver()); } }; -struct ClusterPlugin : public Plugin { +struct PluginFactory : public Plugin::FactoryT<Broker> { + OptionValues values; ClusterOptions options; - boost::optional<Cluster> cluster; - Options* getOptions() { return &options; } + PluginFactory() : options(&values) {} - void earlyInitialize(Plugin::Target&) {} + Options* getOptions() { return &options; } - void initialize(Plugin::Target& target) { - broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); + boost::shared_ptr<Plugin> createT(Broker&) { // Only provide to a Broker, and only if the --cluster config is set. - if (broker && !options.name.empty()) { - assert(!cluster); // A process can only belong to one cluster. - cluster = boost::in_place(options.name, - options.getUrl(broker->getPort()), - boost::ref(*broker)); - broker->getSessionManager().add(cluster->getObserver()); - } + if (values.name.empty()) + return boost::shared_ptr<Plugin>(); + else + return make_shared_ptr(new ClusterPlugin(values)); } }; -static ClusterPlugin instance; // Static initialization. +static PluginFactory instance; // Static initialization. }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index ce8aa0dc33..7b8fce4112 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -104,9 +104,8 @@ Cpg::~Cpg() { } void Cpg::shutdown() { - QPID_LOG(debug, "Shutdown CPG handle " << handle); if (handles.get(handle)) { - QPID_LOG(debug, "Finalize CPG handle " << handle); + QPID_LOG(debug, "Finalize CPG handle " << std::hex << handle); handles.put(handle, 0); check(cpg_finalize(handle), "Error in shutdown of CPG"); } |
