diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 51 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterSettings.h | 48 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h | 4 |
8 files changed, 99 insertions, 51 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index d9a5125760..f845492dbc 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -17,6 +17,7 @@ */ #include "Cluster.h" +#include "ClusterSettings.h" #include "Connection.h" #include "UpdateClient.h" #include "FailoverExchange.h" @@ -82,16 +83,17 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; -Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_) : +Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : + settings(set), broker(b), mgmtObject(0), poller(b.getPoller()), cpg(*this), - name(name_), - myUrl(url_), + name(settings.name), + myUrl(settings.url.empty() ? Url() : Url(settings.url)), myId(cpg.self()), - readMax(readMax_), - writeEstimate(writeEstimate_), + readMax(settings.readMax), + writeEstimate(settings.writeEstimate), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), @@ -121,7 +123,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b } failoverExchange.reset(new FailoverExchange(this)); - if (quorum_) quorum.init(); + if (settings.quorum) quorum.init(); cpg.join(name); // pump the CPG dispatch manually till we get initialized. while (!initialized) @@ -425,10 +427,15 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url); deliverFrameQueue.stop(); if (updateThread.id()) updateThread.join(); // Join the previous updatethread. + client::ConnectionSettings cs; + cs.username = settings.username; + cs.password = settings.password; + cs.mechanism = settings.mechanism; updateThread = Thread( new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(), boost::bind(&Cluster::updateOutDone, this), - boost::bind(&Cluster::updateOutError, this, _1))); + boost::bind(&Cluster::updateOutError, this, _1), + cs)); } // Called in update thread. diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 4d994943f7..8c5eb06ff7 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -19,6 +19,7 @@ * */ +#include "ClusterSettings.h" #include "ClusterMap.h" #include "ConnectionMap.h" #include "Cpg.h" @@ -67,9 +68,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { typedef std::vector<ConnectionPtr> Connections; /** Construct the cluster in plugin earlyInitialize */ - Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum, - size_t readMax, size_t writeEstimate); - + Cluster(const ClusterSettings&, broker::Broker&); virtual ~Cluster(); /** Join the cluster in plugin initialize. Requires transport @@ -178,6 +177,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void setClusterId(const framing::Uuid&); // Immutable members set on construction, never changed. + ClusterSettings settings; broker::Broker& broker; qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle boost::shared_ptr<sys::Poller> poller; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index d54d8389e0..266e7f00b0 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -18,6 +18,7 @@ #include "Connection.h" #include "ConnectionCodec.h" +#include "ClusterSettings.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ConnectionCodec.h" @@ -35,6 +36,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SessionState.h" +#include "qpid/client/ConnectionSettings.h" #include <boost/utility/in_place_factory.hpp> #include <boost/scoped_ptr.hpp> @@ -48,40 +50,31 @@ using management::IdAllocator; using management::ManagementAgent; using management::ManagementBroker; -struct ClusterValues { - string name; - string url; - bool quorum; - size_t readMax, writeEstimate; - - ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {} - - Url getUrl(uint16_t port) const { - if (url.empty()) return Url::getIpAddressesUrl(port); - return Url(url); - } -}; -/** Note separating options from values to work around boost version differences. +/** Note separating options from settings to work around boost version differences. * Old boost takes a reference to options objects, but new boost makes a copy. * New boost allows a shared_ptr but that's not compatible with old boost. */ struct ClusterOptions : public Options { - ClusterValues& values; + ClusterSettings& settings; - ClusterOptions(ClusterValues& v) : Options("Cluster Options"), values(v) { + ClusterOptions(ClusterSettings& v) : Options("Cluster Options"), settings(v) { addOptions() - ("cluster-name", optValue(values.name, "NAME"), "Name of cluster to join") - ("cluster-url", optValue(values.url,"URL"), + ("cluster-name", optValue(settings.name, "NAME"), "Name of cluster to join") + ("cluster-url", optValue(settings.url,"URL"), "URL of this broker, advertized to the cluster.\n" "Defaults to a URL listing all the local IP addresses\n") + ("cluster-username", optValue(settings.username, ""), "Username for connections between brokers") + ("cluster-password", optValue(settings.password, ""), "Password for connections between brokers") + ("cluster-mechanism", optValue(settings.mechanism, ""), "Authentication mechanism for connections between brokers") #if HAVE_LIBCMAN - ("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.") + ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.") #endif - ("cluster-read-max", optValue(values.readMax,"N"), + ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: Limit per-client-connection queue of read buffers. 0=no limit.") - ("cluster-write-estimate", optValue(values.writeEstimate, "Kb"), - "Experimental: initial estimate for connection write rate per multicast cycle"); + ("cluster-write-estimate", optValue(settings.writeEstimate, "Kb"), + "Experimental: initial estimate for connection write rate per multicast cycle") + ; } }; @@ -127,26 +120,20 @@ struct UpdateClientIdAllocator : management::IdAllocator struct ClusterPlugin : public Plugin { - ClusterValues values; + ClusterSettings settings; ClusterOptions options; Cluster* cluster; boost::scoped_ptr<ConnectionCodec::Factory> factory; - ClusterPlugin() : options(values), cluster(0) {} + ClusterPlugin() : options(settings), cluster(0) {} Options* getOptions() { return &options; } void earlyInitialize(Plugin::Target& target) { - if (values.name.empty()) return; // Only if --cluster-name option was specified. + if (settings.name.empty()) return; // Only if --cluster-name option was specified. Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; - cluster = new Cluster( - values.name, - values.url.empty() ? Url() : Url(values.url), - *broker, - values.quorum, - values.readMax, values.writeEstimate*1024 - ); + cluster = new Cluster(settings, *broker); broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); diff --git a/cpp/src/qpid/cluster/ClusterSettings.h b/cpp/src/qpid/cluster/ClusterSettings.h new file mode 100644 index 0000000000..a8f33be75e --- /dev/null +++ b/cpp/src/qpid/cluster/ClusterSettings.h @@ -0,0 +1,48 @@ +#ifndef QPID_CLUSTER_CLUSTERSETTINGS_H +#define QPID_CLUSTER_CLUSTERSETTINGS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/Url.h> +#include <string> + +namespace qpid { +namespace cluster { + +struct ClusterSettings { + std::string name; + std::string url; + bool quorum; + size_t readMax, writeEstimate; + std::string username, password, mechanism; + + ClusterSettings() : quorum(false), readMax(10), writeEstimate(64), username("guest"), password("guest") {} + + Url getUrl(uint16_t port) const { + if (url.empty()) return Url::getIpAddressesUrl(port); + return Url(url); + } +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CLUSTERSETTINGS_H*/ diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 4a13d24499..1a3f7c4ef7 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -253,10 +253,11 @@ void Connection::sessionState( QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); } -void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { +void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) { ConnectionId shadow = ConnectionId(memberId, connectionId); QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow); self = shadow; + connection.setUserId(username); } void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 1637b8609c..98b47e1bc0 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -117,7 +117,7 @@ class Connection : const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); - void shadowReady(uint64_t memberId, uint64_t connectionId); + void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username); void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId); diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index e50c936b50..18746ccb7e 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -89,21 +89,22 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, const Cluster::Connections& cons, const boost::function<void()>& ok, - const boost::function<void(const std::exception&)>& fail) + const boost::function<void(const std::exception&)>& fail, + const client::ConnectionSettings& cs +) : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), frameId(frameId_), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), - done(ok), failed(fail) + done(ok), failed(fail) { - connection.open(url); + connection.open(url, cs); session = connection.newSession("update_shared"); } UpdateClient::~UpdateClient() {} // Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -static const char UPDATE_CHARS[] = "qpid.qpid-update"; -const std::string UpdateClient::UPDATE(UPDATE_CHARS, sizeof(UPDATE_CHARS)); +const std::string UpdateClient::UPDATE("qpid.qpid-update"); void UpdateClient::update() { QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl); @@ -232,7 +233,9 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( updateConnection->getId().getMember(), - reinterpret_cast<uint64_t>(updateConnection->getId().getPointer())); + reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()), + updateConnection->getBrokerConnection().getUserId() + ); shadowConnection.close(); QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); } diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 0819eb4cdb..23f647c820 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -66,7 +66,9 @@ class UpdateClient : public sys::Runnable { broker::Broker& donor, const ClusterMap& map, uint64_t sequence, const std::vector<boost::intrusive_ptr<Connection> >& , const boost::function<void()>& done, - const boost::function<void(const std::exception&)>& fail); + const boost::function<void(const std::exception&)>& fail, + const client::ConnectionSettings& + ); ~UpdateClient(); void update(); |
