diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2013-03-01 00:21:12 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2013-03-01 00:21:12 +0000 |
| commit | 0d35f541a728091195e6bc2797d974159e9247bd (patch) | |
| tree | a29fb2ae17a4c8a2cca20ade45b56da911a3ec00 /qpid/cpp | |
| parent | 6df331f948acb5e04461f0bb3edd7e6e616aa996 (diff) | |
| download | qpid-python-0d35f541a728091195e6bc2797d974159e9247bd.tar.gz | |
QPID-4610: Remove duplicated transport code from C++ broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1451443 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/Makefile.am | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 36 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 32 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp | 693 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp | 11 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/SocketTransport.cpp | 209 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/SocketTransport.h | 91 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/SslPlugin.cpp | 222 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp | 208 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/TransportFactory.h (renamed from qpid/cpp/src/qpid/sys/ProtocolFactory.h) | 34 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/windows/WinSocket.cpp | 4 |
12 files changed, 732 insertions, 814 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index f2a19ba670..1e891a93c8 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1224,6 +1224,7 @@ set (qpidbroker_SOURCES qpid/management/ManagementAgent.cpp qpid/management/ManagementDirectExchange.cpp qpid/management/ManagementTopicExchange.cpp + qpid/sys/SocketTransport.cpp qpid/sys/TCPIOPlugin.cpp ) add_msvc_version (qpidbroker library dll) diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 8dca041df3..326cacd9d1 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -755,7 +755,10 @@ libqpidbroker_la_SOURCES = \ qpid/management/ManagementDirectExchange.h \ qpid/management/ManagementTopicExchange.cpp \ qpid/management/ManagementTopicExchange.h \ - qpid/sys/TCPIOPlugin.cpp + qpid/sys/TCPIOPlugin.cpp \ + qpid/sys/SocketTransport.cpp \ + qpid/sys/SocketTransport.h \ + qpid/sys/TransportFactory.h QPIDBROKER_VERSION_INFO = 2:0:0 libqpidbroker_la_LDFLAGS = -version-info $(QPIDBROKER_VERSION_INFO) diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 3db6221541..47296d94b0 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -68,7 +68,7 @@ #include "qpid/framing/ProtocolInitiation.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/Uuid.h" -#include "qpid/sys/ProtocolFactory.h" +#include "qpid/sys/TransportFactory.h" #include "qpid/sys/Poller.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Thread.h" @@ -89,7 +89,8 @@ #include <iostream> #include <memory> -using qpid::sys::ProtocolFactory; +using qpid::sys::TransportAcceptor; +using qpid::sys::TransportConnector; using qpid::sys::Poller; using qpid::sys::Dispatcher; using qpid::sys::Thread; @@ -485,7 +486,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport; QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port << "; transport=" << transport << "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\""); - if (!getProtocolFactory(transport)) { + if (!getTransportInfo(transport).connectorFactory) { QPID_LOG(error, "Transport '" << transport << "' not supported"); text = "transport type not supported"; return Manageable::STATUS_NOT_IMPLEMENTED; @@ -795,7 +796,7 @@ void Broker::createObject(const std::string& type, const std::string& name, } } - if (!getProtocolFactory(transport)) { + if (!getTransportInfo(transport).connectorFactory) { QPID_LOG(error, "Transport '" << transport << "' not supported."); throw UnsupportedTransport(transport); } @@ -1011,30 +1012,29 @@ bool Broker::getLogHiresTimestamp() } -boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const { - ProtocolFactoryMap::const_iterator i - = name.empty() ? protocolFactories.begin() : protocolFactories.find(name); - if (i == protocolFactories.end()) return boost::shared_ptr<ProtocolFactory>(); +const Broker::TransportInfo& Broker::getTransportInfo(const std::string& name) const { + static TransportInfo nullTransportInfo; + TransportMap::const_iterator i + = name.empty() ? transportMap.begin() : transportMap.find(name); + if (i == transportMap.end()) return nullTransportInfo; else return i->second; } uint16_t Broker::getPort(const std::string& name) const { - boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(name); - if (factory) { - return factory->getPort(); + if (int p = getTransportInfo(name).port) { + return p; } else { throw NoSuchTransportException(QPID_MSG("No such transport: '" << name << "'")); } } -void Broker::registerProtocolFactory(const std::string& name, ProtocolFactory::shared_ptr protocolFactory) { - protocolFactories[name] = protocolFactory; - Url::addProtocol(name); +void Broker::registerTransport(const std::string& name, boost::shared_ptr<TransportAcceptor> a, boost::shared_ptr<TransportConnector> c, uint16_t p) { + transportMap[name] = TransportInfo(a, c, p); } void Broker::accept() { - for (ProtocolFactoryMap::const_iterator i = protocolFactories.begin(); i != protocolFactories.end(); i++) { - i->second->accept(poller, factory.get()); + for (TransportMap::const_iterator i = transportMap.begin(); i != transportMap.end(); i++) { + if (i->second.acceptor) i->second.acceptor->accept(poller, factory.get()); } } @@ -1043,8 +1043,8 @@ void Broker::connect( const std::string& host, const std::string& port, const std::string& transport, boost::function2<void, int, std::string> failed) { - boost::shared_ptr<ProtocolFactory> pf = getProtocolFactory(transport); - if (pf) pf->connect(poller, name, host, port, factory.get(), failed); + boost::shared_ptr<TransportConnector> tcf = getTransportInfo(transport).connectorFactory; + if (tcf) tcf->connect(poller, name, host, port, factory.get(), failed); else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport)); } diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index e9f501a658..a896d9d70d 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -51,7 +51,8 @@ namespace qpid { namespace sys { -class ProtocolFactory; +class TransportAcceptor; +class TransportConnector; class Poller; class Timer; } @@ -124,8 +125,23 @@ class Broker : public sys::Runnable, public Plugin::Target, }; private: - typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap; + struct TransportInfo { + boost::shared_ptr<sys::TransportAcceptor> acceptor; + boost::shared_ptr<sys::TransportConnector> connectorFactory; + uint16_t port; + + TransportInfo() : + port(0) + {} + TransportInfo(boost::shared_ptr<sys::TransportAcceptor> a, boost::shared_ptr<sys::TransportConnector> c, uint16_t p) : + acceptor(a), + connectorFactory(c), + port(p) + {} + }; + typedef std::map<std::string, TransportInfo > TransportMap; + void declareStandardExchange(const std::string& name, const std::string& type); void setStore (); void setLogLevel(const std::string& level); @@ -150,7 +166,7 @@ class Broker : public sys::Runnable, public Plugin::Target, std::auto_ptr<sys::Timer> timer; Options config; std::auto_ptr<management::ManagementAgent> managementAgent; - ProtocolFactoryMap protocolFactories; + TransportMap transportMap; std::auto_ptr<MessageStore> store; AclModule* acl; DataDir dataDir; @@ -228,9 +244,11 @@ class Broker : public sys::Runnable, public Plugin::Target, uint32_t methodId, management::Args& args, std::string& text); /** Add to the broker's protocolFactorys */ - QPID_BROKER_EXTERN void registerProtocolFactory( - const std::string& name, boost::shared_ptr<sys::ProtocolFactory>); - + QPID_BROKER_EXTERN void registerTransport( + const std::string& name, + boost::shared_ptr<sys::TransportAcceptor>, boost::shared_ptr<sys::TransportConnector>, + uint16_t port); + /** Accept connections */ QPID_BROKER_EXTERN void accept(); @@ -251,7 +269,7 @@ class Broker : public sys::Runnable, public Plugin::Target, uint32_t qty, const qpid::types::Variant::Map& filter); - QPID_BROKER_EXTERN boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory( + QPID_BROKER_EXTERN const TransportInfo& getTransportInfo( const std::string& name = TCP_TRANSPORT) const; /** Expose poller so plugins can register their descriptors. */ diff --git a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp index a07afe45ae..368af28286 100644 --- a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp +++ b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp @@ -1,379 +1,314 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/ProtocolFactory.h"
-
-#include "qpid/Plugin.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/AsynchIOHandler.h"
-#include "qpid/sys/ConnectionCodec.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/SystemInfo.h"
-#include "qpid/sys/windows/SslAsynchIO.h"
-
-#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
-#include <memory>
-
-// security.h needs to see this to distinguish from kernel use.
-#define SECURITY_WIN32
-#include <security.h>
-#include <Schnlsp.h>
-#undef SECURITY_WIN32
-
-
-namespace qpid {
-namespace sys {
-
-class Timer;
-
-namespace windows {
-
-struct SslServerOptions : qpid::Options
-{
- std::string certStore;
- std::string certStoreLocation;
- std::string certName;
- uint16_t port;
- bool clientAuth;
-
- SslServerOptions() : qpid::Options("SSL Options"),
- certStore("My"),
- certStoreLocation("CurrentUser"),
- certName("localhost"),
- port(5671),
- clientAuth(false)
- {
- qpid::Address me;
- if (qpid::sys::SystemInfo::getLocalHostname(me))
- certName = me.host;
-
- addOptions()
- ("ssl-cert-store", optValue(certStore, "NAME"), "Local store name from which to obtain certificate")
- ("ssl-cert-store-location", optValue(certStoreLocation, "NAME"),
- "Local store name location for certificates ( CurrentUser | LocalMachine | CurrentService )")
- ("ssl-cert-name", optValue(certName, "NAME"), "Name of the certificate to use")
- ("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
- ("ssl-require-client-authentication", optValue(clientAuth),
- "Forces clients to authenticate in order to establish an SSL connection");
- }
-};
-
-class SslProtocolFactory : public qpid::sys::ProtocolFactory {
- boost::ptr_vector<Socket> listeners;
- boost::ptr_vector<AsynchAcceptor> acceptors;
- Timer& brokerTimer;
- uint32_t maxNegotiateTime;
- uint16_t listeningPort;
- const bool tcpNoDelay;
- std::string brokerHost;
- const bool clientAuthSelected;
- std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor;
- ConnectFailedCallback connectFailedCallback;
- CredHandle credHandle;
-
- public:
- SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions&, Timer& timer);
- ~SslProtocolFactory();
- void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*);
- void connect(sys::Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
- sys::ConnectionCodec::Factory*,
- ConnectFailedCallback failed);
-
- uint16_t getPort() const;
-
- private:
- void connectFailed(const qpid::sys::Socket&,
- int err,
- const std::string& msg);
- void establishedIncoming(sys::Poller::shared_ptr, const qpid::sys::Socket&, sys::ConnectionCodec::Factory*);
- void establishedOutgoing(sys::Poller::shared_ptr, const qpid::sys::Socket&, sys::ConnectionCodec::Factory*, std::string& );
- void establishedCommon(sys::Poller::shared_ptr, sys::AsynchIOHandler*, sys::AsynchIO*, const qpid::sys::Socket&);
-};
-
-// Static instance to initialise plugin
-static struct SslPlugin : public Plugin {
- SslServerOptions options;
-
- Options* getOptions() { return &options; }
-
- void earlyInitialize(Target&) {
- }
-
- void initialize(Target& target) {
- broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- // Only provide to a Broker
- if (broker) {
- try {
- const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(opts, options, broker->getTimer()));
- QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
- broker->registerProtocolFactory("ssl", protocol);
- } catch (const std::exception& e) {
- QPID_LOG(error, "Failed to initialise SSL listener: " << e.what());
- }
- }
- }
-} sslPlugin;
-
-namespace {
- // Expand list of Interfaces and addresses to a list of addresses
- std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
- std::vector<std::string> addresses;
- // If there are no specific interfaces listed use a single "" to listen on every interface
- if (interfaces.empty()) {
- addresses.push_back("");
- return addresses;
- }
- for (unsigned i = 0; i < interfaces.size(); ++i) {
- const std::string& interface = interfaces[i];
- if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
- // We don't have an interface of that name -
- // Check for IPv6 ('[' ']') brackets and remove them
- // then pass to be looked up directly
- if (interface[0]=='[' && interface[interface.size()-1]==']') {
- addresses.push_back(interface.substr(1, interface.size()-2));
- } else {
- addresses.push_back(interface);
- }
- }
- }
- return addresses;
- }
-}
-
-SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, Timer& timer)
- : brokerTimer(timer),
- maxNegotiateTime(opts.maxNegotiateTime),
- tcpNoDelay(opts.tcpNoDelay),
- clientAuthSelected(options.clientAuth) {
-
- // Make sure that certificate store is good before listening to sockets
- // to avoid having open and listening sockets when there is no cert store
- SecInvalidateHandle(&credHandle);
-
- // Get the certificate for this server.
- DWORD flags = 0;
- std::string certStoreLocation = options.certStoreLocation;
- std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
- if (certStoreLocation == "currentuser") {
- flags = CERT_SYSTEM_STORE_CURRENT_USER;
- } else if (certStoreLocation == "localmachine") {
- flags = CERT_SYSTEM_STORE_LOCAL_MACHINE;
- } else if (certStoreLocation == "currentservice") {
- flags = CERT_SYSTEM_STORE_CURRENT_SERVICE;
- } else {
- QPID_LOG(error, "Unrecognised SSL certificate store location: " << options.certStoreLocation
- << " - Using default location");
- }
- HCERTSTORE certStoreHandle;
- certStoreHandle = ::CertOpenStore(CERT_STORE_PROV_SYSTEM_A,
- X509_ASN_ENCODING,
- 0,
- flags |
- CERT_STORE_READONLY_FLAG,
- options.certStore.c_str());
- if (!certStoreHandle)
- throw qpid::Exception(QPID_MSG("Opening store " << options.certStore << " " << qpid::sys::strError(GetLastError())));
-
- PCCERT_CONTEXT certContext;
- certContext = ::CertFindCertificateInStore(certStoreHandle,
- X509_ASN_ENCODING,
- 0,
- CERT_FIND_SUBJECT_STR_A,
- options.certName.c_str(),
- NULL);
- if (certContext == NULL) {
- int err = ::GetLastError();
- ::CertCloseStore(certStoreHandle, 0);
- throw qpid::Exception(QPID_MSG("Locating certificate " << options.certName << " in store " << options.certStore << " " << qpid::sys::strError(GetLastError())));
- throw QPID_WINDOWS_ERROR(err);
- }
-
- SCHANNEL_CRED cred;
- memset(&cred, 0, sizeof(cred));
- cred.dwVersion = SCHANNEL_CRED_VERSION;
- cred.cCreds = 1;
- cred.paCred = &certContext;
- SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
- UNISP_NAME,
- SECPKG_CRED_INBOUND,
- NULL,
- &cred,
- NULL,
- NULL,
- &credHandle,
- NULL);
- if (status != SEC_E_OK)
- throw QPID_WINDOWS_ERROR(status);
- ::CertFreeCertificateContext(certContext);
- ::CertCloseStore(certStoreHandle, 0);
-
- std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
- if (addresses.empty()) {
- // We specified some interfaces, but couldn't find addresses for them
- QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
- listeningPort = 0;
- }
-
- for (unsigned i = 0; i<addresses.size(); ++i) {
- QPID_LOG(debug, "Using interface: " << addresses[i]);
- SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
-
-
- // We must have at least one resolved address
- QPID_LOG(info, "SSL Listening to: " << sa.asString())
- Socket* s = createSocket();
- listeningPort = s->listen(sa, opts.connectionBacklog);
- listeners.push_back(s);
-
- // Try any other resolved addresses
- while (sa.nextAddress()) {
- QPID_LOG(info, "SSL Listening to: " << sa.asString())
- Socket* s = createSocket();
- s->listen(sa, opts.connectionBacklog);
- listeners.push_back(s);
- }
- }
-}
-
-SslProtocolFactory::~SslProtocolFactory() {
- ::FreeCredentialsHandle(&credHandle);
-}
-
-void SslProtocolFactory::connectFailed(const qpid::sys::Socket&,
- int err,
- const std::string& msg) {
- if (connectFailedCallback)
- connectFailedCallback(err, msg);
-}
-
-void SslProtocolFactory::establishedIncoming(sys::Poller::shared_ptr poller,
- const qpid::sys::Socket& s,
- sys::ConnectionCodec::Factory* f) {
- sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getFullAddress(), f, false, false);
-
- sys::AsynchIO *aio =
- new qpid::sys::windows::ServerSslAsynchIO(
- clientAuthSelected,
- s,
- credHandle,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
-
- establishedCommon(poller, async, aio, s);
-}
-
-void SslProtocolFactory::establishedOutgoing(sys::Poller::shared_ptr poller,
- const qpid::sys::Socket& s,
- sys::ConnectionCodec::Factory* f,
- std::string& name) {
- sys::AsynchIOHandler* async = new sys::AsynchIOHandler(name, f, true, false);
-
- sys::AsynchIO *aio =
- new qpid::sys::windows::ClientSslAsynchIO(
- brokerHost,
- s,
- credHandle,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
-
- establishedCommon(poller, async, aio, s);
-}
-
-void SslProtocolFactory::establishedCommon(sys::Poller::shared_ptr poller,
- sys::AsynchIOHandler* async,
- sys::AsynchIO* aio,
- const qpid::sys::Socket& s) {
- if (tcpNoDelay) {
- s.setTcpNoDelay();
- QPID_LOG(info,
- "Set TCP_NODELAY on connection to " << s.getPeerAddress());
- }
-
- async->init(aio, brokerTimer, maxNegotiateTime);
- aio->start(poller);
-}
-
-uint16_t SslProtocolFactory::getPort() const {
- return listeningPort; // Immutable no need for lock.
-}
-
-void SslProtocolFactory::accept(sys::Poller::shared_ptr poller,
- sys::ConnectionCodec::Factory* fact) {
- for (unsigned i = 0; i<listeners.size(); ++i) {
- acceptors.push_back(
- AsynchAcceptor::create(listeners[i],
- boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
- acceptors[i].start(poller);
- }
-}
-
-void SslProtocolFactory::connect(sys::Poller::shared_ptr poller,
- const std::string& name,
- const std::string& host,
- const std::string& port,
- sys::ConnectionCodec::Factory* fact,
- ConnectFailedCallback failed)
-{
- SCHANNEL_CRED cred;
- memset(&cred, 0, sizeof(cred));
- cred.dwVersion = SCHANNEL_CRED_VERSION;
- SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
- UNISP_NAME,
- SECPKG_CRED_OUTBOUND,
- NULL,
- &cred,
- NULL,
- NULL,
- &credHandle,
- NULL);
- if (status != SEC_E_OK)
- throw QPID_WINDOWS_ERROR(status);
-
- brokerHost = host;
- // Note that the following logic does not cause a memory leak.
- // The allocated Socket is freed either by the AsynchConnector
- // upon connection failure or by the AsynchIO upon connection
- // shutdown. The allocated AsynchConnector frees itself when it
- // is no longer needed.
- qpid::sys::Socket* socket = createSocket();
- connectFailedCallback = failed;
- AsynchConnector::create(*socket,
- host,
- port,
- boost::bind(&SslProtocolFactory::establishedOutgoing,
- this, poller, _1, fact, name),
- boost::bind(&SslProtocolFactory::connectFailed,
- this, _1, _2, _3));
-}
-
-}}} // namespace qpid::sys::windows
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/TransportFactory.h" +#include "qpid/sys/SocketTransport.h" + +#include "qpid/Plugin.h" +#include "qpid/broker/Broker.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/AsynchIOHandler.h" +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/sys/windows/SslAsynchIO.h" + +#include <boost/bind.hpp> +#include <boost/ptr_container/ptr_vector.hpp> +#include <memory> + +// security.h needs to see this to distinguish from kernel use. +#define SECURITY_WIN32 +#include <security.h> +#include <Schnlsp.h> +#undef SECURITY_WIN32 + + +namespace qpid { +namespace sys { + +class Timer; + +namespace windows { + +struct SslServerOptions : qpid::Options +{ + std::string certStore; + std::string certStoreLocation; + std::string certName; + uint16_t port; + bool clientAuth; + + SslServerOptions() : qpid::Options("SSL Options"), + certStore("My"), + certStoreLocation("CurrentUser"), + certName("localhost"), + port(5671), + clientAuth(false) + { + qpid::Address me; + if (qpid::sys::SystemInfo::getLocalHostname(me)) + certName = me.host; + + addOptions() + ("ssl-cert-store", optValue(certStore, "NAME"), "Local store name from which to obtain certificate") + ("ssl-cert-store-location", optValue(certStoreLocation, "NAME"), + "Local store name location for certificates ( CurrentUser | LocalMachine | CurrentService )") + ("ssl-cert-name", optValue(certName, "NAME"), "Name of the certificate to use") + ("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections") + ("ssl-require-client-authentication", optValue(clientAuth), + "Forces clients to authenticate in order to establish an SSL connection"); + } +}; + +class SslProtocolFactory : public qpid::sys::SocketAcceptor, public qpid::sys::TransportConnector { + Timer& brokerTimer; + uint32_t maxNegotiateTime; + const bool tcpNoDelay; + std::string brokerHost; + const bool clientAuthSelected; + std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor; + ConnectFailedCallback connectFailedCallback; + CredHandle credHandle; + + public: + SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions&, Timer& timer); + ~SslProtocolFactory(); + + void connect(sys::Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port, + sys::ConnectionCodec::Factory*, + ConnectFailedCallback failed); + + private: + void connectFailed(const qpid::sys::Socket&, + int err, + const std::string& msg); + void establishedIncoming(sys::Poller::shared_ptr, const qpid::sys::Socket&, sys::ConnectionCodec::Factory*); + void establishedOutgoing(sys::Poller::shared_ptr, const qpid::sys::Socket&, sys::ConnectionCodec::Factory*, std::string& ); + void establishedCommon(sys::Poller::shared_ptr, sys::AsynchIOHandler*, sys::AsynchIO*, const qpid::sys::Socket&); +}; + +// Static instance to initialise plugin +static struct SslPlugin : public Plugin { + SslServerOptions options; + + Options* getOptions() { return &options; } + + void earlyInitialize(Target&) { + } + + void initialize(Target& target) { + broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); + // Only provide to a Broker + if (broker) { + try { + const broker::Broker::Options& opts = broker->getOptions(); + boost::shared_ptr<SslProtocolFactory> protocol(new SslProtocolFactory(opts, options, broker->getTimer())); + uint16_t port = + protocol->listen(opts.listenInterfaces, + boost::lexical_cast<std::string>(opts.port), opts.connectionBacklog, + &createSocket); + QPID_LOG(notice, "Listening for SSL connections on TCP port " << port); + broker->registerTransport("ssl", protocol, protocol, port); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to initialise SSL listener: " << e.what()); + } + } + } +} sslPlugin; + +SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, Timer& timer) + : SocketAcceptor(opts.tcpNoDelay, false, opts.maxNegotiateTime, timer, + boost::bind(&SslProtocolFactory::establishedIncoming, this, _1, _2, _3)), + brokerTimer(timer), + maxNegotiateTime(opts.maxNegotiateTime), + tcpNoDelay(opts.tcpNoDelay), + clientAuthSelected(options.clientAuth) { + + // Make sure that certificate store is good before listening to sockets + // to avoid having open and listening sockets when there is no cert store + SecInvalidateHandle(&credHandle); + + // Get the certificate for this server. + DWORD flags = 0; + std::string certStoreLocation = options.certStoreLocation; + std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower); + if (certStoreLocation == "currentuser") { + flags = CERT_SYSTEM_STORE_CURRENT_USER; + } else if (certStoreLocation == "localmachine") { + flags = CERT_SYSTEM_STORE_LOCAL_MACHINE; + } else if (certStoreLocation == "currentservice") { + flags = CERT_SYSTEM_STORE_CURRENT_SERVICE; + } else { + QPID_LOG(error, "Unrecognised SSL certificate store location: " << options.certStoreLocation + << " - Using default location"); + } + HCERTSTORE certStoreHandle; + certStoreHandle = ::CertOpenStore(CERT_STORE_PROV_SYSTEM_A, + X509_ASN_ENCODING, + 0, + flags | + CERT_STORE_READONLY_FLAG, + options.certStore.c_str()); + if (!certStoreHandle) + throw qpid::Exception(QPID_MSG("Opening store " << options.certStore << " " << qpid::sys::strError(GetLastError()))); + + PCCERT_CONTEXT certContext; + certContext = ::CertFindCertificateInStore(certStoreHandle, + X509_ASN_ENCODING, + 0, + CERT_FIND_SUBJECT_STR_A, + options.certName.c_str(), + NULL); + if (certContext == NULL) { + int err = ::GetLastError(); + ::CertCloseStore(certStoreHandle, 0); + throw qpid::Exception(QPID_MSG("Locating certificate " << options.certName << " in store " << options.certStore << " " << qpid::sys::strError(GetLastError()))); + throw QPID_WINDOWS_ERROR(err); + } + + SCHANNEL_CRED cred; + memset(&cred, 0, sizeof(cred)); + cred.dwVersion = SCHANNEL_CRED_VERSION; + cred.cCreds = 1; + cred.paCred = &certContext; + SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL, + UNISP_NAME, + SECPKG_CRED_INBOUND, + NULL, + &cred, + NULL, + NULL, + &credHandle, + NULL); + if (status != SEC_E_OK) + throw QPID_WINDOWS_ERROR(status); + ::CertFreeCertificateContext(certContext); + ::CertCloseStore(certStoreHandle, 0); +} + +SslProtocolFactory::~SslProtocolFactory() { + ::FreeCredentialsHandle(&credHandle); +} + +void SslProtocolFactory::connectFailed(const qpid::sys::Socket&, + int err, + const std::string& msg) { + if (connectFailedCallback) + connectFailedCallback(err, msg); +} + +void SslProtocolFactory::establishedIncoming(sys::Poller::shared_ptr poller, + const qpid::sys::Socket& s, + sys::ConnectionCodec::Factory* f) { + sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getFullAddress(), f, false, false); + + sys::AsynchIO *aio = + new qpid::sys::windows::ServerSslAsynchIO( + clientAuthSelected, + s, + credHandle, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + + establishedCommon(poller, async, aio, s); +} + +void SslProtocolFactory::establishedOutgoing(sys::Poller::shared_ptr poller, + const qpid::sys::Socket& s, + sys::ConnectionCodec::Factory* f, + std::string& name) { + sys::AsynchIOHandler* async = new sys::AsynchIOHandler(name, f, true, false); + + sys::AsynchIO *aio = + new qpid::sys::windows::ClientSslAsynchIO( + brokerHost, + s, + credHandle, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + + establishedCommon(poller, async, aio, s); +} + +void SslProtocolFactory::establishedCommon(sys::Poller::shared_ptr poller, + sys::AsynchIOHandler* async, + sys::AsynchIO* aio, + const qpid::sys::Socket& s) { + if (tcpNoDelay) { + s.setTcpNoDelay(); + QPID_LOG(info, + "Set TCP_NODELAY on connection to " << s.getPeerAddress()); + } + + async->init(aio, brokerTimer, maxNegotiateTime); + aio->start(poller); +} + +void SslProtocolFactory::connect(sys::Poller::shared_ptr poller, + const std::string& name, + const std::string& host, + const std::string& port, + sys::ConnectionCodec::Factory* fact, + ConnectFailedCallback failed) +{ + SCHANNEL_CRED cred; + memset(&cred, 0, sizeof(cred)); + cred.dwVersion = SCHANNEL_CRED_VERSION; + SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL, + UNISP_NAME, + SECPKG_CRED_OUTBOUND, + NULL, + &cred, + NULL, + NULL, + &credHandle, + NULL); + if (status != SEC_E_OK) + throw QPID_WINDOWS_ERROR(status); + + brokerHost = host; + // Note that the following logic does not cause a memory leak. + // The allocated Socket is freed either by the AsynchConnector + // upon connection failure or by the AsynchIO upon connection + // shutdown. The allocated AsynchConnector frees itself when it + // is no longer needed. + qpid::sys::Socket* socket = createSocket(); + connectFailedCallback = failed; + AsynchConnector::create(*socket, + host, + port, + boost::bind(&SslProtocolFactory::establishedOutgoing, + this, poller, _1, fact, name), + boost::bind(&SslProtocolFactory::connectFailed, + this, _1, _2, _3)); +} + +}}} diff --git a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 51cc0ed109..8655a8baa3 100644 --- a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -19,7 +19,7 @@ * */ -#include "qpid/sys/ProtocolFactory.h" +#include "qpid/sys/TransportFactory.h" #include "qpid/Plugin.h" #include "qpid/broker/Broker.h" @@ -239,7 +239,7 @@ void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) { } } -class RdmaIOProtocolFactory : public ProtocolFactory { +class RdmaIOProtocolFactory : public TransportAcceptor, public TransportConnector { auto_ptr<Rdma::Listener> listener; const uint16_t listeningPort; @@ -275,9 +275,10 @@ static class RdmaIOPlugin : public Plugin { // Only provide to a Broker if (broker) { const broker::Broker::Options& opts = broker->getOptions(); - ProtocolFactory::shared_ptr protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog)); - QPID_LOG(notice, "Rdma: Listening on RDMA port " << protocol->getPort()); - broker->registerProtocolFactory("rdma", protocol); + boost::shared_ptr<RdmaIOProtocolFactory> protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog)); + uint16_t port = protocol->getPort(); + QPID_LOG(notice, "Rdma: Listening on RDMA port " << port); + broker->registerTransport("rdma", protocol, protocol, port); } } } rdmaPlugin; diff --git a/qpid/cpp/src/qpid/sys/SocketTransport.cpp b/qpid/cpp/src/qpid/sys/SocketTransport.cpp new file mode 100644 index 0000000000..091851713e --- /dev/null +++ b/qpid/cpp/src/qpid/sys/SocketTransport.cpp @@ -0,0 +1,209 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/SocketTransport.h" + +#include "qpid/broker/NameGenerator.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/AsynchIOHandler.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" +#include "qpid/sys/SystemInfo.h" + +#include <boost/bind.hpp> + +namespace qpid { +namespace sys { + +namespace { + void establishedCommon( + AsynchIOHandler* async, + boost::shared_ptr<Poller> poller, const SocketTransportOptions& opts, Timer* timer, + const Socket& s) + { + if (opts.tcpNoDelay) { + s.setTcpNoDelay(); + QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); + } + + AsynchIO* aio = AsynchIO::create + (s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + + async->init(aio, *timer, opts.maxNegotiateTime); + aio->start(poller); + } + + void establishedIncoming( + boost::shared_ptr<Poller> poller, const SocketTransportOptions& opts, Timer* timer, + const Socket& s, ConnectionCodec::Factory* f) + { + AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, opts.nodict); + establishedCommon(async, poller, opts, timer, s); + } + + void establishedOutgoing( + boost::shared_ptr<Poller> poller, const SocketTransportOptions& opts, Timer* timer, + const Socket& s, ConnectionCodec::Factory* f, const std::string& name) + { + AsynchIOHandler* async = new AsynchIOHandler(name, f, true, opts.nodict); + establishedCommon(async, poller, opts, timer, s); + } + + void connectFailed( + const Socket& s, int ec, const std::string& emsg, + SocketConnector::ConnectFailedCallback failedCb) + { + failedCb(ec, emsg); + s.close(); + delete &s; + } + + // Expand list of Interfaces and addresses to a list of addresses + std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) { + std::vector<std::string> addresses; + // If there are no specific interfaces listed use a single "" to listen on every interface + if (interfaces.empty()) { + addresses.push_back(""); + return addresses; + } + for (unsigned i = 0; i < interfaces.size(); ++i) { + const std::string& interface = interfaces[i]; + if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) { + // We don't have an interface of that name - + // Check for IPv6 ('[' ']') brackets and remove them + // then pass to be looked up directly + if (interface[0]=='[' && interface[interface.size()-1]==']') { + addresses.push_back(interface.substr(1, interface.size()-2)); + } else { + addresses.push_back(interface); + } + } + } + return addresses; + } +} + +SocketAcceptor::SocketAcceptor(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer0) : + timer(timer0), + options(tcpNoDelay, nodict, maxNegotiateTime), + established(boost::bind(&establishedIncoming, _1, options, &timer, _2, _3)) +{} + +SocketAcceptor::SocketAcceptor(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer0, const EstablishedCallback& established0) : + timer(timer0), + options(tcpNoDelay, nodict, maxNegotiateTime), + established(established0) +{} + +void SocketAcceptor::addListener(Socket* socket) +{ + listeners.push_back(socket); +} + +uint16_t SocketAcceptor::listen(const std::vector<std::string>& interfaces, const std::string& port, int backlog, const SocketFactory& factory) +{ + std::vector<std::string> addresses = expandInterfaces(interfaces); + if (addresses.empty()) { + // We specified some interfaces, but couldn't find addresses for them + QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening"); + return 0; + } + + int listeningPort = 0; + for (unsigned i = 0; i<addresses.size(); ++i) { + QPID_LOG(debug, "Using interface: " << addresses[i]); + SocketAddress sa(addresses[i], port); + + // We must have at least one resolved address + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = factory(); + uint16_t lport = s->listen(sa, backlog); + QPID_LOG(debug, "Listened to: " << lport); + addListener(s); + + listeningPort = lport; + + // Try any other resolved addresses + while (sa.nextAddress()) { + // Hack to ensure that all listening connections are on the same port + sa.setAddrInfoPort(listeningPort); + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = factory(); + uint16_t lport = s->listen(sa, backlog); + QPID_LOG(debug, "Listened to: " << lport); + addListener(s); + } + } + return listeningPort; +} + +void SocketAcceptor::accept(boost::shared_ptr<Poller> poller, ConnectionCodec::Factory* f) +{ + for (unsigned i = 0; i<listeners.size(); ++i) { + acceptors.push_back( + AsynchAcceptor::create(listeners[i], boost::bind(established, poller, _1, f))); + acceptors[i].start(poller); + } +} + +SocketConnector::SocketConnector(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer0, const SocketFactory& factory0) : + timer(timer0), + factory(factory0), + options(tcpNoDelay, nodict, maxNegotiateTime) +{} + +void SocketConnector::connect( + boost::shared_ptr<Poller> poller, + const std::string& name, + const std::string& host, const std::string& port, + ConnectionCodec::Factory* fact, + ConnectFailedCallback failed) +{ + // Note that the following logic does not cause a memory leak. + // The allocated Socket is freed either by the AsynchConnector + // upon connection failure or by the AsynchIO upon connection + // shutdown. The allocated AsynchConnector frees itself when it + // is no longer needed. + Socket* socket = factory(); + try { + AsynchConnector* c = AsynchConnector::create( + *socket, + host, + port, + boost::bind(&establishedOutgoing, poller, options, &timer, _1, fact, name), + boost::bind(&connectFailed, _1, _2, _3, failed)); + c->start(poller); + } catch (std::exception&) { + // TODO: Design question - should we do the error callback and also throw? + int errCode = socket->getError(); + connectFailed(*socket, errCode, strError(errCode), failed); + throw; + } +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/SocketTransport.h b/qpid/cpp/src/qpid/sys/SocketTransport.h new file mode 100644 index 0000000000..b2f1e72907 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/SocketTransport.h @@ -0,0 +1,91 @@ +#ifndef QPID_SYS_SOCKETTRANSPORT_H +#define QPID_SYS_SOCKETTRANSPORT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/TransportFactory.h" + +#include "qpid/sys/IntegerTypes.h" +#include "qpid/sys/ConnectionCodec.h" +#include <boost/ptr_container/ptr_vector.hpp> +#include <boost/function.hpp> + +namespace qpid { +namespace sys { + +class AsynchAcceptor; +class Poller; +class Timer; +class Socket; +typedef boost::function0<Socket*> SocketFactory; +typedef boost::function3<void, boost::shared_ptr<Poller>, const Socket&, ConnectionCodec::Factory*> EstablishedCallback; + +struct SocketTransportOptions { + bool tcpNoDelay; + bool nodict; + uint32_t maxNegotiateTime; + + SocketTransportOptions(bool t, bool d, uint32_t m) : + tcpNoDelay(t), + nodict(d), + maxNegotiateTime(m) + {} +}; + +class SocketAcceptor : public TransportAcceptor { + boost::ptr_vector<Socket> listeners; + boost::ptr_vector<AsynchAcceptor> acceptors; + Timer& timer; + SocketTransportOptions options; + const EstablishedCallback established; + +public: + SocketAcceptor(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer); + SocketAcceptor(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer, const EstablishedCallback& established); + + // Create sockets from list of interfaces and listen to them + uint16_t listen(const std::vector<std::string>& interfaces, const std::string& port, int backlog, const SocketFactory& factory); + + // Import sockets that are already being listened to + void addListener(Socket* socket); + + void accept(boost::shared_ptr<Poller> poller, ConnectionCodec::Factory* f); +}; + +class SocketConnector : public TransportConnector { + Timer& timer; + const SocketFactory factory; + SocketTransportOptions options; + +public: + SocketConnector(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer, const SocketFactory& factory); + + void connect(boost::shared_ptr<Poller> poller, + const std::string& name, + const std::string& host, const std::string& port, + ConnectionCodec::Factory* f, + ConnectFailedCallback failed); +}; + +}} + +#endif diff --git a/qpid/cpp/src/qpid/sys/SslPlugin.cpp b/qpid/cpp/src/qpid/sys/SslPlugin.cpp index a40da24eb8..20ca9256fc 100644 --- a/qpid/cpp/src/qpid/sys/SslPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/SslPlugin.cpp @@ -19,22 +19,17 @@ * */ -#include "qpid/sys/ProtocolFactory.h" +#include "qpid/sys/TransportFactory.h" #include "qpid/Plugin.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/NameGenerator.h" #include "qpid/log/Statement.h" -#include "qpid/sys/AsynchIOHandler.h" #include "qpid/sys/AsynchIO.h" +#include "qpid/sys/SocketTransport.h" #include "qpid/sys/ssl/util.h" #include "qpid/sys/ssl/SslSocket.h" -#include "qpid/sys/SocketAddress.h" -#include "qpid/sys/SystemInfo.h" -#include "qpid/sys/Poller.h" #include <boost/bind.hpp> -#include <boost/ptr_container/ptr_vector.hpp> namespace qpid { namespace sys { @@ -64,32 +59,20 @@ struct SslServerOptions : ssl::SslOptions } }; -class SslProtocolFactory : public ProtocolFactory { - boost::ptr_vector<Socket> listeners; - boost::ptr_vector<AsynchAcceptor> acceptors; - Timer& brokerTimer; - uint32_t maxNegotiateTime; - uint16_t listeningPort; - const bool tcpNoDelay; - bool nodict; - - public: - SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, - Timer& timer); - void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port, - ConnectionCodec::Factory*, - ConnectFailedCallback); +namespace { + Socket* createServerSSLSocket(const SslServerOptions& options) { + return new SslSocket(options.certName, options.clientAuth); + } - uint16_t getPort() const; + Socket* createServerSSLMuxSocket(const SslServerOptions& options) { + return new SslMuxSocket(options.certName, options.clientAuth); + } - private: - void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); - void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&); - void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&); - void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback); -}; + Socket* createClientSSLSocket() { + return new SslSocket(); + } +} // Static instance to initialise plugin static struct SslPlugin : public Plugin { @@ -104,7 +87,7 @@ static struct SslPlugin : public Plugin { void earlyInitialize(Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); if (broker && !options.certDbPath.empty()) { - const broker::Broker::Options& opts = broker->getOptions(); + broker::Broker::Options& opts = broker->getOptions(); if (opts.port == options.port && // AMQP & AMQPS ports are the same opts.port != 0) { @@ -132,18 +115,25 @@ static struct SslPlugin : public Plugin { nssInitialized = true; const broker::Broker::Options& opts = broker->getOptions(); - - ProtocolFactory::shared_ptr protocol( - static_cast<ProtocolFactory*>(new SslProtocolFactory(opts, options, broker->getTimer()))); - - if (protocol->getPort()!=0 ) { + TransportAcceptor::shared_ptr ta; + SocketAcceptor* sa = + new SocketAcceptor(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer()); + uint16_t port = sa->listen(opts.listenInterfaces, boost::lexical_cast<std::string>(options.port), opts.connectionBacklog, + options.multiplex ? + boost::bind(&createServerSSLMuxSocket, options) : + boost::bind(&createServerSSLSocket, options)); + if ( port!=0 ) { + ta.reset(sa); QPID_LOG(notice, "Listening for " << (options.multiplex ? "SSL or TCP" : "SSL") << " connections on TCP/TCP6 port " << - protocol->getPort()); + port); } - broker->registerProtocolFactory("ssl", protocol); + TransportConnector::shared_ptr tc( + new SocketConnector(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer(), + &createClientSSLSocket)); + broker->registerTransport("ssl", ta, tc, port); } catch (const std::exception& e) { QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what()); } @@ -152,160 +142,4 @@ static struct SslPlugin : public Plugin { } } sslPlugin; -namespace { - // Expand list of Interfaces and addresses to a list of addresses - std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) { - std::vector<std::string> addresses; - // If there are no specific interfaces listed use a single "" to listen on every interface - if (interfaces.empty()) { - addresses.push_back(""); - return addresses; - } - for (unsigned i = 0; i < interfaces.size(); ++i) { - const std::string& interface = interfaces[i]; - if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) { - // We don't have an interface of that name - - // Check for IPv6 ('[' ']') brackets and remove them - // then pass to be looked up directly - if (interface[0]=='[' && interface[interface.size()-1]==']') { - addresses.push_back(interface.substr(1, interface.size()-2)); - } else { - addresses.push_back(interface); - } - } - } - return addresses; - } -} - -SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, - Timer& timer) : - brokerTimer(timer), - maxNegotiateTime(opts.maxNegotiateTime), - tcpNoDelay(opts.tcpNoDelay), - nodict(options.nodict) -{ - std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces); - if (addresses.empty()) { - // We specified some interfaces, but couldn't find addresses for them - QPID_LOG(warning, "SSL: No specified network interfaces found: Not Listening"); - listeningPort = 0; - } - - for (unsigned i = 0; i<addresses.size(); ++i) { - QPID_LOG(debug, "Using interface: " << addresses[i]); - SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port)); - - // We must have at least one resolved address - QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = options.multiplex ? - new SslMuxSocket(options.certName, options.clientAuth) : - new SslSocket(options.certName, options.clientAuth); - uint16_t lport = s->listen(sa, opts.connectionBacklog); - QPID_LOG(debug, "Listened to: " << lport); - listeners.push_back(s); - - listeningPort = lport; - - // Try any other resolved addresses - while (sa.nextAddress()) { - // Hack to ensure that all listening connections are on the same port - sa.setAddrInfoPort(listeningPort); - QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = options.multiplex ? - new SslMuxSocket(options.certName, options.clientAuth) : - new SslSocket(options.certName, options.clientAuth); - uint16_t lport = s->listen(sa, opts.connectionBacklog); - QPID_LOG(debug, "Listened to: " << lport); - listeners.push_back(s); - } - } -} - -void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f) { - AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false); - establishedCommon(async, poller, s); -} - -void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, const std::string& name) { - AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false); - establishedCommon(async, poller, s); -} - -void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) { - if (tcpNoDelay) { - s.setTcpNoDelay(); - QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); - } - - AsynchIO* aio = AsynchIO::create( - s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - - async->init(aio, brokerTimer, maxNegotiateTime); - aio->start(poller); -} - -uint16_t SslProtocolFactory::getPort() const { - return listeningPort; // Immutable no need for lock. -} - -void SslProtocolFactory::accept(Poller::shared_ptr poller, - ConnectionCodec::Factory* fact) { - for (unsigned i = 0; i<listeners.size(); ++i) { - acceptors.push_back( - AsynchAcceptor::create(listeners[i], - boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact))); - acceptors[i].start(poller); - } -} - -void SslProtocolFactory::connectFailed( - const Socket& s, int ec, const std::string& emsg, - ConnectFailedCallback failedCb) -{ - failedCb(ec, emsg); - s.close(); - delete &s; -} - -void SslProtocolFactory::connect( - Poller::shared_ptr poller, - const std::string& name, - const std::string& host, const std::string& port, - ConnectionCodec::Factory* fact, - ConnectFailedCallback failed) -{ - // Note that the following logic does not cause a memory leak. - // The allocated Socket is freed either by the SslConnector - // upon connection failure or by the SslIoHandle upon connection - // shutdown. The allocated SslConnector frees itself when it - // is no longer needed. - - Socket* socket = new qpid::sys::ssl::SslSocket(); - try { - AsynchConnector* c = AsynchConnector::create( - *socket, - host, - port, - boost::bind(&SslProtocolFactory::establishedOutgoing, - this, poller, _1, fact, name), - boost::bind(&SslProtocolFactory::connectFailed, - this, _1, _2, _3, failed)); - c->start(poller); - } catch (std::exception&) { - // TODO: Design question - should we do the error callback and also throw? - int errCode = socket->getError(); - connectFailed(*socket, errCode, strError(errCode), failed); - throw; - } -} - }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp index 1ef8708cd0..f9be1043f8 100644 --- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -19,52 +19,18 @@ * */ -#include "qpid/sys/ProtocolFactory.h" +#include "qpid/sys/TransportFactory.h" #include "qpid/Plugin.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/NameGenerator.h" #include "qpid/log/Statement.h" -#include "qpid/sys/AsynchIOHandler.h" #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Socket.h" -#include "qpid/sys/SocketAddress.h" -#include "qpid/sys/SystemInfo.h" -#include "qpid/sys/Poller.h" - -#include <boost/bind.hpp> -#include <boost/ptr_container/ptr_vector.hpp> +#include "qpid/sys/SocketTransport.h" namespace qpid { namespace sys { -class Timer; - -class AsynchIOProtocolFactory : public ProtocolFactory { - boost::ptr_vector<Socket> listeners; - boost::ptr_vector<AsynchAcceptor> acceptors; - Timer& brokerTimer; - uint32_t maxNegotiateTime; - uint16_t listeningPort; - const bool tcpNoDelay; - - public: - AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen); - void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& name, - const std::string& host, const std::string& port, - ConnectionCodec::Factory*, - ConnectFailedCallback); - - uint16_t getPort() const; - - private: - void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); - void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&); - void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&); - void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback); -}; - static bool sslMultiplexEnabled(void) { Options o; @@ -93,170 +59,22 @@ static class TCPIOPlugin : public Plugin { // Check for SSL on the same port bool shouldListen = !sslMultiplexEnabled(); - ProtocolFactory::shared_ptr protocolt( - new AsynchIOProtocolFactory(opts, broker->getTimer(),shouldListen)); - - if (shouldListen && protocolt->getPort()!=0 ) { - QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort()); - } - - broker->registerProtocolFactory("tcp", protocolt); - } - } -} tcpPlugin; - -namespace { - // Expand list of Interfaces and addresses to a list of addresses - std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) { - std::vector<std::string> addresses; - // If there are no specific interfaces listed use a single "" to listen on every interface - if (interfaces.empty()) { - addresses.push_back(""); - return addresses; - } - for (unsigned i = 0; i < interfaces.size(); ++i) { - const std::string& interface = interfaces[i]; - if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) { - // We don't have an interface of that name - - // Check for IPv6 ('[' ']') brackets and remove them - // then pass to be looked up directly - if (interface[0]=='[' && interface[interface.size()-1]==']') { - addresses.push_back(interface.substr(1, interface.size()-2)); - } else { - addresses.push_back(interface); + uint16_t port = opts.port; + TransportAcceptor::shared_ptr ta; + if (shouldListen) { + SocketAcceptor* aa = new SocketAcceptor(opts.tcpNoDelay, false, opts.maxNegotiateTime, broker->getTimer()); + ta.reset(aa); + port = aa->listen(opts.listenInterfaces, boost::lexical_cast<std::string>(opts.port), opts.connectionBacklog, &createSocket); + if ( port!=0 ) { + QPID_LOG(notice, "Listening on TCP/TCP6 port " << port); } } - } - return addresses; - } -} - -AsynchIOProtocolFactory::AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen) : - brokerTimer(timer), - maxNegotiateTime(opts.maxNegotiateTime), - tcpNoDelay(opts.tcpNoDelay) -{ - if (!shouldListen) { - listeningPort = boost::lexical_cast<uint16_t>(opts.port); - return; - } - - std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces); - if (addresses.empty()) { - // We specified some interfaces, but couldn't find addresses for them - QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening"); - listeningPort = 0; - } - - for (unsigned i = 0; i<addresses.size(); ++i) { - QPID_LOG(debug, "Using interface: " << addresses[i]); - SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(opts.port)); - - // We must have at least one resolved address - QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = createSocket(); - uint16_t lport = s->listen(sa, opts.connectionBacklog); - QPID_LOG(debug, "Listened to: " << lport); - listeners.push_back(s); - listeningPort = lport; + TransportConnector::shared_ptr tc(new SocketConnector(opts.tcpNoDelay, false, opts.maxNegotiateTime, broker->getTimer(), &createSocket)); - // Try any other resolved addresses - while (sa.nextAddress()) { - // Hack to ensure that all listening connections are on the same port - sa.setAddrInfoPort(listeningPort); - QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = createSocket(); - uint16_t lport = s->listen(sa, opts.connectionBacklog); - QPID_LOG(debug, "Listened to: " << lport); - listeners.push_back(s); + broker->registerTransport("tcp", ta, tc, port); } } -} - -void AsynchIOProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f) { - AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false); - establishedCommon(async, poller, s); -} - -void AsynchIOProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, const std::string& name) { - AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false); - establishedCommon(async, poller, s); -} - -void AsynchIOProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) { - if (tcpNoDelay) { - s.setTcpNoDelay(); - QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); - } - - AsynchIO* aio = AsynchIO::create - (s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - - async->init(aio, brokerTimer, maxNegotiateTime); - aio->start(poller); -} - -uint16_t AsynchIOProtocolFactory::getPort() const { - return listeningPort; // Immutable no need for lock. -} - -void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, - ConnectionCodec::Factory* fact) { - for (unsigned i = 0; i<listeners.size(); ++i) { - acceptors.push_back( - AsynchAcceptor::create(listeners[i], - boost::bind(&AsynchIOProtocolFactory::establishedIncoming, this, poller, _1, fact))); - acceptors[i].start(poller); - } -} - -void AsynchIOProtocolFactory::connectFailed( - const Socket& s, int ec, const std::string& emsg, - ConnectFailedCallback failedCb) -{ - failedCb(ec, emsg); - s.close(); - delete &s; -} - -void AsynchIOProtocolFactory::connect( - Poller::shared_ptr poller, - const std::string& name, - const std::string& host, const std::string& port, - ConnectionCodec::Factory* fact, - ConnectFailedCallback failed) -{ - // Note that the following logic does not cause a memory leak. - // The allocated Socket is freed either by the AsynchConnector - // upon connection failure or by the AsynchIO upon connection - // shutdown. The allocated AsynchConnector frees itself when it - // is no longer needed. - Socket* socket = createSocket(); - try { - AsynchConnector* c = AsynchConnector::create( - *socket, - host, - port, - boost::bind(&AsynchIOProtocolFactory::establishedOutgoing, - this, poller, _1, fact, name), - boost::bind(&AsynchIOProtocolFactory::connectFailed, - this, _1, _2, _3, failed)); - c->start(poller); - } catch (std::exception&) { - // TODO: Design question - should we do the error callback and also throw? - int errCode = socket->getError(); - connectFailed(*socket, errCode, strError(errCode), failed); - throw; - } -} +} tcpPlugin; }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/ProtocolFactory.h b/qpid/cpp/src/qpid/sys/TransportFactory.h index 236398c111..06aa168024 100644 --- a/qpid/cpp/src/qpid/sys/ProtocolFactory.h +++ b/qpid/cpp/src/qpid/sys/TransportFactory.h @@ -1,5 +1,5 @@ -#ifndef _sys_ProtocolFactory_h -#define _sys_ProtocolFactory_h +#ifndef QPID_SYS_TRANSPORTFACTORY_H +#define QPID_SYS_TRANSPORTFACTORY_H /* * @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,24 +22,34 @@ * */ -#include "qpid/sys/IntegerTypes.h" #include "qpid/SharedObject.h" #include "qpid/sys/ConnectionCodec.h" +#include <string> #include <boost/function.hpp> +#include <boost/shared_ptr.hpp> namespace qpid { namespace sys { +class AsynchAcceptor; class Poller; +class Timer; -class ProtocolFactory : public qpid::SharedObject<ProtocolFactory> +class TransportAcceptor : public qpid::SharedObject<TransportAcceptor> { public: + virtual ~TransportAcceptor() = 0; + virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0; +}; + +inline TransportAcceptor::~TransportAcceptor() {} + +class TransportConnector : public qpid::SharedObject<TransportConnector> +{ +public: typedef boost::function2<void, int, std::string> ConnectFailedCallback; - virtual ~ProtocolFactory() = 0; - virtual uint16_t getPort() const = 0; - virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0; + virtual ~TransportConnector() = 0; virtual void connect( boost::shared_ptr<Poller>, const std::string& name, @@ -48,10 +58,8 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory> ConnectFailedCallback failed) = 0; }; -inline ProtocolFactory::~ProtocolFactory() {} +inline TransportConnector::~TransportConnector() {} }} - - -#endif //!_sys_ProtocolFactory_h +#endif diff --git a/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp b/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp index b2d2d79c63..080600adcd 100644 --- a/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp +++ b/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
