summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp54
1 files changed, 49 insertions, 5 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 0241b0946b..40bfcd9285 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -130,6 +130,7 @@
#include "qpid/cluster/UpdateDataExchange.h"
#include "qpid/cluster/UpdateExchange.h"
#include "qpid/cluster/ClusterTimer.h"
+#include "qpid/cluster/CredentialsExchange.h"
#include "qpid/assert.h"
#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
@@ -162,6 +163,7 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
+#include "qpid/UrlArray.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/memory.h"
#include "qpid/sys/Thread.h"
@@ -189,6 +191,7 @@ using management::ManagementObject;
using management::Manageable;
using management::Args;
namespace _qmf = ::qmf::org::apache::qpid::cluster;
+namespace arg=client::arg;
/**
* NOTE: must increment this number whenever any incompatible changes in
@@ -199,7 +202,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1159330;
+const uint32_t Cluster::CLUSTER_VERSION = 1207877;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -211,12 +214,12 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
uint8_t storeState, const Uuid& shutdownId,
- const std::string& firstConfig)
+ const std::string& firstConfig, const framing::Array& urls)
{
cluster.initialStatus(
member, version, active, clusterId,
framing::cluster::StoreState(storeState), shutdownId,
- firstConfig, l);
+ firstConfig, urls, l);
}
void ready(const std::string& url) {
cluster.ready(member, url, l);
@@ -267,6 +270,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
poller),
failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
updateDataExchange(new UpdateDataExchange(*this)),
+ credentialsExchange(new CredentialsExchange(*this)),
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
@@ -300,6 +304,9 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// for single control frame.
broker.getExchanges().registerExchange(updateDataExchange);
+ // CredentialsExchange is used to authenticate new cluster members
+ broker.getExchanges().registerExchange(credentialsExchange);
+
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
@@ -661,6 +668,7 @@ void Cluster::initMapCompleted(Lock& l) {
setClusterId(initMap.getClusterId(), l);
if (initMap.isUpdateNeeded()) { // Joining established cluster.
+ authenticate();
broker.setRecovery(false); // Ditch my current store.
broker.setClusterUpdatee(true);
if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update.
@@ -711,7 +719,8 @@ void Cluster::configChange(const MemberId&,
ClusterInitialStatusBody(
ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
store.getState(), store.getShutdownId(),
- initMap.getFirstConfigStr()
+ initMap.getFirstConfigStr(),
+ vectorToUrlArray(getUrls(l))
),
self);
}
@@ -803,6 +812,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
framing::cluster::StoreState store,
const framing::Uuid& shutdownId,
const std::string& firstConfig,
+ const framing::Array& urls,
Lock& l)
{
if (version != CLUSTER_VERSION) {
@@ -816,7 +826,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
initMap.received(
member,
ClusterInitialStatusBody(ProtocolVersion(), version, active, id,
- store, shutdownId, firstConfig)
+ store, shutdownId, firstConfig, urls)
);
if (initMap.transitionToComplete()) initMapCompleted(l);
}
@@ -903,6 +913,11 @@ void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l
}
void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
+ // Check for credentials if authentication is enabled.
+ if (broker.getOptions().auth && !credentialsExchange->check(updatee)) {
+ QPID_LOG(error, "Un-authenticated attempt to join the cluster");
+ return;
+ }
// NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent.
if (state == LEFT) return;
assert(state == OFFER);
@@ -1115,6 +1130,35 @@ void Cluster::updateMgmtMembership(Lock& l) {
mgmtObject->set_memberIDs(idstr);
}
+namespace {
+template <class T> struct AutoClose {
+ T closeme;
+ AutoClose(T t) : closeme(t) {}
+ ~AutoClose() { closeme.close(); }
+};
+}
+
+// Updatee connects to established member and stores credentials
+// in the qpid.cluster-credentials exchange to prove it
+// is safe for updater to connect and give an update.
+void Cluster::authenticate() {
+ if (!broker.getOptions().auth) return;
+ std::vector<Url> urls = initMap.getUrls();
+ for (std::vector<Url>::iterator i = urls.begin(); i != urls.end(); ++i) {
+ if (!i->empty()) {
+ client::Connection c;
+ c.open(*i, connectionSettings(settings));
+ AutoClose<client::Connection> closeConnection(c);
+ client::Session s = c.newSession(CredentialsExchange::NAME);
+ AutoClose<client::Session> closeSession(s);
+ client::Message credentials;
+ credentials.getHeaders().setUInt64(CredentialsExchange::NAME, getId());
+ s.messageTransfer(arg::content=credentials, arg::destination=CredentialsExchange::NAME);
+ s.sync();
+ }
+ }
+}
+
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
static const char* STATE[] = {
"PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP",