diff options
| author | Alan Conway <aconway@apache.org> | 2011-12-06 15:56:40 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2011-12-06 15:56:40 +0000 |
| commit | 50083a9b6553d832856bc7d402dd186f74d80254 (patch) | |
| tree | b6b0c097cd643e51b7c1615e6b2f8c2aca850efa /cpp/src/qpid/cluster/Cluster.cpp | |
| parent | efd035d01dd87dd146f3fc6aacabc8c28b10316d (diff) | |
| download | qpid-python-50083a9b6553d832856bc7d402dd186f74d80254.tar.gz | |
QPID-3652: Fix cluster authentication.
Only allow brokers that authenticate as the cluster-username to join a cluster.
New broker first connects to a cluster broker authenticates as the cluster-username
and sends its CPG member ID to the qpid.cluster-credentials exchange.
The cluster broker that subsequently acts as updater verifies that the credentials are
valid before connecting to give the update.
NOTE 1: If you are using an ACL, the cluster-username must be allowed to
publish to the qpid.cluster-credentials exchange. E.g. in your ACL file:
acl allow foo@QPID publish exchange name=qpid.cluster-credentials
NOTE 2: This changes the cluster initialization protocol, you will
need to restart the cluster with all new version brokers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1210989 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 54 |
1 files changed, 49 insertions, 5 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 0241b0946b..40bfcd9285 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -130,6 +130,7 @@ #include "qpid/cluster/UpdateDataExchange.h" #include "qpid/cluster/UpdateExchange.h" #include "qpid/cluster/ClusterTimer.h" +#include "qpid/cluster/CredentialsExchange.h" #include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" @@ -162,6 +163,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" +#include "qpid/UrlArray.h" #include "qpid/management/ManagementAgent.h" #include "qpid/memory.h" #include "qpid/sys/Thread.h" @@ -189,6 +191,7 @@ using management::ManagementObject; using management::Manageable; using management::Args; namespace _qmf = ::qmf::org::apache::qpid::cluster; +namespace arg=client::arg; /** * NOTE: must increment this number whenever any incompatible changes in @@ -199,7 +202,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1159330; +const uint32_t Cluster::CLUSTER_VERSION = 1207877; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -211,12 +214,12 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void initialStatus(uint32_t version, bool active, const Uuid& clusterId, uint8_t storeState, const Uuid& shutdownId, - const std::string& firstConfig) + const std::string& firstConfig, const framing::Array& urls) { cluster.initialStatus( member, version, active, clusterId, framing::cluster::StoreState(storeState), shutdownId, - firstConfig, l); + firstConfig, urls, l); } void ready(const std::string& url) { cluster.ready(member, url, l); @@ -267,6 +270,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : poller), failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)), updateDataExchange(new UpdateDataExchange(*this)), + credentialsExchange(new CredentialsExchange(*this)), quorum(boost::bind(&Cluster::leave, this)), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), @@ -300,6 +304,9 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // for single control frame. broker.getExchanges().registerExchange(updateDataExchange); + // CredentialsExchange is used to authenticate new cluster members + broker.getExchanges().registerExchange(credentialsExchange); + // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); @@ -661,6 +668,7 @@ void Cluster::initMapCompleted(Lock& l) { setClusterId(initMap.getClusterId(), l); if (initMap.isUpdateNeeded()) { // Joining established cluster. + authenticate(); broker.setRecovery(false); // Ditch my current store. broker.setClusterUpdatee(true); if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update. @@ -711,7 +719,8 @@ void Cluster::configChange(const MemberId&, ClusterInitialStatusBody( ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, store.getState(), store.getShutdownId(), - initMap.getFirstConfigStr() + initMap.getFirstConfigStr(), + vectorToUrlArray(getUrls(l)) ), self); } @@ -803,6 +812,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ framing::cluster::StoreState store, const framing::Uuid& shutdownId, const std::string& firstConfig, + const framing::Array& urls, Lock& l) { if (version != CLUSTER_VERSION) { @@ -816,7 +826,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ initMap.received( member, ClusterInitialStatusBody(ProtocolVersion(), version, active, id, - store, shutdownId, firstConfig) + store, shutdownId, firstConfig, urls) ); if (initMap.transitionToComplete()) initMapCompleted(l); } @@ -903,6 +913,11 @@ void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l } void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { + // Check for credentials if authentication is enabled. + if (broker.getOptions().auth && !credentialsExchange->check(updatee)) { + QPID_LOG(error, "Un-authenticated attempt to join the cluster"); + return; + } // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent. if (state == LEFT) return; assert(state == OFFER); @@ -1115,6 +1130,35 @@ void Cluster::updateMgmtMembership(Lock& l) { mgmtObject->set_memberIDs(idstr); } +namespace { +template <class T> struct AutoClose { + T closeme; + AutoClose(T t) : closeme(t) {} + ~AutoClose() { closeme.close(); } +}; +} + +// Updatee connects to established member and stores credentials +// in the qpid.cluster-credentials exchange to prove it +// is safe for updater to connect and give an update. +void Cluster::authenticate() { + if (!broker.getOptions().auth) return; + std::vector<Url> urls = initMap.getUrls(); + for (std::vector<Url>::iterator i = urls.begin(); i != urls.end(); ++i) { + if (!i->empty()) { + client::Connection c; + c.open(*i, connectionSettings(settings)); + AutoClose<client::Connection> closeConnection(c); + client::Session s = c.newSession(CredentialsExchange::NAME); + AutoClose<client::Session> closeSession(s); + client::Message credentials; + credentials.getHeaders().setUInt64(CredentialsExchange::NAME, getId()); + s.messageTransfer(arg::content=credentials, arg::destination=CredentialsExchange::NAME); + s.sync(); + } + } +} + std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { static const char* STATE[] = { "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP", |
