summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp21
-rw-r--r--cpp/src/qpid/cluster/Cluster.h6
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp51
-rw-r--r--cpp/src/qpid/cluster/ClusterSettings.h48
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp3
-rw-r--r--cpp/src/qpid/cluster/Connection.h2
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp15
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h4
8 files changed, 99 insertions, 51 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index d9a5125760..f845492dbc 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,6 +17,7 @@
*/
#include "Cluster.h"
+#include "ClusterSettings.h"
#include "Connection.h"
#include "UpdateClient.h"
#include "FailoverExchange.h"
@@ -82,16 +83,17 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_) :
+Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
+ settings(set),
broker(b),
mgmtObject(0),
poller(b.getPoller()),
cpg(*this),
- name(name_),
- myUrl(url_),
+ name(settings.name),
+ myUrl(settings.url.empty() ? Url() : Url(settings.url)),
myId(cpg.self()),
- readMax(readMax_),
- writeEstimate(writeEstimate_),
+ readMax(settings.readMax),
+ writeEstimate(settings.writeEstimate),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -121,7 +123,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
}
failoverExchange.reset(new FailoverExchange(this));
- if (quorum_) quorum.init();
+ if (settings.quorum) quorum.init();
cpg.join(name);
// pump the CPG dispatch manually till we get initialized.
while (!initialized)
@@ -425,10 +427,15 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
deliverFrameQueue.stop();
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+ client::ConnectionSettings cs;
+ cs.username = settings.username;
+ cs.password = settings.password;
+ cs.mechanism = settings.mechanism;
updateThread = Thread(
new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(),
boost::bind(&Cluster::updateOutDone, this),
- boost::bind(&Cluster::updateOutError, this, _1)));
+ boost::bind(&Cluster::updateOutError, this, _1),
+ cs));
}
// Called in update thread.
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 4d994943f7..8c5eb06ff7 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -19,6 +19,7 @@
*
*/
+#include "ClusterSettings.h"
#include "ClusterMap.h"
#include "ConnectionMap.h"
#include "Cpg.h"
@@ -67,9 +68,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
typedef std::vector<ConnectionPtr> Connections;
/** Construct the cluster in plugin earlyInitialize */
- Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum,
- size_t readMax, size_t writeEstimate);
-
+ Cluster(const ClusterSettings&, broker::Broker&);
virtual ~Cluster();
/** Join the cluster in plugin initialize. Requires transport
@@ -178,6 +177,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void setClusterId(const framing::Uuid&);
// Immutable members set on construction, never changed.
+ ClusterSettings settings;
broker::Broker& broker;
qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
boost::shared_ptr<sys::Poller> poller;
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index d54d8389e0..266e7f00b0 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -18,6 +18,7 @@
#include "Connection.h"
#include "ConnectionCodec.h"
+#include "ClusterSettings.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ConnectionCodec.h"
@@ -35,6 +36,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionState.h"
+#include "qpid/client/ConnectionSettings.h"
#include <boost/utility/in_place_factory.hpp>
#include <boost/scoped_ptr.hpp>
@@ -48,40 +50,31 @@ using management::IdAllocator;
using management::ManagementAgent;
using management::ManagementBroker;
-struct ClusterValues {
- string name;
- string url;
- bool quorum;
- size_t readMax, writeEstimate;
-
- ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {}
-
- Url getUrl(uint16_t port) const {
- if (url.empty()) return Url::getIpAddressesUrl(port);
- return Url(url);
- }
-};
-/** Note separating options from values to work around boost version differences.
+/** Note separating options from settings to work around boost version differences.
* Old boost takes a reference to options objects, but new boost makes a copy.
* New boost allows a shared_ptr but that's not compatible with old boost.
*/
struct ClusterOptions : public Options {
- ClusterValues& values;
+ ClusterSettings& settings;
- ClusterOptions(ClusterValues& v) : Options("Cluster Options"), values(v) {
+ ClusterOptions(ClusterSettings& v) : Options("Cluster Options"), settings(v) {
addOptions()
- ("cluster-name", optValue(values.name, "NAME"), "Name of cluster to join")
- ("cluster-url", optValue(values.url,"URL"),
+ ("cluster-name", optValue(settings.name, "NAME"), "Name of cluster to join")
+ ("cluster-url", optValue(settings.url,"URL"),
"URL of this broker, advertized to the cluster.\n"
"Defaults to a URL listing all the local IP addresses\n")
+ ("cluster-username", optValue(settings.username, ""), "Username for connections between brokers")
+ ("cluster-password", optValue(settings.password, ""), "Password for connections between brokers")
+ ("cluster-mechanism", optValue(settings.mechanism, ""), "Authentication mechanism for connections between brokers")
#if HAVE_LIBCMAN
- ("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
+ ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
- ("cluster-read-max", optValue(values.readMax,"N"),
+ ("cluster-read-max", optValue(settings.readMax,"N"),
"Experimental: Limit per-client-connection queue of read buffers. 0=no limit.")
- ("cluster-write-estimate", optValue(values.writeEstimate, "Kb"),
- "Experimental: initial estimate for connection write rate per multicast cycle");
+ ("cluster-write-estimate", optValue(settings.writeEstimate, "Kb"),
+ "Experimental: initial estimate for connection write rate per multicast cycle")
+ ;
}
};
@@ -127,26 +120,20 @@ struct UpdateClientIdAllocator : management::IdAllocator
struct ClusterPlugin : public Plugin {
- ClusterValues values;
+ ClusterSettings settings;
ClusterOptions options;
Cluster* cluster;
boost::scoped_ptr<ConnectionCodec::Factory> factory;
- ClusterPlugin() : options(values), cluster(0) {}
+ ClusterPlugin() : options(settings), cluster(0) {}
Options* getOptions() { return &options; }
void earlyInitialize(Plugin::Target& target) {
- if (values.name.empty()) return; // Only if --cluster-name option was specified.
+ if (settings.name.empty()) return; // Only if --cluster-name option was specified.
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- cluster = new Cluster(
- values.name,
- values.url.empty() ? Url() : Url(values.url),
- *broker,
- values.quorum,
- values.readMax, values.writeEstimate*1024
- );
+ cluster = new Cluster(settings, *broker);
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
diff --git a/cpp/src/qpid/cluster/ClusterSettings.h b/cpp/src/qpid/cluster/ClusterSettings.h
new file mode 100644
index 0000000000..a8f33be75e
--- /dev/null
+++ b/cpp/src/qpid/cluster/ClusterSettings.h
@@ -0,0 +1,48 @@
+#ifndef QPID_CLUSTER_CLUSTERSETTINGS_H
+#define QPID_CLUSTER_CLUSTERSETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Url.h>
+#include <string>
+
+namespace qpid {
+namespace cluster {
+
+struct ClusterSettings {
+ std::string name;
+ std::string url;
+ bool quorum;
+ size_t readMax, writeEstimate;
+ std::string username, password, mechanism;
+
+ ClusterSettings() : quorum(false), readMax(10), writeEstimate(64), username("guest"), password("guest") {}
+
+ Url getUrl(uint16_t port) const {
+ if (url.empty()) return Url::getIpAddressesUrl(port);
+ return Url(url);
+ }
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CLUSTERSETTINGS_H*/
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 4a13d24499..1a3f7c4ef7 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -253,10 +253,11 @@ void Connection::sessionState(
QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
}
-void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
+void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) {
ConnectionId shadow = ConnectionId(memberId, connectionId);
QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow);
self = shadow;
+ connection.setUserId(username);
}
void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 1637b8609c..98b47e1bc0 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -117,7 +117,7 @@ class Connection :
const framing::SequenceNumber& received,
const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
- void shadowReady(uint64_t memberId, uint64_t connectionId);
+ void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username);
void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId);
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index e50c936b50..18746ccb7e 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -89,21 +89,22 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con
broker::Broker& broker, const ClusterMap& m, uint64_t frameId_,
const Cluster::Connections& cons,
const boost::function<void()>& ok,
- const boost::function<void(const std::exception&)>& fail)
+ const boost::function<void(const std::exception&)>& fail,
+ const client::ConnectionSettings& cs
+)
: updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
frameId(frameId_), connections(cons),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
- done(ok), failed(fail)
+ done(ok), failed(fail)
{
- connection.open(url);
+ connection.open(url, cs);
session = connection.newSession("update_shared");
}
UpdateClient::~UpdateClient() {}
// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-static const char UPDATE_CHARS[] = "qpid.qpid-update";
-const std::string UpdateClient::UPDATE(UPDATE_CHARS, sizeof(UPDATE_CHARS));
+const std::string UpdateClient::UPDATE("qpid.qpid-update");
void UpdateClient::update() {
QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
@@ -232,7 +233,9 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
updateConnection->getId().getMember(),
- reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()));
+ reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()),
+ updateConnection->getBrokerConnection().getUserId()
+ );
shadowConnection.close();
QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
}
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 0819eb4cdb..23f647c820 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -66,7 +66,9 @@ class UpdateClient : public sys::Runnable {
broker::Broker& donor, const ClusterMap& map, uint64_t sequence,
const std::vector<boost::intrusive_ptr<Connection> >& ,
const boost::function<void()>& done,
- const boost::function<void(const std::exception&)>& fail);
+ const boost::function<void(const std::exception&)>& fail,
+ const client::ConnectionSettings&
+ );
~UpdateClient();
void update();