diff options
| author | Stephen D. Huston <shuston@apache.org> | 2011-10-20 18:42:46 +0000 |
|---|---|---|
| committer | Stephen D. Huston <shuston@apache.org> | 2011-10-20 18:42:46 +0000 |
| commit | 5eb354b338bb8d8fcd35b6ac3fb33f8103e757c3 (patch) | |
| tree | f24776684c025fbed6a0431bf3d6811f0a1aae7a /cpp/src/qpid/sys/TCPIOPlugin.cpp | |
| parent | 718ff5b34dd1e87eb79fa4c61fec668d1dc33103 (diff) | |
| download | qpid-python-5eb354b338bb8d8fcd35b6ac3fb33f8103e757c3.tar.gz | |
Merge trunk to QPID-2519 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1186990 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/TCPIOPlugin.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 80 |
1 files changed, 55 insertions, 25 deletions
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index a6528f9ad9..85d8c1db87 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -25,31 +25,31 @@ #include "qpid/Plugin.h" #include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/Poller.h" #include "qpid/broker/Broker.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> -#include <memory> +#include <boost/ptr_container/ptr_vector.hpp> namespace qpid { namespace sys { class AsynchIOProtocolFactory : public ProtocolFactory { const bool tcpNoDelay; - Socket listener; - const uint16_t listeningPort; - std::auto_ptr<AsynchAcceptor> acceptor; + boost::ptr_vector<Socket> listeners; + boost::ptr_vector<AsynchAcceptor> acceptors; + uint16_t listeningPort; public: - AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay); + AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, int16_t port, + void connect(Poller::shared_ptr, const std::string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback); uint16_t getPort() const; - std::string getHost() const; private: void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, @@ -61,23 +61,49 @@ class AsynchIOProtocolFactory : public ProtocolFactory { static class TCPIOPlugin : public Plugin { void earlyInitialize(Target&) { } - + void initialize(Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); // Only provide to a Broker if (broker) { const broker::Broker::Options& opts = broker->getOptions(); - ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog, - opts.tcpNoDelay)); - QPID_LOG(notice, "Listening on TCP port " << protocol->getPort()); - broker->registerProtocolFactory("tcp", protocol); + ProtocolFactory::shared_ptr protocolt( + new AsynchIOProtocolFactory( + "", boost::lexical_cast<std::string>(opts.port), + opts.connectionBacklog, + opts.tcpNoDelay)); + QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort()); + broker->registerProtocolFactory("tcp", protocolt); } } } tcpPlugin; -AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay) : - tcpNoDelay(nodelay), listeningPort(listener.listen(port, backlog)) -{} +AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) : + tcpNoDelay(nodelay) +{ + SocketAddress sa(host, port); + + // We must have at least one resolved address + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = new Socket; + uint16_t lport = s->listen(sa, backlog); + QPID_LOG(debug, "Listened to: " << lport); + listeners.push_back(s); + + listeningPort = lport; + + // Try any other resolved addresses + while (sa.nextAddress()) { + // Hack to ensure that all listening connections are on the same port + sa.setAddrInfoPort(listeningPort); + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = new Socket; + uint16_t lport = s->listen(sa, backlog); + QPID_LOG(debug, "Listened to: " << lport); + listeners.push_back(s); + } + +} void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f, bool isClient) { @@ -107,16 +133,14 @@ uint16_t AsynchIOProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } -std::string AsynchIOProtocolFactory::getHost() const { - return listener.getSockname(); -} - void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { - acceptor.reset( - AsynchAcceptor::create(listener, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); - acceptor->start(poller); + for (unsigned i = 0; i<listeners.size(); ++i) { + acceptors.push_back( + AsynchAcceptor::create(listeners[i], + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); + acceptors[i].start(poller); + } } void AsynchIOProtocolFactory::connectFailed( @@ -130,7 +154,7 @@ void AsynchIOProtocolFactory::connectFailed( void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, - const std::string& host, int16_t port, + const std::string& host, const std::string& port, ConnectionCodec::Factory* fact, ConnectFailedCallback failed) { @@ -139,8 +163,8 @@ void AsynchIOProtocolFactory::connect( // upon connection failure or by the AsynchIO upon connection // shutdown. The allocated AsynchConnector frees itself when it // is no longer needed. - Socket* socket = new Socket(); + try { AsynchConnector* c = AsynchConnector::create( *socket, host, @@ -150,6 +174,12 @@ void AsynchIOProtocolFactory::connect( boost::bind(&AsynchIOProtocolFactory::connectFailed, this, _1, _2, _3, failed)); c->start(poller); + } catch (std::exception&) { + // TODO: Design question - should we do the error callback and also throw? + int errCode = socket->getError(); + connectFailed(*socket, errCode, strError(errCode), failed); + throw; + } } }} // namespace qpid::sys |
