summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp19
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h8
-rw-r--r--qpid/cpp/src/qpid/sys/SslPlugin.cpp48
-rw-r--r--qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp19
4 files changed, 50 insertions, 44 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index f8c6ccc3b1..be69516072 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -162,6 +162,7 @@ Broker::Options::Options(const std::string& name) :
("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored")
("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT")
("interface", optValue(listenInterfaces, "<interface name>|<interface address>"), "Which network interfaces to use to listen for incoming connections")
+ ("listen-disable", optValue(listenDisabled, "<transport name>"), "Transports to disable listening")
("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
@@ -208,6 +209,7 @@ Broker::Broker(const Broker::Options& conf) :
managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support,
conf.qmf2Support)
: 0),
+ disabledListeningTransports(config.listenDisabled.begin(), config.listenDisabled.end()),
store(new NullMessageStore),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
@@ -1060,13 +1062,28 @@ uint16_t Broker::getPort(const std::string& name) const {
}
}
+bool Broker::shouldListen(std::string transport) {
+ return disabledListeningTransports.count(transport)==0;
+}
+
+void Broker::disableListening(std::string transport) {
+ disabledListeningTransports.insert(transport);
+}
+
void Broker::registerTransport(const std::string& name, boost::shared_ptr<TransportAcceptor> a, boost::shared_ptr<TransportConnector> c, uint16_t p) {
transportMap[name] = TransportInfo(a, c, p);
}
void Broker::accept() {
+ unsigned accepting = 0;
for (TransportMap::const_iterator i = transportMap.begin(); i != transportMap.end(); i++) {
- if (i->second.acceptor) i->second.acceptor->accept(poller, factory.get());
+ if (i->second.acceptor) {
+ i->second.acceptor->accept(poller, factory.get());
+ ++accepting;
+ }
+ }
+ if ( accepting==0 ) {
+ throw Exception(QPID_MSG("Failed to start broker: No transports are listening for incoming connections"));
}
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 0ac5fc412e..c2032ef629 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -94,6 +94,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
std::string dataDir;
uint16_t port;
std::vector<std::string> listenInterfaces;
+ std::vector<std::string> listenDisabled;
int workerThreads;
int connectionBacklog;
bool enableMgmt;
@@ -168,6 +169,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
std::auto_ptr<sys::Timer> timer;
Options config;
std::auto_ptr<management::ManagementAgent> managementAgent;
+ std::set<std::string> disabledListeningTransports;
TransportMap transportMap;
std::auto_ptr<MessageStore> store;
AclModule* acl;
@@ -247,6 +249,12 @@ class Broker : public sys::Runnable, public Plugin::Target,
QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod(
uint32_t methodId, management::Args& args, std::string& text);
+ // Should we listen using this protocol or not?
+ QPID_BROKER_EXTERN bool shouldListen(std::string transport);
+
+ // Turn off listening for a protocol
+ QPID_BROKER_EXTERN void disableListening(std::string transport);
+
/** Add to the broker's protocolFactorys */
QPID_BROKER_EXTERN void registerTransport(
const std::string& name,
diff --git a/qpid/cpp/src/qpid/sys/SslPlugin.cpp b/qpid/cpp/src/qpid/sys/SslPlugin.cpp
index 20ca9256fc..b99b93137a 100644
--- a/qpid/cpp/src/qpid/sys/SslPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/SslPlugin.cpp
@@ -43,12 +43,10 @@ struct SslServerOptions : ssl::SslOptions
uint16_t port;
bool clientAuth;
bool nodict;
- bool multiplex;
SslServerOptions() : port(5671),
clientAuth(false),
- nodict(false),
- multiplex(false)
+ nodict(false)
{
addOptions()
("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
@@ -78,10 +76,11 @@ namespace {
static struct SslPlugin : public Plugin {
SslServerOptions options;
bool nssInitialized;
+ bool multiplex;
Options* getOptions() { return &options; }
- SslPlugin() : nssInitialized(false) {}
+ SslPlugin() : nssInitialized(false), multiplex(false) {}
~SslPlugin() { if (nssInitialized) ssl::shutdownNSS(); }
void earlyInitialize(Target& target) {
@@ -90,14 +89,11 @@ static struct SslPlugin : public Plugin {
broker::Broker::Options& opts = broker->getOptions();
if (opts.port == options.port && // AMQP & AMQPS ports are the same
- opts.port != 0) {
- // The presence of this option is used to signal to the TCP
- // plugin not to start listening on the shared port. The actual
- // value cannot be configured through the command line or config
- // file (other than by setting the ports to the same value)
- // because we are only adding it after option parsing.
- options.multiplex = true;
- options.addOptions()("ssl-multiplex", optValue(options.multiplex), "Allow SSL and non-SSL connections on the same port");
+ opts.port != 0 &&
+ broker->shouldListen("tcp")&&
+ broker->shouldListen("ssl")) {
+ multiplex = true;
+ broker->disableListening("tcp");
}
}
}
@@ -115,21 +111,23 @@ static struct SslPlugin : public Plugin {
nssInitialized = true;
const broker::Broker::Options& opts = broker->getOptions();
+ uint16_t port = options.port;
TransportAcceptor::shared_ptr ta;
- SocketAcceptor* sa =
- new SocketAcceptor(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer());
- uint16_t port = sa->listen(opts.listenInterfaces, boost::lexical_cast<std::string>(options.port), opts.connectionBacklog,
- options.multiplex ?
- boost::bind(&createServerSSLMuxSocket, options) :
- boost::bind(&createServerSSLSocket, options));
- if ( port!=0 ) {
- ta.reset(sa);
- QPID_LOG(notice, "Listening for " <<
- (options.multiplex ? "SSL or TCP" : "SSL") <<
- " connections on TCP/TCP6 port " <<
- port);
+ if (broker->shouldListen("ssl")) {
+ SocketAcceptor* sa =
+ new SocketAcceptor(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer());
+ port = sa->listen(opts.listenInterfaces, boost::lexical_cast<std::string>(options.port), opts.connectionBacklog,
+ multiplex ?
+ boost::bind(&createServerSSLMuxSocket, options) :
+ boost::bind(&createServerSSLSocket, options));
+ if ( port!=0 ) {
+ ta.reset(sa);
+ QPID_LOG(notice, "Listening for " <<
+ (multiplex ? "SSL or TCP" : "SSL") <<
+ " connections on TCP/TCP6 port " <<
+ port);
+ }
}
-
TransportConnector::shared_ptr tc(
new SocketConnector(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer(),
&createClientSSLSocket));
diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
index f9be1043f8..0910cbef20 100644
--- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -31,20 +31,6 @@
namespace qpid {
namespace sys {
-static bool sslMultiplexEnabled(void)
-{
- Options o;
- Plugin::addOptions(o);
-
- if (o.find_nothrow("ssl-multiplex", false)) {
- // This option is added by the SSL plugin when the SSL port
- // is configured to be the same as the main port.
- QPID_LOG(notice, "SSL multiplexing enabled");
- return true;
- }
- return false;
-}
-
// Static instance to initialise plugin
static class TCPIOPlugin : public Plugin {
void earlyInitialize(Target&) {
@@ -56,12 +42,9 @@ static class TCPIOPlugin : public Plugin {
if (broker) {
const broker::Broker::Options& opts = broker->getOptions();
- // Check for SSL on the same port
- bool shouldListen = !sslMultiplexEnabled();
-
uint16_t port = opts.port;
TransportAcceptor::shared_ptr ta;
- if (shouldListen) {
+ if (broker->shouldListen("tcp")) {
SocketAcceptor* aa = new SocketAcceptor(opts.tcpNoDelay, false, opts.maxNegotiateTime, broker->getTimer());
ta.reset(aa);
port = aa->listen(opts.listenInterfaces, boost::lexical_cast<std::string>(opts.port), opts.connectionBacklog, &createSocket);