diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
commit | f83677056891e436bf5ba99e79240df2a44528cd (patch) | |
tree | 625bfd644b948e89105630759cf6decb0435354d /cpp/src/qpid/sys/SslPlugin.cpp | |
parent | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff) | |
download | qpid-python-QPID-2519.tar.gz |
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/SslPlugin.cpp')
-rw-r--r-- | cpp/src/qpid/sys/SslPlugin.cpp | 154 |
1 files changed, 123 insertions, 31 deletions
diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp index b0e791d60b..ab15785492 100644 --- a/cpp/src/qpid/sys/SslPlugin.cpp +++ b/cpp/src/qpid/sys/SslPlugin.cpp @@ -25,6 +25,8 @@ #include "qpid/sys/ssl/check.h" #include "qpid/sys/ssl/util.h" #include "qpid/sys/ssl/SslHandler.h" +#include "qpid/sys/AsynchIOHandler.h" +#include "qpid/sys/AsynchIO.h" #include "qpid/sys/ssl/SslIo.h" #include "qpid/sys/ssl/SslSocket.h" #include "qpid/broker/Broker.h" @@ -37,15 +39,19 @@ namespace qpid { namespace sys { +using namespace qpid::sys::ssl; + struct SslServerOptions : ssl::SslOptions { uint16_t port; bool clientAuth; bool nodict; + bool multiplex; SslServerOptions() : port(5671), clientAuth(false), - nodict(false) + nodict(false), + multiplex(false) { addOptions() ("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections") @@ -56,29 +62,37 @@ struct SslServerOptions : ssl::SslOptions } }; -class SslProtocolFactory : public ProtocolFactory { +template <class T> +class SslProtocolFactoryTmpl : public ProtocolFactory { + private: + + typedef SslAcceptorTmpl<T> SslAcceptor; + const bool tcpNoDelay; - qpid::sys::ssl::SslSocket listener; + T listener; const uint16_t listeningPort; - std::auto_ptr<qpid::sys::ssl::SslAcceptor> acceptor; + std::auto_ptr<SslAcceptor> acceptor; bool nodict; public: - SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay); + SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, int16_t port, + void connect(Poller::shared_ptr, const std::string& host, const std::string& port, ConnectionCodec::Factory*, boost::function2<void, int, std::string> failed); uint16_t getPort() const; - std::string getHost() const; bool supports(const std::string& capability); private: - void established(Poller::shared_ptr, const qpid::sys::ssl::SslSocket&, ConnectionCodec::Factory*, + void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient); }; +typedef SslProtocolFactoryTmpl<SslSocket> SslProtocolFactory; +typedef SslProtocolFactoryTmpl<SslMuxSocket> SslMuxProtocolFactory; + + // Static instance to initialise plugin static struct SslPlugin : public Plugin { SslServerOptions options; @@ -87,24 +101,48 @@ static struct SslPlugin : public Plugin { ~SslPlugin() { ssl::shutdownNSS(); } - void earlyInitialize(Target&) { + void earlyInitialize(Target& target) { + broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); + if (broker && !options.certDbPath.empty()) { + const broker::Broker::Options& opts = broker->getOptions(); + + if (opts.port == options.port && // AMQP & AMQPS ports are the same + opts.port != 0) { + // The presence of this option is used to signal to the TCP + // plugin not to start listening on the shared port. The actual + // value cannot be configured through the command line or config + // file (other than by setting the ports to the same value) + // because we are only adding it after option parsing. + options.multiplex = true; + options.addOptions()("ssl-multiplex", optValue(options.multiplex), "Allow SSL and non-SSL connections on the same port"); + } + } } void initialize(Target& target) { + QPID_LOG(trace, "Initialising SSL plugin"); broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); // Only provide to a Broker if (broker) { if (options.certDbPath.empty()) { - QPID_LOG(info, "SSL plugin not enabled, you must set --ssl-cert-db to enable it."); + QPID_LOG(notice, "SSL plugin not enabled, you must set --ssl-cert-db to enable it."); } else { try { ssl::initNSS(options, true); const broker::Broker::Options& opts = broker->getOptions(); - ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options, - opts.connectionBacklog, - opts.tcpNoDelay)); - QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort()); + + ProtocolFactory::shared_ptr protocol(options.multiplex ? + static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options, + opts.connectionBacklog, + opts.tcpNoDelay)) : + static_cast<ProtocolFactory*>(new SslProtocolFactory(options, + opts.connectionBacklog, + opts.tcpNoDelay))); + QPID_LOG(notice, "Listening for " << + (options.multiplex ? "SSL or TCP" : "SSL") << + " connections on TCP port " << + protocol->getPort()); broker->registerProtocolFactory("ssl", protocol); } catch (const std::exception& e) { QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what()); @@ -114,13 +152,15 @@ static struct SslPlugin : public Plugin { } } sslPlugin; -SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, int backlog, bool nodelay) : +template <class T> +SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay) : tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)), nodict(options.nodict) {} -void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys::ssl::SslSocket& s, - ConnectionCodec::Factory* f, bool isClient) { +void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s, + ConnectionCodec::Factory* f, bool isClient, + bool tcpNoDelay, bool nodict) { qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict); if (tcpNoDelay) { @@ -128,8 +168,10 @@ void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys: QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); } - if (isClient) + if (isClient) { async->setClient(); + } + qpid::sys::ssl::SslIO* aio = new qpid::sys::ssl::SslIO(s, boost::bind(&qpid::sys::ssl::SslHandler::readbuff, async, _1, _2), boost::bind(&qpid::sys::ssl::SslHandler::eof, async, _1), @@ -142,25 +184,66 @@ void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys: aio->start(poller); } -uint16_t SslProtocolFactory::getPort() const { - return listeningPort; // Immutable no need for lock. +template <> +void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f, bool isClient) { + const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s); + + SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict); } -std::string SslProtocolFactory::getHost() const { - return listener.getSockname(); +template <class T> +uint16_t SslProtocolFactoryTmpl<T>::getPort() const { + return listeningPort; // Immutable no need for lock. } -void SslProtocolFactory::accept(Poller::shared_ptr poller, - ConnectionCodec::Factory* fact) { +template <class T> +void SslProtocolFactoryTmpl<T>::accept(Poller::shared_ptr poller, + ConnectionCodec::Factory* fact) { acceptor.reset( - new qpid::sys::ssl::SslAcceptor(listener, - boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false))); + new SslAcceptor(listener, + boost::bind(&SslProtocolFactoryTmpl<T>::established, + this, poller, _1, fact, false))); acceptor->start(poller); } -void SslProtocolFactory::connect( +template <> +void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f, bool isClient) { + const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s); + + if (sslSock) { + SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict); + return; + } + + AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f); + + if (tcpNoDelay) { + s.setTcpNoDelay(); + QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); + } + + if (isClient) { + async->setClient(); + } + AsynchIO* aio = AsynchIO::create + (s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + + async->init(aio, 4); + aio->start(poller); +} + +template <class T> +void SslProtocolFactoryTmpl<T>::connect( Poller::shared_ptr poller, - const std::string& host, int16_t port, + const std::string& host, const std::string& port, ConnectionCodec::Factory* fact, ConnectFailedCallback failed) { @@ -171,9 +254,9 @@ void SslProtocolFactory::connect( // is no longer needed. qpid::sys::ssl::SslSocket* socket = new qpid::sys::ssl::SslSocket(); - new qpid::sys::ssl::SslConnector (*socket, poller, host, port, - boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, true), - failed); + new SslConnector(*socket, poller, host, port, + boost::bind(&SslProtocolFactoryTmpl<T>::established, this, poller, _1, fact, true), + failed); } namespace @@ -181,6 +264,7 @@ namespace const std::string SSL = "ssl"; } +template <> bool SslProtocolFactory::supports(const std::string& capability) { std::string s = capability; @@ -188,4 +272,12 @@ bool SslProtocolFactory::supports(const std::string& capability) return s == SSL; } +template <> +bool SslMuxProtocolFactory::supports(const std::string& capability) +{ + std::string s = capability; + transform(s.begin(), s.end(), s.begin(), tolower); + return s == SSL || s == "tcp"; +} + }} // namespace qpid::sys |