diff options
| author | Michael Goulish <mgoulish@apache.org> | 2010-05-14 08:56:45 +0000 |
|---|---|---|
| committer | Michael Goulish <mgoulish@apache.org> | 2010-05-14 08:56:45 +0000 |
| commit | d9af71e691e50d7c9f3f16cd259298d3b8f0cd14 (patch) | |
| tree | 720c505bd510a48bf3555ac4971b8fca66fd746a /cpp/src/qpid/cluster | |
| parent | 1318c94eff0722c27c9c45d9844485e30cd954f6 (diff) | |
| download | qpid-python-d9af71e691e50d7c9f3f16cd259298d3b8f0cd14.tar.gz | |
Cluster + Security
-----------------------------------
* initial observation of a problem was a 2% failure rate in perftests
of 20,000 messages against a cluster with security enabled.
Problem was occasional receit of encrypted frames before the
security codec had been enabled. This is fixed with locking in
cluster code (no new locks in broker code) and a callback that is
fired by broker::ConnectionHandler::Handler to tell the cluster
code when the opening handshake has finished.
This was never a problem in the non-clustered broker before because
everything happened in a single thread.
* the brokers that "shadow" the connection must not have null
authenticators rather than real ones, so that they go through all
the motions but don't do anythig. Only the directly-connected
broker can perform the security handshake.
* once the directly-connected broker receives the real user ID
from its callback, it mcasts that ID to all other brokers.
Otherwise the shadowing brokers will al think that the user ID
is "anonymous".
Check this by doing a substantial perftest, and using
qpid-stat -c localhost:PORT
to confirm that the brokers all have the same userID for the
same connection.
* the user ID, negotiated during the Sasl security startup, is
communicated from the directly connected broker to all other
cluster brokers.
* If security is *not* being used, then this code should *not* tell
the brokers anything about the userID -- or it will step on the value
that is being set by other code pathways.
* test program at cpp/src/tests/cluster_authentication_soak is not yet
fully automated -- run it with something like
"sudo ./cluster_authentication_soak 500"
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@944158 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 77 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/SecureConnectionFactory.cpp | 73 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/SecureConnectionFactory.h | 58 |
6 files changed, 227 insertions, 10 deletions
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 955487ee03..75c8d328cf 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -21,6 +21,8 @@ #include "qpid/cluster/ConnectionCodec.h" #include "qpid/cluster/ClusterSettings.h" +#include "qpid/cluster/SecureConnectionFactory.h" + #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ConnectionCodec.h" #include "qpid/cluster/UpdateClient.h" @@ -75,6 +77,8 @@ struct ClusterOptions : public Options { } }; +typedef boost::shared_ptr<sys::ConnectionCodec::Factory> CodecFactoryPtr; + struct ClusterPlugin : public Plugin { ClusterSettings settings; @@ -94,9 +98,10 @@ struct ClusterPlugin : public Plugin { Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; cluster = new Cluster(settings, *broker); - broker->setConnectionFactory( - boost::shared_ptr<sys::ConnectionCodec::Factory>( - new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); + CodecFactoryPtr simpleFactory(new broker::ConnectionFactory(*broker)); + CodecFactoryPtr clusterFactory(new ConnectionCodec::Factory(simpleFactory, *cluster)); + CodecFactoryPtr secureFactory(new SecureConnectionFactory(clusterFactory)); + broker->setConnectionFactory(secureFactory); } void disallowManagementMethods(ManagementAgent* agent) { diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 30828d7bd9..118be27bb5 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -39,6 +39,7 @@ #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAnnounceBody.h" +#include "qpid/framing/ClusterConnectionSecureUserIdBody.h" #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" @@ -46,6 +47,9 @@ #include <boost/current_function.hpp> + +typedef boost::function<void ( std::string& )> UserIdCallback; + // TODO aconway 2008-11-03: // // Refactor code for receiving an update into a separate UpdateConnection @@ -59,6 +63,7 @@ namespace cluster { using namespace framing; using namespace framing::cluster; + qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); Connection::NullFrameHandler Connection::nullFrameHandler; @@ -82,8 +87,11 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), - updateIn(c.getUpdateReceiver()) -{} + updateIn(c.getUpdateReceiver()), + secureConnection(0), + mcastSentButNotReceived(false), + inConnectionNegotiation(true) +{ } // Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, @@ -98,7 +106,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, isCatchUp), // isCatchUp => shadow expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), - updateIn(c.getUpdateReceiver()) + updateIn(c.getUpdateReceiver()), + secureConnection(0), + mcastSentButNotReceived(false) { cluster.addLocalConnection(this); if (isLocalClient()) { @@ -120,13 +130,19 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, updateIn.nextShadowMgmtId.clear(); init(); } + +} + +void Connection::setSecureConnection(broker::SecureConnection* sc) { + secureConnection = sc; } void Connection::init() { connection = connectionCtor.construct(); QPID_LOG(debug, cluster << " initialized connection: " << *this << " ssf=" << connection->getExternalSecuritySettings().ssf); - if (isLocalClient()) { + if (isLocalClient()) { + if (secureConnection) connection->setSecureConnection(secureConnection); // Actively send cluster-order frames from local node connection->setClusterOrderOutput(mcastFrameHandler); } @@ -138,9 +154,19 @@ void Connection::init() { } if (!isCatchUp()) connection->setErrorListener(this); + UserIdCallback fn = boost::bind ( &Connection::mcastUserId, this, _1 ); + connection->setUserIdCallback ( fn ); } void Connection::giveReadCredit(int credit) { + { + sys::Mutex::ScopedLock l(connectionNegotiationMonitor); + if (inConnectionNegotiation) { + mcastSentButNotReceived = false; + connectionNegotiationMonitor.notify(); + } + } + if (cluster.getSettings().readMax && credit) output.giveReadCredit(credit); } @@ -278,8 +304,9 @@ void Connection::abort() { cluster.erase(self); } -// ConnectoinCodec::decode receives read buffers from directly-connected clients. +// ConnectionCodec::decode receives read buffers from directly-connected clients. size_t Connection::decode(const char* buffer, size_t size) { + if (catchUp) { // Handle catch-up locally. Buffer buf(const_cast<char*>(buffer), size); while (localDecoder.decode(buf)) @@ -289,6 +316,15 @@ size_t Connection::decode(const char* buffer, size_t size) { assert(isLocal()); const char* remainingData = buffer; size_t remainingSize = size; + + { // scope for scoped lock. + sys::Mutex::ScopedLock l(connectionNegotiationMonitor); + if ( inConnectionNegotiation ) { + assert(!mcastSentButNotReceived); + mcastSentButNotReceived = true; + } + } + if (expectProtocolHeader) { //If this is an outgoing link, we will receive a protocol //header which needs to be decoded first @@ -307,6 +343,13 @@ size_t Connection::decode(const char* buffer, size_t size) { } } cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self); + + { // scope for scoped lock. + sys::Mutex::ScopedLock l(connectionNegotiationMonitor); + if ( inConnectionNegotiation ) + while (inConnectionNegotiation && mcastSentButNotReceived) + connectionNegotiationMonitor.wait(); + } } return size; } @@ -570,5 +613,29 @@ void Connection::managementAgents(const std::string& data) { QPID_LOG(debug, cluster << " updated management agents"); } + +// Only the direct, non-shadow gets this call. +void Connection::mcastUserId ( std::string & id ) { + cluster.getMulticast().mcastControl( ClusterConnectionSecureUserIdBody(ProtocolVersion(), string(id)), getId() ); + + { + sys::Mutex::ScopedLock l(connectionNegotiationMonitor); + inConnectionNegotiation = false; + connectionNegotiationMonitor.notify(); + } +} + +// All connections, shadow or not, get this call. +void Connection::secureUserId(const std::string& id) { + if ( isShadow() ) { + // If the user ID is "none", it is not legitimate. Take no action. + if ( strcmp ( id.c_str(), "none" ) ) { + connection->setUserId ( id ); + } + } +} + + + }} // Namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 000d00f7d9..4f69bf7cf4 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -29,6 +29,7 @@ #include "UpdateReceiver.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/SecureConnection.h" #include "qpid/broker/SemanticState.h" #include "qpid/amqp_0_10/Connection.h" #include "qpid/sys/AtomicValue.h" @@ -64,7 +65,7 @@ class Connection : { public: - + /** Local connection. */ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId, bool catchUp, bool isLink, const qpid::sys::SecuritySettings& external); @@ -164,6 +165,7 @@ class Connection : void giveReadCredit(int credit); void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict); + void secureUserId(const std::string&); void abort(); void deliverClose(); @@ -176,6 +178,13 @@ class Connection : //uint32_t getSsf() const { return connectionCtor.external.ssf; } + void setSecureConnection ( broker::SecureConnection * sc ); + + // This is a callback, registered with the broker connection. + // It gives me the user ID, if one is negotiated through Sasl. + void mcastUserId ( std::string & ); + + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} @@ -237,8 +246,13 @@ class Connection : bool expectProtocolHeader; McastFrameHandler mcastFrameHandler; UpdateReceiver& updateIn; + qpid::broker::SecureConnection* secureConnection; static qpid::sys::AtomicValue<uint64_t> catchUpId; + + mutable sys::Monitor connectionNegotiationMonitor; + bool mcastSentButNotReceived; + bool inConnectionNegotiation; friend std::ostream& operator<<(std::ostream&, const Connection&); }; diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h index 4b919ed351..17a08904d9 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/cpp/src/qpid/cluster/ConnectionCodec.h @@ -70,7 +70,7 @@ class ConnectionCodec : public sys::ConnectionCodec { void closed(); bool isClosed() const; framing::ProtocolVersion getVersion() const; - + void setSecureConnection(broker::SecureConnection* sc) { interceptor->setSecureConnection(sc); } private: amqp_0_10::Connection codec; diff --git a/cpp/src/qpid/cluster/SecureConnectionFactory.cpp b/cpp/src/qpid/cluster/SecureConnectionFactory.cpp new file mode 100644 index 0000000000..6ddef66226 --- /dev/null +++ b/cpp/src/qpid/cluster/SecureConnectionFactory.cpp @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/cluster/SecureConnectionFactory.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/cluster/ConnectionCodec.h" +#include "qpid/broker/SecureConnection.h" +#include "qpid/sys/SecuritySettings.h" +#include "qpid/log/Statement.h" +#include <memory> + + +namespace qpid { +namespace cluster { + +using framing::ProtocolVersion; +using qpid::sys::SecuritySettings; +using qpid::broker::SecureConnection; + +typedef std::auto_ptr<qpid::broker::SecureConnection> SecureConnectionPtr; +typedef std::auto_ptr<qpid::sys::ConnectionCodec> CodecPtr; + +SecureConnectionFactory::SecureConnectionFactory(CodecFactoryPtr f) : codecFactory(f) { +} + +sys::ConnectionCodec* +SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, + const SecuritySettings& external) { + CodecPtr codec(codecFactory->create(v, out, id, external)); + ConnectionCodec* clusterCodec = dynamic_cast<qpid::cluster::ConnectionCodec*>(codec.get()); + if (clusterCodec) { + SecureConnectionPtr sc(new SecureConnection()); + clusterCodec->setSecureConnection(sc.get()); + sc->setCodec(codec); + return sc.release(); + } + return 0; +} + +sys::ConnectionCodec* +SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id, + const SecuritySettings& external) { + // used to create connections from one broker to another + CodecPtr codec(codecFactory->create(out, id, external)); + ConnectionCodec* clusterCodec = dynamic_cast<qpid::cluster::ConnectionCodec*>(codec.get()); + if (clusterCodec) { + SecureConnectionPtr sc(new SecureConnection()); + clusterCodec->setSecureConnection(sc.get()); + sc->setCodec(codec); + return sc.release(); + } + return 0; +} + + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SecureConnectionFactory.h b/cpp/src/qpid/cluster/SecureConnectionFactory.h new file mode 100644 index 0000000000..24d1fcfee5 --- /dev/null +++ b/cpp/src/qpid/cluster/SecureConnectionFactory.h @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef QPID_CLUSTER_SecureconnectionFactory +#define QPID_CLUSTER_SecureconnectionFactory + +#include "qpid/sys/ConnectionCodec.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { + +namespace broker { + class Broker; +} + +namespace cluster { + +class SecureConnectionFactory : public qpid::sys::ConnectionCodec::Factory +{ + public: + typedef boost::shared_ptr<qpid::sys::ConnectionCodec::Factory> CodecFactoryPtr; + SecureConnectionFactory(CodecFactoryPtr f); + + qpid::sys::ConnectionCodec* create( + framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string& id, + const qpid::sys::SecuritySettings& + ); + + /** Return "preferred" codec for outbound connections. */ + qpid::sys::ConnectionCodec* create( + qpid::sys::OutputControl&, const std::string& id, const qpid::sys::SecuritySettings& + ); + + private: + CodecFactoryPtr codecFactory; +}; + +}} // namespace qpid::cluster + + +#endif // QPID_CLUSTER_SecureconnectionFactory |
