diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
commit | f83677056891e436bf5ba99e79240df2a44528cd (patch) | |
tree | 625bfd644b948e89105630759cf6decb0435354d /cpp/src/qpid/sys/posix | |
parent | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff) | |
download | qpid-python-f83677056891e436bf5ba99e79240df2a44528cd.tar.gz |
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/posix')
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 36 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/posix/LockFile.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Socket.cpp | 171 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/SocketAddress.cpp | 80 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Thread.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Time.cpp | 7 |
6 files changed, 180 insertions, 120 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 119a6aa8a4..dab8bd09c6 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -149,11 +149,12 @@ private: ConnectedCallback connCallback; FailedCallback failCallback; const Socket& socket; + SocketAddress sa; public: AsynchConnector(const Socket& socket, - std::string hostname, - uint16_t port, + const std::string& hostname, + const std::string& port, ConnectedCallback connCb, FailedCallback failCb); void start(Poller::shared_ptr poller); @@ -161,8 +162,8 @@ public: }; AsynchConnector::AsynchConnector(const Socket& s, - std::string hostname, - uint16_t port, + const std::string& hostname, + const std::string& port, ConnectedCallback connCb, FailedCallback failCb) : DispatchHandle(s, @@ -171,11 +172,13 @@ AsynchConnector::AsynchConnector(const Socket& s, boost::bind(&AsynchConnector::connComplete, this, _1)), connCallback(connCb), failCallback(failCb), - socket(s) + socket(s), + sa(hostname, port) { socket.setNonblocking(); - SocketAddress sa(hostname, boost::lexical_cast<std::string>(port)); + // Note, not catching any exceptions here, also has effect of destructing + QPID_LOG(info, "Connecting: " << sa.asString()); socket.connect(sa); } @@ -191,11 +194,26 @@ void AsynchConnector::stop() void AsynchConnector::connComplete(DispatchHandle& h) { - h.stopWatch(); int errCode = socket.getError(); if (errCode == 0) { + h.stopWatch(); connCallback(socket); } else { + // Retry while we cause an immediate exception + // (asynch failure will be handled by re-entering here at the top) + while (sa.nextAddress()) { + try { + // Try next address without deleting ourselves + QPID_LOG(debug, "Ignored socket connect error: " << strError(errCode)); + QPID_LOG(info, "Retrying connect: " << sa.asString()); + socket.connect(sa); + return; + } catch (const std::exception& e) { + QPID_LOG(debug, "Ignored socket connect exception: " << e.what()); + } + errCode = socket.getError(); + } + h.stopWatch(); failCallback(socket, errCode, strError(errCode)); } DispatchHandle::doDelete(); @@ -589,8 +607,8 @@ AsynchAcceptor* AsynchAcceptor::create(const Socket& s, } AsynchConnector* AsynchConnector::create(const Socket& s, - std::string hostname, - uint16_t port, + const std::string& hostname, + const std::string& port, ConnectedCallback connCb, FailedCallback failCb) { diff --git a/cpp/src/qpid/sys/posix/LockFile.cpp b/cpp/src/qpid/sys/posix/LockFile.cpp index 1862ff6ac9..f5a6c292cb 100755 --- a/cpp/src/qpid/sys/posix/LockFile.cpp +++ b/cpp/src/qpid/sys/posix/LockFile.cpp @@ -58,8 +58,7 @@ LockFile::~LockFile() { if (impl) { int f = impl->fd; if (f >= 0) { - int unused_ret; - unused_ret = ::lockf(f, F_ULOCK, 0); // Suppress warnings about ignoring return value. + (void) ::lockf(f, F_ULOCK, 0); // Suppress warnings about ignoring return value. ::close(f); impl->fd = -1; } diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index 7b906f33e8..4a6dc66f80 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,65 +34,35 @@ #include <netdb.h> #include <cstdlib> #include <string.h> -#include <iostream> - -#include <boost/format.hpp> -#include <boost/lexical_cast.hpp> namespace qpid { namespace sys { namespace { -std::string getName(int fd, bool local, bool includeService = false) +std::string getName(int fd, bool local) { - ::sockaddr_storage name; // big enough for any socket address - ::socklen_t namelen = sizeof(name); - - int result = -1; + ::sockaddr_storage name_s; // big enough for any socket address + ::sockaddr* name = (::sockaddr*)&name_s; + ::socklen_t namelen = sizeof(name_s); + if (local) { - result = ::getsockname(fd, (::sockaddr*)&name, &namelen); + QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); } else { - result = ::getpeername(fd, (::sockaddr*)&name, &namelen); + QPID_POSIX_CHECK( ::getpeername(fd, name, &namelen) ); } - QPID_POSIX_CHECK(result); - - char servName[NI_MAXSERV]; - char dispName[NI_MAXHOST]; - if (includeService) { - if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), - servName, sizeof(servName), - NI_NUMERICHOST | NI_NUMERICSERV) != 0) - throw QPID_POSIX_ERROR(rc); - return std::string(dispName) + ":" + std::string(servName); - - } else { - if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0) - throw QPID_POSIX_ERROR(rc); - return dispName; - } + return SocketAddress::asString(name, namelen); } -std::string getService(int fd, bool local) +uint16_t getLocalPort(int fd) { - ::sockaddr_storage name; // big enough for any socket address - ::socklen_t namelen = sizeof(name); - - int result = -1; - if (local) { - result = ::getsockname(fd, (::sockaddr*)&name, &namelen); - } else { - result = ::getpeername(fd, (::sockaddr*)&name, &namelen); - } + ::sockaddr_storage name_s; // big enough for any socket address + ::sockaddr* name = (::sockaddr*)&name_s; + ::socklen_t namelen = sizeof(name_s); - QPID_POSIX_CHECK(result); + QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); - char servName[NI_MAXSERV]; - if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0, - servName, sizeof(servName), - NI_NUMERICHOST | NI_NUMERICSERV) != 0) - throw QPID_POSIX_ERROR(rc); - return servName; + return SocketAddress::getPort(name); } } @@ -119,6 +89,11 @@ void Socket::createSocket(const SocketAddress& sa) const try { if (nonblocking) setNonblocking(); if (nodelay) setTcpNoDelay(); + if (getAddrInfo(sa).ai_family == AF_INET6) { + int flag = 1; + int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag)); + QPID_POSIX_CHECK(result); + } } catch (std::exception&) { ::close(s); socket = -1; @@ -126,13 +101,18 @@ void Socket::createSocket(const SocketAddress& sa) const } } -void Socket::setTimeout(const Duration& interval) const -{ - const int& socket = impl->fd; - struct timeval tv; - toTimeval(tv, interval); - setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); - setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); +Socket* Socket::createSameTypeSocket() const { + int& socket = impl->fd; + // Socket currently has no actual socket attached + if (socket == -1) + return new Socket; + + ::sockaddr_storage sa; + ::socklen_t salen = sizeof(sa); + QPID_POSIX_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); + int s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM + if (s < 0) throw QPID_POSIX_ERROR(errno); + return new Socket(new IOHandlePrivate(s)); } void Socket::setNonblocking() const { @@ -149,20 +129,27 @@ void Socket::setTcpNoDelay() const nodelay = true; if (socket != -1) { int flag = 1; - int result = setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); + int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); QPID_POSIX_CHECK(result); } } -void Socket::connect(const std::string& host, uint16_t port) const +void Socket::connect(const std::string& host, const std::string& port) const { - SocketAddress sa(host, boost::lexical_cast<std::string>(port)); + SocketAddress sa(host, port); connect(sa); } void Socket::connect(const SocketAddress& addr) const { - connectname = addr.asString(); + // The display name for an outbound connection needs to be the name that was specified + // for the address rather than a resolved IP address as we don't know which of + // the IP addresses is actually the one that will be connected to. + peername = addr.asString(false); + + // However the string we compare with the local port must be numeric or it might not + // match when it should as getLocalAddress() will always be numeric + std::string connectname = addr.asString(); createSocket(addr); @@ -170,7 +157,24 @@ void Socket::connect(const SocketAddress& addr) const // TODO the correct thing to do here is loop on failure until you've used all the returned addresses if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) && (errno != EINPROGRESS)) { - throw Exception(QPID_MSG(strError(errno) << ": " << connectname)); + throw Exception(QPID_MSG(strError(errno) << ": " << peername)); + } + // When connecting to a port on the same host which no longer has + // a process associated with it, the OS occasionally chooses the + // remote port (which is unoccupied) as the port to bind the local + // end of the socket, resulting in a "circular" connection. + // + // This seems like something the OS should prevent but I have + // confirmed that sporadic hangs in + // cluster_tests.LongTests.test_failover on RHEL5 are caused by + // such a circular connection. + // + // Raise an error if we see such a connection, since we know there is + // no listener on the peer address. + // + if (getLocalAddress() == connectname) { + close(); + throw Exception(QPID_MSG("Connection refused: " << peername)); } } @@ -183,9 +187,9 @@ Socket::close() const socket = -1; } -int Socket::listen(uint16_t port, int backlog) const +int Socket::listen(const std::string& host, const std::string& port, int backlog) const { - SocketAddress sa("", boost::lexical_cast<std::string>(port)); + SocketAddress sa(host, port); return listen(sa, backlog); } @@ -195,26 +199,24 @@ int Socket::listen(const SocketAddress& sa, int backlog) const const int& socket = impl->fd; int yes=1; - QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); + QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0) throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno))); if (::listen(socket, backlog) < 0) throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno))); - struct sockaddr_in name; - socklen_t namelen = sizeof(name); - if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) - throw QPID_POSIX_ERROR(errno); - - return ntohs(name.sin_port); + return getLocalPort(socket); } Socket* Socket::accept() const { int afd = ::accept(impl->fd, 0, 0); - if ( afd >= 0) - return new Socket(new IOHandlePrivate(afd)); + if ( afd >= 0) { + Socket* s = new Socket(new IOHandlePrivate(afd)); + s->localname = localname; + return s; + } else if (errno == EAGAIN) return 0; else throw QPID_POSIX_ERROR(errno); @@ -230,37 +232,20 @@ int Socket::write(const void *buf, size_t count) const return ::write(impl->fd, buf, count); } -std::string Socket::getSockname() const -{ - return getName(impl->fd, true); -} - -std::string Socket::getPeername() const -{ - return getName(impl->fd, false); -} - std::string Socket::getPeerAddress() const { - if (connectname.empty()) { - connectname = getName(impl->fd, false, true); + if (peername.empty()) { + peername = getName(impl->fd, false); } - return connectname; + return peername; } std::string Socket::getLocalAddress() const { - return getName(impl->fd, true, true); -} - -uint16_t Socket::getLocalPort() const -{ - return std::atoi(getService(impl->fd, true).c_str()); -} - -uint16_t Socket::getRemotePort() const -{ - return std::atoi(getService(impl->fd, true).c_str()); + if (localname.empty()) { + localname = getName(impl->fd, true); + } + return localname; } int Socket::getError() const diff --git a/cpp/src/qpid/sys/posix/SocketAddress.cpp b/cpp/src/qpid/sys/posix/SocketAddress.cpp index 8f5f29d793..077942ef2f 100644 --- a/cpp/src/qpid/sys/posix/SocketAddress.cpp +++ b/cpp/src/qpid/sys/posix/SocketAddress.cpp @@ -21,11 +21,13 @@ #include "qpid/sys/SocketAddress.h" -#include "qpid/sys/posix/check.h" +#include "qpid/Exception.h" +#include "qpid/Msg.h" #include <sys/socket.h> -#include <string.h> +#include <netinet/in.h> #include <netdb.h> +#include <string.h> namespace qpid { namespace sys { @@ -46,15 +48,9 @@ SocketAddress::SocketAddress(const SocketAddress& sa) : SocketAddress& SocketAddress::operator=(const SocketAddress& sa) { - if (&sa != this) { - host = sa.host; - port = sa.port; + SocketAddress temp(sa); - if (addrInfo) { - ::freeaddrinfo(addrInfo); - addrInfo = 0; - } - } + std::swap(temp, *this); return *this; } @@ -65,9 +61,61 @@ SocketAddress::~SocketAddress() } } -std::string SocketAddress::asString() const +std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen) +{ + char servName[NI_MAXSERV]; + char dispName[NI_MAXHOST]; + if (int rc=::getnameinfo(addr, addrlen, + dispName, sizeof(dispName), + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); + std::string s; + switch (addr->sa_family) { + case AF_INET: s += dispName; break; + case AF_INET6: s += "["; s += dispName; s+= "]"; break; + default: throw Exception(QPID_MSG("Unexpected socket type")); + } + s += ":"; + s += servName; + return s; +} + +uint16_t SocketAddress::getPort(::sockaddr const * const addr) { - return host + ":" + port; + switch (addr->sa_family) { + case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port); + case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port); + default:throw Exception(QPID_MSG("Unexpected socket type")); + } +} + +std::string SocketAddress::asString(bool numeric) const +{ + if (!numeric) + return host + ":" + port; + // Canonicalise into numeric id + const ::addrinfo& ai = getAddrInfo(*this); + + return asString(ai.ai_addr, ai.ai_addrlen); +} + +bool SocketAddress::nextAddress() { + bool r = currentAddrInfo->ai_next != 0; + if (r) + currentAddrInfo = currentAddrInfo->ai_next; + return r; +} + +void SocketAddress::setAddrInfoPort(uint16_t port) { + if (!currentAddrInfo) return; + + ::addrinfo& ai = *currentAddrInfo; + switch (ai.ai_family) { + case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return; + case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return; + default: throw Exception(QPID_MSG("Unexpected socket type")); + } } const ::addrinfo& getAddrInfo(const SocketAddress& sa) @@ -75,7 +123,8 @@ const ::addrinfo& getAddrInfo(const SocketAddress& sa) if (!sa.addrInfo) { ::addrinfo hints; ::memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; // Change this to support IPv6 + hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for + hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6 hints.ai_socktype = SOCK_STREAM; const char* node = 0; @@ -88,10 +137,11 @@ const ::addrinfo& getAddrInfo(const SocketAddress& sa) int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo); if (n != 0) - throw Exception(QPID_MSG("Cannot resolve " << sa.host << ": " << ::gai_strerror(n))); + throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n))); + sa.currentAddrInfo = sa.addrInfo; } - return *sa.addrInfo; + return *sa.currentAddrInfo; } }} diff --git a/cpp/src/qpid/sys/posix/Thread.cpp b/cpp/src/qpid/sys/posix/Thread.cpp index b466733260..a1d6396763 100644 --- a/cpp/src/qpid/sys/posix/Thread.cpp +++ b/cpp/src/qpid/sys/posix/Thread.cpp @@ -37,7 +37,8 @@ void* runRunnable(void* p) } } -struct ThreadPrivate { +class ThreadPrivate { +public: pthread_t thread; ThreadPrivate(Runnable* runnable) { diff --git a/cpp/src/qpid/sys/posix/Time.cpp b/cpp/src/qpid/sys/posix/Time.cpp index b3858279b4..9661f0c5e8 100644 --- a/cpp/src/qpid/sys/posix/Time.cpp +++ b/cpp/src/qpid/sys/posix/Time.cpp @@ -27,6 +27,7 @@ #include <stdio.h> #include <sys/time.h> #include <unistd.h> +#include <iomanip> namespace { int64_t max_abstime() { return std::numeric_limits<int64_t>::max(); } @@ -103,6 +104,12 @@ void outputFormattedNow(std::ostream& o) { o << " "; } +void outputHiresNow(std::ostream& o) { + ::timespec time; + ::clock_gettime(CLOCK_REALTIME, &time); + o << time.tv_sec << "." << std::setw(9) << std::setfill('0') << time.tv_nsec << "s "; +} + void sleep(int secs) { ::sleep(secs); } |