diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/sys/TCPIOPlugin.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/TCPIOPlugin.cpp')
-rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 136 |
1 files changed, 86 insertions, 50 deletions
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index ed7cc3748d..1ef8708cd0 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -20,15 +20,17 @@ */ #include "qpid/sys/ProtocolFactory.h" -#include "qpid/sys/AsynchIOHandler.h" -#include "qpid/sys/AsynchIO.h" #include "qpid/Plugin.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/NameGenerator.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/AsynchIOHandler.h" +#include "qpid/sys/AsynchIO.h" #include "qpid/sys/Socket.h" #include "qpid/sys/SocketAddress.h" +#include "qpid/sys/SystemInfo.h" #include "qpid/sys/Poller.h" -#include "qpid/broker/Broker.h" -#include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <boost/ptr_container/ptr_vector.hpp> @@ -47,20 +49,19 @@ class AsynchIOProtocolFactory : public ProtocolFactory { const bool tcpNoDelay; public: - AsynchIOProtocolFactory(const std::string& host, const std::string& port, - int backlog, bool nodelay, - Timer& timer, uint32_t maxTime, - bool shouldListen); + AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, const std::string& port, + void connect(Poller::shared_ptr, const std::string& name, + const std::string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback); uint16_t getPort() const; private: - void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, - bool isClient); + void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); + void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&); + void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&); void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback); }; @@ -93,14 +94,9 @@ static class TCPIOPlugin : public Plugin { bool shouldListen = !sslMultiplexEnabled(); ProtocolFactory::shared_ptr protocolt( - new AsynchIOProtocolFactory( - "", boost::lexical_cast<std::string>(opts.port), - opts.connectionBacklog, - opts.tcpNoDelay, - broker->getTimer(), opts.maxNegotiateTime, - shouldListen)); - - if (shouldListen) { + new AsynchIOProtocolFactory(opts, broker->getTimer(),shouldListen)); + + if (shouldListen && protocolt->getPort()!=0 ) { QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort()); } @@ -109,54 +105,93 @@ static class TCPIOPlugin : public Plugin { } } tcpPlugin; -AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, - int backlog, bool nodelay, - Timer& timer, uint32_t maxTime, - bool shouldListen) : +namespace { + // Expand list of Interfaces and addresses to a list of addresses + std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) { + std::vector<std::string> addresses; + // If there are no specific interfaces listed use a single "" to listen on every interface + if (interfaces.empty()) { + addresses.push_back(""); + return addresses; + } + for (unsigned i = 0; i < interfaces.size(); ++i) { + const std::string& interface = interfaces[i]; + if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) { + // We don't have an interface of that name - + // Check for IPv6 ('[' ']') brackets and remove them + // then pass to be looked up directly + if (interface[0]=='[' && interface[interface.size()-1]==']') { + addresses.push_back(interface.substr(1, interface.size()-2)); + } else { + addresses.push_back(interface); + } + } + } + return addresses; + } +} + +AsynchIOProtocolFactory::AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen) : brokerTimer(timer), - maxNegotiateTime(maxTime), - tcpNoDelay(nodelay) + maxNegotiateTime(opts.maxNegotiateTime), + tcpNoDelay(opts.tcpNoDelay) { if (!shouldListen) { - listeningPort = boost::lexical_cast<uint16_t>(port); + listeningPort = boost::lexical_cast<uint16_t>(opts.port); return; } - SocketAddress sa(host, port); - - // We must have at least one resolved address - QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = new Socket; - uint16_t lport = s->listen(sa, backlog); - QPID_LOG(debug, "Listened to: " << lport); - listeners.push_back(s); + std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces); + if (addresses.empty()) { + // We specified some interfaces, but couldn't find addresses for them + QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening"); + listeningPort = 0; + } - listeningPort = lport; + for (unsigned i = 0; i<addresses.size(); ++i) { + QPID_LOG(debug, "Using interface: " << addresses[i]); + SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(opts.port)); - // Try any other resolved addresses - while (sa.nextAddress()) { - // Hack to ensure that all listening connections are on the same port - sa.setAddrInfoPort(listeningPort); + // We must have at least one resolved address QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = new Socket; - uint16_t lport = s->listen(sa, backlog); + Socket* s = createSocket(); + uint16_t lport = s->listen(sa, opts.connectionBacklog); QPID_LOG(debug, "Listened to: " << lport); listeners.push_back(s); + + listeningPort = lport; + + // Try any other resolved addresses + while (sa.nextAddress()) { + // Hack to ensure that all listening connections are on the same port + sa.setAddrInfoPort(listeningPort); + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = createSocket(); + uint16_t lport = s->listen(sa, opts.connectionBacklog); + QPID_LOG(debug, "Listened to: " << lport); + listeners.push_back(s); + } } +} +void AsynchIOProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f) { + AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false); + establishedCommon(async, poller, s); } -void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, bool isClient) { - AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f); +void AsynchIOProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f, const std::string& name) { + AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false); + establishedCommon(async, poller, s); +} +void AsynchIOProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) { if (tcpNoDelay) { s.setTcpNoDelay(); QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); } - if (isClient) - async->setClient(); AsynchIO* aio = AsynchIO::create (s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), @@ -179,7 +214,7 @@ void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, for (unsigned i = 0; i<listeners.size(); ++i) { acceptors.push_back( AsynchAcceptor::create(listeners[i], - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); + boost::bind(&AsynchIOProtocolFactory::establishedIncoming, this, poller, _1, fact))); acceptors[i].start(poller); } } @@ -195,6 +230,7 @@ void AsynchIOProtocolFactory::connectFailed( void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, + const std::string& name, const std::string& host, const std::string& port, ConnectionCodec::Factory* fact, ConnectFailedCallback failed) @@ -204,14 +240,14 @@ void AsynchIOProtocolFactory::connect( // upon connection failure or by the AsynchIO upon connection // shutdown. The allocated AsynchConnector frees itself when it // is no longer needed. - Socket* socket = new Socket(); + Socket* socket = createSocket(); try { AsynchConnector* c = AsynchConnector::create( *socket, host, port, - boost::bind(&AsynchIOProtocolFactory::established, - this, poller, _1, fact, true), + boost::bind(&AsynchIOProtocolFactory::establishedOutgoing, + this, poller, _1, fact, name), boost::bind(&AsynchIOProtocolFactory::connectFailed, this, _1, _2, _3, failed)); c->start(poller); |