diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/sys/SslPlugin.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/SslPlugin.cpp')
-rw-r--r-- | cpp/src/qpid/sys/SslPlugin.cpp | 289 |
1 files changed, 153 insertions, 136 deletions
diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp index 069e97758e..a40da24eb8 100644 --- a/cpp/src/qpid/sys/SslPlugin.cpp +++ b/cpp/src/qpid/sys/SslPlugin.cpp @@ -22,19 +22,19 @@ #include "qpid/sys/ProtocolFactory.h" #include "qpid/Plugin.h" -#include "qpid/sys/ssl/check.h" -#include "qpid/sys/ssl/util.h" -#include "qpid/sys/ssl/SslHandler.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/NameGenerator.h" +#include "qpid/log/Statement.h" #include "qpid/sys/AsynchIOHandler.h" #include "qpid/sys/AsynchIO.h" -#include "qpid/sys/ssl/SslIo.h" +#include "qpid/sys/ssl/util.h" #include "qpid/sys/ssl/SslSocket.h" -#include "qpid/broker/Broker.h" -#include "qpid/log/Statement.h" +#include "qpid/sys/SocketAddress.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/sys/Poller.h" #include <boost/bind.hpp> -#include <memory> - +#include <boost/ptr_container/ptr_vector.hpp> namespace qpid { namespace sys { @@ -64,38 +64,32 @@ struct SslServerOptions : ssl::SslOptions } }; -template <class T> -class SslProtocolFactoryTmpl : public ProtocolFactory { - private: - - typedef SslAcceptorTmpl<T> SslAcceptor; - +class SslProtocolFactory : public ProtocolFactory { + boost::ptr_vector<Socket> listeners; + boost::ptr_vector<AsynchAcceptor> acceptors; Timer& brokerTimer; uint32_t maxNegotiateTime; + uint16_t listeningPort; const bool tcpNoDelay; - T listener; - const uint16_t listeningPort; - std::auto_ptr<SslAcceptor> acceptor; bool nodict; public: - SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay, Timer& timer, uint32_t maxTime); + SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, + Timer& timer); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, const std::string& port, + void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port, ConnectionCodec::Factory*, - boost::function2<void, int, std::string> failed); + ConnectFailedCallback); uint16_t getPort() const; - bool supports(const std::string& capability); private: - void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, - bool isClient); + void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); + void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&); + void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&); + void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback); }; -typedef SslProtocolFactoryTmpl<SslSocket> SslProtocolFactory; -typedef SslProtocolFactoryTmpl<SslMuxSocket> SslMuxProtocolFactory; - // Static instance to initialise plugin static struct SslPlugin : public Plugin { @@ -124,7 +118,7 @@ static struct SslPlugin : public Plugin { } } } - + void initialize(Target& target) { QPID_LOG(trace, "Initialising SSL plugin"); broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); @@ -139,19 +133,16 @@ static struct SslPlugin : public Plugin { const broker::Broker::Options& opts = broker->getOptions(); - ProtocolFactory::shared_ptr protocol(options.multiplex ? - static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options, - opts.connectionBacklog, - opts.tcpNoDelay, - broker->getTimer(), opts.maxNegotiateTime)) : - static_cast<ProtocolFactory*>(new SslProtocolFactory(options, - opts.connectionBacklog, - opts.tcpNoDelay, - broker->getTimer(), opts.maxNegotiateTime))); - QPID_LOG(notice, "Listening for " << - (options.multiplex ? "SSL or TCP" : "SSL") << - " connections on TCP port " << - protocol->getPort()); + ProtocolFactory::shared_ptr protocol( + static_cast<ProtocolFactory*>(new SslProtocolFactory(opts, options, broker->getTimer()))); + + if (protocol->getPort()!=0 ) { + QPID_LOG(notice, "Listening for " << + (options.multiplex ? "SSL or TCP" : "SSL") << + " connections on TCP/TCP6 port " << + protocol->getPort()); + } + broker->registerProtocolFactory("ssl", protocol); } catch (const std::exception& e) { QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what()); @@ -161,99 +152,133 @@ static struct SslPlugin : public Plugin { } } sslPlugin; -template <class T> -SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay, Timer& timer, uint32_t maxTime) : +namespace { + // Expand list of Interfaces and addresses to a list of addresses + std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) { + std::vector<std::string> addresses; + // If there are no specific interfaces listed use a single "" to listen on every interface + if (interfaces.empty()) { + addresses.push_back(""); + return addresses; + } + for (unsigned i = 0; i < interfaces.size(); ++i) { + const std::string& interface = interfaces[i]; + if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) { + // We don't have an interface of that name - + // Check for IPv6 ('[' ']') brackets and remove them + // then pass to be looked up directly + if (interface[0]=='[' && interface[interface.size()-1]==']') { + addresses.push_back(interface.substr(1, interface.size()-2)); + } else { + addresses.push_back(interface); + } + } + } + return addresses; + } +} + +SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, + Timer& timer) : brokerTimer(timer), - maxNegotiateTime(maxTime), - tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)), + maxNegotiateTime(opts.maxNegotiateTime), + tcpNoDelay(opts.tcpNoDelay), nodict(options.nodict) -{} - -void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s, - ConnectionCodec::Factory* f, bool isClient, - Timer& timer, uint32_t maxTime, bool tcpNoDelay, bool nodict) { - qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict); - - if (tcpNoDelay) { - s.setTcpNoDelay(tcpNoDelay); - QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); +{ + std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces); + if (addresses.empty()) { + // We specified some interfaces, but couldn't find addresses for them + QPID_LOG(warning, "SSL: No specified network interfaces found: Not Listening"); + listeningPort = 0; } - if (isClient) { - async->setClient(); + for (unsigned i = 0; i<addresses.size(); ++i) { + QPID_LOG(debug, "Using interface: " << addresses[i]); + SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port)); + + // We must have at least one resolved address + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = options.multiplex ? + new SslMuxSocket(options.certName, options.clientAuth) : + new SslSocket(options.certName, options.clientAuth); + uint16_t lport = s->listen(sa, opts.connectionBacklog); + QPID_LOG(debug, "Listened to: " << lport); + listeners.push_back(s); + + listeningPort = lport; + + // Try any other resolved addresses + while (sa.nextAddress()) { + // Hack to ensure that all listening connections are on the same port + sa.setAddrInfoPort(listeningPort); + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = options.multiplex ? + new SslMuxSocket(options.certName, options.clientAuth) : + new SslSocket(options.certName, options.clientAuth); + uint16_t lport = s->listen(sa, opts.connectionBacklog); + QPID_LOG(debug, "Listened to: " << lport); + listeners.push_back(s); + } } - - qpid::sys::ssl::SslIO* aio = new qpid::sys::ssl::SslIO(s, - boost::bind(&qpid::sys::ssl::SslHandler::readbuff, async, _1, _2), - boost::bind(&qpid::sys::ssl::SslHandler::eof, async, _1), - boost::bind(&qpid::sys::ssl::SslHandler::disconnect, async, _1), - boost::bind(&qpid::sys::ssl::SslHandler::closedSocket, async, _1, _2), - boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1), - boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1)); - - async->init(aio,timer, maxTime); - aio->start(poller); -} - -template <> -void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, bool isClient) { - const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s); - - SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict); } -template <class T> -uint16_t SslProtocolFactoryTmpl<T>::getPort() const { - return listeningPort; // Immutable no need for lock. +void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f) { + AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false); + establishedCommon(async, poller, s); } -template <class T> -void SslProtocolFactoryTmpl<T>::accept(Poller::shared_ptr poller, - ConnectionCodec::Factory* fact) { - acceptor.reset( - new SslAcceptor(listener, - boost::bind(&SslProtocolFactoryTmpl<T>::established, - this, poller, _1, fact, false))); - acceptor->start(poller); +void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f, const std::string& name) { + AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false); + establishedCommon(async, poller, s); } -template <> -void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, bool isClient) { - const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s); - - if (sslSock) { - SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict); - return; - } - - AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f); - +void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) { if (tcpNoDelay) { s.setTcpNoDelay(); QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); } - if (isClient) { - async->setClient(); - } - AsynchIO* aio = AsynchIO::create - (s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); + AsynchIO* aio = AsynchIO::create( + s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); async->init(aio, brokerTimer, maxNegotiateTime); aio->start(poller); } -template <class T> -void SslProtocolFactoryTmpl<T>::connect( +uint16_t SslProtocolFactory::getPort() const { + return listeningPort; // Immutable no need for lock. +} + +void SslProtocolFactory::accept(Poller::shared_ptr poller, + ConnectionCodec::Factory* fact) { + for (unsigned i = 0; i<listeners.size(); ++i) { + acceptors.push_back( + AsynchAcceptor::create(listeners[i], + boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact))); + acceptors[i].start(poller); + } +} + +void SslProtocolFactory::connectFailed( + const Socket& s, int ec, const std::string& emsg, + ConnectFailedCallback failedCb) +{ + failedCb(ec, emsg); + s.close(); + delete &s; +} + +void SslProtocolFactory::connect( Poller::shared_ptr poller, + const std::string& name, const std::string& host, const std::string& port, ConnectionCodec::Factory* fact, ConnectFailedCallback failed) @@ -264,31 +289,23 @@ void SslProtocolFactoryTmpl<T>::connect( // shutdown. The allocated SslConnector frees itself when it // is no longer needed. - qpid::sys::ssl::SslSocket* socket = new qpid::sys::ssl::SslSocket(); - new SslConnector(*socket, poller, host, port, - boost::bind(&SslProtocolFactoryTmpl<T>::established, this, poller, _1, fact, true), - failed); -} - -namespace -{ -const std::string SSL = "ssl"; -} - -template <> -bool SslProtocolFactory::supports(const std::string& capability) -{ - std::string s = capability; - transform(s.begin(), s.end(), s.begin(), tolower); - return s == SSL; -} - -template <> -bool SslMuxProtocolFactory::supports(const std::string& capability) -{ - std::string s = capability; - transform(s.begin(), s.end(), s.begin(), tolower); - return s == SSL || s == "tcp"; + Socket* socket = new qpid::sys::ssl::SslSocket(); + try { + AsynchConnector* c = AsynchConnector::create( + *socket, + host, + port, + boost::bind(&SslProtocolFactory::establishedOutgoing, + this, poller, _1, fact, name), + boost::bind(&SslProtocolFactory::connectFailed, + this, _1, _2, _3, failed)); + c->start(poller); + } catch (std::exception&) { + // TODO: Design question - should we do the error callback and also throw? + int errCode = socket->getError(); + connectFailed(*socket, errCode, strError(errCode), failed); + throw; + } } }} // namespace qpid::sys |