From bd46aa4383ca0a04563c80dc527cf005a2a335ad Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Fri, 12 Aug 2011 22:32:05 +0000 Subject: QPID-3405: IPv6 support for Unix C++ ports: - On the Listen side we create separate listening sockets for IPv4 and IPv6 making sure to not allow the IPv6 socket to run dual stack. This makes the reported IPv4 addresses look "normal" and would allow us to turn control IPv4/IPv6 listening separately. - On the connect side we make sure to try all the addresses returned by getaddrinfo() in order until we either find one that connects or have tried all of them. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1157272 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/sys/SocketAddress.h | 5 ++- qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp | 48 +++++++++++++++++++------ qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp | 24 +++++++++++-- qpid/cpp/src/qpid/sys/posix/Socket.cpp | 30 +++++++++++----- qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp | 28 ++++++++++++--- qpid/cpp/src/qpid/sys/windows/Socket.cpp | 21 ++++++----- qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp | 7 ++++ qpid/cpp/src/tests/cluster_test_logs.py | 1 + 8 files changed, 127 insertions(+), 37 deletions(-) mode change 100755 => 100644 qpid/cpp/src/qpid/sys/windows/Socket.cpp (limited to 'qpid/cpp') diff --git a/qpid/cpp/src/qpid/sys/SocketAddress.h b/qpid/cpp/src/qpid/sys/SocketAddress.h index c2120338cf..481beab747 100644 --- a/qpid/cpp/src/qpid/sys/SocketAddress.h +++ b/qpid/cpp/src/qpid/sys/SocketAddress.h @@ -41,12 +41,15 @@ public: QPID_COMMON_EXTERN SocketAddress& operator=(const SocketAddress&); QPID_COMMON_EXTERN ~SocketAddress(); - std::string asString(bool numeric=true) const; + QPID_COMMON_EXTERN bool nextAddress(); + QPID_COMMON_EXTERN std::string asString(bool numeric=true) const; + QPID_COMMON_EXTERN void setAddrInfoPort(uint16_t port); private: std::string host; std::string port; mutable ::addrinfo* addrInfo; + mutable ::addrinfo* currentAddrInfo; }; }} diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp index 34338ce434..c5cc86c813 100644 --- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -25,21 +25,22 @@ #include "qpid/Plugin.h" #include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/Poller.h" #include "qpid/broker/Broker.h" #include "qpid/log/Statement.h" #include -#include +#include namespace qpid { namespace sys { class AsynchIOProtocolFactory : public ProtocolFactory { const bool tcpNoDelay; - Socket listener; - const uint16_t listeningPort; - std::auto_ptr acceptor; + boost::ptr_vector listeners; + boost::ptr_vector acceptors; + uint16_t listeningPort; public: AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay); @@ -71,15 +72,38 @@ static class TCPIOPlugin : public Plugin { "", boost::lexical_cast(opts.port), opts.connectionBacklog, opts.tcpNoDelay)); - QPID_LOG(notice, "Listening on TCP port " << protocolt->getPort()); + QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort()); broker->registerProtocolFactory("tcp", protocolt); } } } tcpPlugin; AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) : - tcpNoDelay(nodelay), listeningPort(listener.listen(host, port, backlog)) -{} + tcpNoDelay(nodelay) +{ + SocketAddress sa(host, port); + + // We must have at least one resolved address + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = new Socket; + uint16_t lport = s->listen(sa, backlog); + QPID_LOG(debug, "Listened to: " << lport); + listeners.push_back(s); + + listeningPort = lport; + + // Try any other resolved addresses + while (sa.nextAddress()) { + // Hack to ensure that all listening connections are on the same port + sa.setAddrInfoPort(listeningPort); + QPID_LOG(info, "Listening to: " << sa.asString()) + Socket* s = new Socket; + uint16_t lport = s->listen(sa, backlog); + QPID_LOG(debug, "Listened to: " << lport); + listeners.push_back(s); + } + +} void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f, bool isClient) { @@ -111,10 +135,12 @@ uint16_t AsynchIOProtocolFactory::getPort() const { void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { - acceptor.reset( - AsynchAcceptor::create(listener, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); - acceptor->start(poller); + for (unsigned i = 0; ifd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); + int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); QPID_POSIX_CHECK(result); } } @@ -179,19 +198,14 @@ int Socket::listen(const SocketAddress& sa, int backlog) const const int& socket = impl->fd; int yes=1; - QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); + QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0) throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno))); if (::listen(socket, backlog) < 0) throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno))); - struct sockaddr_in name; - socklen_t namelen = sizeof(name); - if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) - throw QPID_POSIX_ERROR(errno); - - return ntohs(name.sin_port); + return getLocalPort(socket); } Socket* Socket::accept() const diff --git a/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp index 10f1c8a563..67438c0d89 100644 --- a/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp +++ b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp @@ -27,8 +27,6 @@ #include #include -#include - namespace qpid { namespace sys { @@ -73,19 +71,38 @@ std::string SocketAddress::asString(bool numeric) const dispName, sizeof(dispName), servName, sizeof(servName), NI_NUMERICHOST | NI_NUMERICSERV) != 0) - throw QPID_POSIX_ERROR(rc); + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); std::string s(dispName); s += ":"; s += servName; return s; } +bool SocketAddress::nextAddress() { + bool r = currentAddrInfo->ai_next != 0; + if (r) + currentAddrInfo = currentAddrInfo->ai_next; + return r; +} + +void SocketAddress::setAddrInfoPort(uint16_t port) { + if (!currentAddrInfo) return; + + ::addrinfo& ai = *currentAddrInfo; + switch (ai.ai_family) { + case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return; + case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return; + default: throw Exception(QPID_MSG("Unexpected socket type")); + } +} + const ::addrinfo& getAddrInfo(const SocketAddress& sa) { if (!sa.addrInfo) { ::addrinfo hints; ::memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; // Change this to support IPv6 + hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for + hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6 hints.ai_socktype = SOCK_STREAM; const char* node = 0; @@ -99,9 +116,10 @@ const ::addrinfo& getAddrInfo(const SocketAddress& sa) int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo); if (n != 0) throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n))); + sa.currentAddrInfo = sa.addrInfo; } - return *sa.addrInfo; + return *sa.currentAddrInfo; } }} diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp old mode 100755 new mode 100644 index baa80f04e0..1ba3831b9f --- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp @@ -211,21 +211,24 @@ int Socket::read(void *buf, size_t count) const return received; } -int Socket::listen(const std::string&, const std::string& port, int backlog) const +int Socket::listen(const std::string& host, const std::string& port, int backlog) const +{ + SocketAddress sa(host, port); + return listen(sa, backlog); +} + +int Socket::listen(const SocketAddress& addr, int backlog) const { const SOCKET& socket = impl->fd; BOOL yes=1; QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes))); - struct sockaddr_in name; - memset(&name, 0, sizeof(name)); - name.sin_family = AF_INET; - name.sin_port = htons(boost::lexical_cast(port)); - name.sin_addr.s_addr = 0; - if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR) - throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError()))); + + if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR) + throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError()))); if (::listen(socket, backlog) == SOCKET_ERROR) - throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError()))); + throw Exception(QPID_MSG("Can't listen on " <ai_next != 0; + if (r) + currentAddrInfo = currentAddrInfo->ai_next; + return r; +} + const ::addrinfo& getAddrInfo(const SocketAddress& sa) { return *sa.addrInfo; diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py index a0ce8fb9c3..3c7e8e8020 100755 --- a/qpid/cpp/src/tests/cluster_test_logs.py +++ b/qpid/cpp/src/tests/cluster_test_logs.py @@ -53,6 +53,7 @@ def filter_log(log): 'stall for update|unstall, ignore update|cancelled offer .* unstall', 'caught up', 'active for links|Passivating links|Activating links', + 'info Connecting: .*', # UpdateClient connection 'info Connection.* connected to', # UpdateClient connection 'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection 'warning Broker closed connection: 200, OK', -- cgit v1.2.1