summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/SslPlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/SslPlugin.cpp')
-rw-r--r--cpp/src/qpid/sys/SslPlugin.cpp154
1 files changed, 123 insertions, 31 deletions
diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp
index b0e791d60b..ab15785492 100644
--- a/cpp/src/qpid/sys/SslPlugin.cpp
+++ b/cpp/src/qpid/sys/SslPlugin.cpp
@@ -25,6 +25,8 @@
#include "qpid/sys/ssl/check.h"
#include "qpid/sys/ssl/util.h"
#include "qpid/sys/ssl/SslHandler.h"
+#include "qpid/sys/AsynchIOHandler.h"
+#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/ssl/SslIo.h"
#include "qpid/sys/ssl/SslSocket.h"
#include "qpid/broker/Broker.h"
@@ -37,15 +39,19 @@
namespace qpid {
namespace sys {
+using namespace qpid::sys::ssl;
+
struct SslServerOptions : ssl::SslOptions
{
uint16_t port;
bool clientAuth;
bool nodict;
+ bool multiplex;
SslServerOptions() : port(5671),
clientAuth(false),
- nodict(false)
+ nodict(false),
+ multiplex(false)
{
addOptions()
("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
@@ -56,29 +62,37 @@ struct SslServerOptions : ssl::SslOptions
}
};
-class SslProtocolFactory : public ProtocolFactory {
+template <class T>
+class SslProtocolFactoryTmpl : public ProtocolFactory {
+ private:
+
+ typedef SslAcceptorTmpl<T> SslAcceptor;
+
const bool tcpNoDelay;
- qpid::sys::ssl::SslSocket listener;
+ T listener;
const uint16_t listeningPort;
- std::auto_ptr<qpid::sys::ssl::SslAcceptor> acceptor;
+ std::auto_ptr<SslAcceptor> acceptor;
bool nodict;
public:
- SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
+ SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, int16_t port,
+ void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
boost::function2<void, int, std::string> failed);
uint16_t getPort() const;
- std::string getHost() const;
bool supports(const std::string& capability);
private:
- void established(Poller::shared_ptr, const qpid::sys::ssl::SslSocket&, ConnectionCodec::Factory*,
+ void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
bool isClient);
};
+typedef SslProtocolFactoryTmpl<SslSocket> SslProtocolFactory;
+typedef SslProtocolFactoryTmpl<SslMuxSocket> SslMuxProtocolFactory;
+
+
// Static instance to initialise plugin
static struct SslPlugin : public Plugin {
SslServerOptions options;
@@ -87,24 +101,48 @@ static struct SslPlugin : public Plugin {
~SslPlugin() { ssl::shutdownNSS(); }
- void earlyInitialize(Target&) {
+ void earlyInitialize(Target& target) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+ if (broker && !options.certDbPath.empty()) {
+ const broker::Broker::Options& opts = broker->getOptions();
+
+ if (opts.port == options.port && // AMQP & AMQPS ports are the same
+ opts.port != 0) {
+ // The presence of this option is used to signal to the TCP
+ // plugin not to start listening on the shared port. The actual
+ // value cannot be configured through the command line or config
+ // file (other than by setting the ports to the same value)
+ // because we are only adding it after option parsing.
+ options.multiplex = true;
+ options.addOptions()("ssl-multiplex", optValue(options.multiplex), "Allow SSL and non-SSL connections on the same port");
+ }
+ }
}
void initialize(Target& target) {
+ QPID_LOG(trace, "Initialising SSL plugin");
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker
if (broker) {
if (options.certDbPath.empty()) {
- QPID_LOG(info, "SSL plugin not enabled, you must set --ssl-cert-db to enable it.");
+ QPID_LOG(notice, "SSL plugin not enabled, you must set --ssl-cert-db to enable it.");
} else {
try {
ssl::initNSS(options, true);
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options,
- opts.connectionBacklog,
- opts.tcpNoDelay));
- QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
+
+ ProtocolFactory::shared_ptr protocol(options.multiplex ?
+ static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options,
+ opts.connectionBacklog,
+ opts.tcpNoDelay)) :
+ static_cast<ProtocolFactory*>(new SslProtocolFactory(options,
+ opts.connectionBacklog,
+ opts.tcpNoDelay)));
+ QPID_LOG(notice, "Listening for " <<
+ (options.multiplex ? "SSL or TCP" : "SSL") <<
+ " connections on TCP port " <<
+ protocol->getPort());
broker->registerProtocolFactory("ssl", protocol);
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what());
@@ -114,13 +152,15 @@ static struct SslPlugin : public Plugin {
}
} sslPlugin;
-SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, int backlog, bool nodelay) :
+template <class T>
+SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay) :
tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)),
nodict(options.nodict)
{}
-void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys::ssl::SslSocket& s,
- ConnectionCodec::Factory* f, bool isClient) {
+void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
+ ConnectionCodec::Factory* f, bool isClient,
+ bool tcpNoDelay, bool nodict) {
qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
if (tcpNoDelay) {
@@ -128,8 +168,10 @@ void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys:
QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
- if (isClient)
+ if (isClient) {
async->setClient();
+ }
+
qpid::sys::ssl::SslIO* aio = new qpid::sys::ssl::SslIO(s,
boost::bind(&qpid::sys::ssl::SslHandler::readbuff, async, _1, _2),
boost::bind(&qpid::sys::ssl::SslHandler::eof, async, _1),
@@ -142,25 +184,66 @@ void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys:
aio->start(poller);
}
-uint16_t SslProtocolFactory::getPort() const {
- return listeningPort; // Immutable no need for lock.
+template <>
+void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, bool isClient) {
+ const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
+
+ SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
}
-std::string SslProtocolFactory::getHost() const {
- return listener.getSockname();
+template <class T>
+uint16_t SslProtocolFactoryTmpl<T>::getPort() const {
+ return listeningPort; // Immutable no need for lock.
}
-void SslProtocolFactory::accept(Poller::shared_ptr poller,
- ConnectionCodec::Factory* fact) {
+template <class T>
+void SslProtocolFactoryTmpl<T>::accept(Poller::shared_ptr poller,
+ ConnectionCodec::Factory* fact) {
acceptor.reset(
- new qpid::sys::ssl::SslAcceptor(listener,
- boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
+ new SslAcceptor(listener,
+ boost::bind(&SslProtocolFactoryTmpl<T>::established,
+ this, poller, _1, fact, false)));
acceptor->start(poller);
}
-void SslProtocolFactory::connect(
+template <>
+void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, bool isClient) {
+ const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
+
+ if (sslSock) {
+ SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
+ return;
+ }
+
+ AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f);
+
+ if (tcpNoDelay) {
+ s.setTcpNoDelay();
+ QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+ }
+
+ if (isClient) {
+ async->setClient();
+ }
+ AsynchIO* aio = AsynchIO::create
+ (s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+
+ async->init(aio, 4);
+ aio->start(poller);
+}
+
+template <class T>
+void SslProtocolFactoryTmpl<T>::connect(
Poller::shared_ptr poller,
- const std::string& host, int16_t port,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
@@ -171,9 +254,9 @@ void SslProtocolFactory::connect(
// is no longer needed.
qpid::sys::ssl::SslSocket* socket = new qpid::sys::ssl::SslSocket();
- new qpid::sys::ssl::SslConnector (*socket, poller, host, port,
- boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, true),
- failed);
+ new SslConnector(*socket, poller, host, port,
+ boost::bind(&SslProtocolFactoryTmpl<T>::established, this, poller, _1, fact, true),
+ failed);
}
namespace
@@ -181,6 +264,7 @@ namespace
const std::string SSL = "ssl";
}
+template <>
bool SslProtocolFactory::supports(const std::string& capability)
{
std::string s = capability;
@@ -188,4 +272,12 @@ bool SslProtocolFactory::supports(const std::string& capability)
return s == SSL;
}
+template <>
+bool SslMuxProtocolFactory::supports(const std::string& capability)
+{
+ std::string s = capability;
+ transform(s.begin(), s.end(), s.begin(), tolower);
+ return s == SSL || s == "tcp";
+}
+
}} // namespace qpid::sys