diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
| commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
| tree | ce493e10baa95f44be8beb5778ce51783463196d /cpp/src/qpid/sys | |
| parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
| download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz | |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.cpp | 43 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/MemStat.cpp (renamed from cpp/src/qpid/sys/windows/MemStat.cpp) | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/SslPlugin.cpp | 28 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Timer.cpp | 55 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Timer.h | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp | 1 | ||||
| -rwxr-xr-x | cpp/src/qpid/sys/posix/LockFile.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/MemStat.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/SocketAddress.cpp | 6 | ||||
| -rwxr-xr-x | cpp/src/qpid/sys/posix/SystemInfo.cpp | 108 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ssl/SslHandler.cpp | 41 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ssl/SslHandler.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ssl/SslIo.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ssl/SslIo.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/unordered_map.h | 2 | ||||
| -rwxr-xr-x | cpp/src/qpid/sys/windows/IocpPoller.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/windows/Socket.cpp | 17 | ||||
| -rwxr-xr-x | cpp/src/qpid/sys/windows/SystemInfo.cpp | 10 |
20 files changed, 310 insertions, 76 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index 5233002850..8a485db72d 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -23,6 +23,7 @@ #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Socket.h" #include "qpid/sys/SecuritySettings.h" +#include "qpid/sys/Timer.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/log/Statement.h" @@ -41,11 +42,30 @@ struct Buff : public AsynchIO::BufferBase { { delete [] bytes;} }; -AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : +struct ProtocolTimeoutTask : public sys::TimerTask { + AsynchIOHandler& handler; + std::string id; + + ProtocolTimeoutTask(const std::string& i, const Duration& timeout, AsynchIOHandler& h) : + TimerTask(timeout, "ProtocolTimeout"), + handler(h), + id(i) + {} + + void fire() { + // If this fires it means that we didn't negotiate the connection in the timeout period + // Schedule closing the connection for the io thread + QPID_LOG(error, "Connection " << id << " No protocol received closing"); + handler.abort(); + } +}; + +AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) : identifier(id), aio(0), factory(f), codec(0), + reads(0), readError(false), isClient(false), readCredit(InfiniteCredit) @@ -54,12 +74,18 @@ AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : AsynchIOHandler::~AsynchIOHandler() { if (codec) codec->closed(); + if (timeoutTimerTask) + timeoutTimerTask->cancel(); delete codec; } -void AsynchIOHandler::init(AsynchIO* a, int numBuffs) { +void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) { aio = a; + // Start timer for this connection + timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this); + timer.add(timeoutTimerTask); + // Give connection some buffers to use for (int i = 0; i < numBuffs; i++) { aio->queueReadBuffer(new Buff); @@ -129,10 +155,18 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { } } + ++reads; size_t decoded = 0; if (codec) { // Already initiated try { decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); + // When we've decoded 3 reads (probably frames) we will have authenticated and + // started heartbeats, if specified, in many (but not all) cases so now we will cancel + // the idle connection timeout - this is really hacky, and would be better implemented + // in the connection, but that isn't actually created until the first decode. + if (reads == 3) { + timeoutTimerTask->cancel(); + } }catch(const std::exception& e){ QPID_LOG(error, e.what()); readError = true; @@ -143,6 +177,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { decoded = in.getPosition(); + QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); try { codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings()); @@ -202,6 +237,10 @@ void AsynchIOHandler::idle(AsynchIO&){ if (isClient && codec == 0) { codec = factory->create(*this, identifier, SecuritySettings()); write(framing::ProtocolInitiation(codec->getVersion())); + // We've just sent the protocol negotiation so we can cancel the timeout for that + // This is not ideal, because we've not received anything yet, but heartbeats will + // be active soon + timeoutTimerTask->cancel(); return; } if (codec == 0) return; diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h index b9867606c4..307aad5b85 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/cpp/src/qpid/sys/AsynchIOHandler.h @@ -27,6 +27,8 @@ #include "qpid/sys/Mutex.h" #include "qpid/CommonImportExport.h" +#include <boost/intrusive_ptr.hpp> + namespace qpid { namespace framing { @@ -38,24 +40,28 @@ namespace sys { class AsynchIO; struct AsynchIOBufferBase; class Socket; +class Timer; +class TimerTask; class AsynchIOHandler : public OutputControl { std::string identifier; AsynchIO* aio; ConnectionCodec::Factory* factory; ConnectionCodec* codec; + uint32_t reads; bool readError; bool isClient; AtomicValue<int32_t> readCredit; static const int32_t InfiniteCredit = -1; Mutex creditLock; + boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask; void write(const framing::ProtocolInitiation&); public: - QPID_COMMON_EXTERN AsynchIOHandler(std::string id, ConnectionCodec::Factory* f); + QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f ); QPID_COMMON_EXTERN ~AsynchIOHandler(); - QPID_COMMON_EXTERN void init(AsynchIO* a, int numBuffs); + QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime, int numBuffs); QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; } diff --git a/cpp/src/qpid/sys/windows/MemStat.cpp b/cpp/src/qpid/sys/MemStat.cpp index b1d25c5fc5..c71fba785c 100644 --- a/cpp/src/qpid/sys/windows/MemStat.cpp +++ b/cpp/src/qpid/sys/MemStat.cpp @@ -21,9 +21,11 @@ #include "qpid/sys/MemStat.h" +// Null memory stats provider: +// This is for platforms that do not have a way to get allocated +// memory status void qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory*) { - // TODO: Add Windows-specific memory stats to the object and load them here. } diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp index 48baef9042..3b50527c0a 100644 --- a/cpp/src/qpid/sys/SslPlugin.cpp +++ b/cpp/src/qpid/sys/SslPlugin.cpp @@ -39,6 +39,8 @@ namespace qpid { namespace sys { +class Timer; + using namespace qpid::sys::ssl; struct SslServerOptions : ssl::SslOptions @@ -68,6 +70,8 @@ class SslProtocolFactoryTmpl : public ProtocolFactory { typedef SslAcceptorTmpl<T> SslAcceptor; + Timer& brokerTimer; + uint32_t maxNegotiateTime; const bool tcpNoDelay; T listener; const uint16_t listeningPort; @@ -75,7 +79,7 @@ class SslProtocolFactoryTmpl : public ProtocolFactory { bool nodict; public: - SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay); + SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay, Timer& timer, uint32_t maxTime); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, const std::string& port, ConnectionCodec::Factory*, @@ -132,16 +136,18 @@ static struct SslPlugin : public Plugin { try { ssl::initNSS(options, true); nssInitialized = true; - + const broker::Broker::Options& opts = broker->getOptions(); ProtocolFactory::shared_ptr protocol(options.multiplex ? static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options, opts.connectionBacklog, - opts.tcpNoDelay)) : + opts.tcpNoDelay, + broker->getTimer(), opts.maxNegotiateTime)) : static_cast<ProtocolFactory*>(new SslProtocolFactory(options, opts.connectionBacklog, - opts.tcpNoDelay))); + opts.tcpNoDelay, + broker->getTimer(), opts.maxNegotiateTime))); QPID_LOG(notice, "Listening for " << (options.multiplex ? "SSL or TCP" : "SSL") << " connections on TCP port " << @@ -156,14 +162,16 @@ static struct SslPlugin : public Plugin { } sslPlugin; template <class T> -SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay) : +SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay, Timer& timer, uint32_t maxTime) : + brokerTimer(timer), + maxNegotiateTime(maxTime), tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)), nodict(options.nodict) {} void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s, ConnectionCodec::Factory* f, bool isClient, - bool tcpNoDelay, bool nodict) { + Timer& timer, uint32_t maxTime, bool tcpNoDelay, bool nodict) { qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict); if (tcpNoDelay) { @@ -183,7 +191,7 @@ void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s, boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1), boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1)); - async->init(aio, 4); + async->init(aio,timer, maxTime, 4); aio->start(poller); } @@ -192,7 +200,7 @@ void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f, bool isClient) { const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s); - SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict); + SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict); } template <class T> @@ -216,7 +224,7 @@ void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s); if (sslSock) { - SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict); + SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict); return; } @@ -239,7 +247,7 @@ void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, 4); + async->init(aio, brokerTimer, maxNegotiateTime, 4); aio->start(poller); } diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index bd10a5555a..551440f954 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -36,14 +36,21 @@ namespace qpid { namespace sys { +class Timer; + class AsynchIOProtocolFactory : public ProtocolFactory { - const bool tcpNoDelay; boost::ptr_vector<Socket> listeners; boost::ptr_vector<AsynchAcceptor> acceptors; + Timer& brokerTimer; + uint32_t maxNegotiateTime; uint16_t listeningPort; + const bool tcpNoDelay; public: - AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen); + AsynchIOProtocolFactory(const std::string& host, const std::string& port, + int backlog, bool nodelay, + Timer& timer, uint32_t maxTime, + bool shouldListen); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, const std::string& port, ConnectionCodec::Factory*, @@ -90,6 +97,7 @@ static class TCPIOPlugin : public Plugin { "", boost::lexical_cast<std::string>(opts.port), opts.connectionBacklog, opts.tcpNoDelay, + broker->getTimer(), opts.maxNegotiateTime, shouldListen)); if (shouldListen) { @@ -101,7 +109,12 @@ static class TCPIOPlugin : public Plugin { } } tcpPlugin; -AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen) : +AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, + int backlog, bool nodelay, + Timer& timer, uint32_t maxTime, + bool shouldListen) : + brokerTimer(timer), + maxNegotiateTime(maxTime), tcpNoDelay(nodelay) { if (!shouldListen) { @@ -153,7 +166,7 @@ void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socke boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, 4); + async->init(aio, brokerTimer, maxNegotiateTime, 4); aio->start(poller); } diff --git a/cpp/src/qpid/sys/Timer.cpp b/cpp/src/qpid/sys/Timer.cpp index 47752e4584..973c6bd8b7 100644 --- a/cpp/src/qpid/sys/Timer.cpp +++ b/cpp/src/qpid/sys/Timer.cpp @@ -35,7 +35,7 @@ TimerTask::TimerTask(Duration timeout, const std::string& n) : sortTime(AbsTime::FarFuture()), period(timeout), nextFireTime(AbsTime::now(), timeout), - cancelled(false) + state(WAITING) {} TimerTask::TimerTask(AbsTime time, const std::string& n) : @@ -43,7 +43,7 @@ TimerTask::TimerTask(AbsTime time, const std::string& n) : sortTime(AbsTime::FarFuture()), period(0), nextFireTime(time), - cancelled(false) + state(WAITING) {} TimerTask::~TimerTask() {} @@ -52,27 +52,48 @@ bool TimerTask::readyToFire() const { return !(nextFireTime > AbsTime::now()); } +bool TimerTask::prepareToFire() { + Monitor::ScopedLock l(stateMonitor); + if (state != CANCELLED) { + state = CALLING; + return true; + } else { + return false; + } +} + void TimerTask::fireTask() { - cancelled = true; fire(); } +void TimerTask::finishFiring() { + Monitor::ScopedLock l(stateMonitor); + if (state != CANCELLED) { + state = WAITING; + stateMonitor.notifyAll(); + } +} + // This can only be used to setup the next fire time. After the Timer has already fired void TimerTask::setupNextFire() { if (period && readyToFire()) { nextFireTime = max(AbsTime::now(), AbsTime(nextFireTime, period)); - cancelled = false; } else { QPID_LOG(error, name << " couldn't setup next timer firing: " << Duration(nextFireTime, AbsTime::now()) << "[" << period << "]"); } } // Only allow tasks to be delayed -void TimerTask::restart() { nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period)); } +void TimerTask::restart() { + nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period)); +} void TimerTask::cancel() { - ScopedLock<Mutex> l(callbackLock); - cancelled = true; + Monitor::ScopedLock l(stateMonitor); + while (state == CALLING) { + stateMonitor.wait(); + } + state = CANCELLED; } void TimerTask::setFired() { @@ -96,6 +117,22 @@ Timer::~Timer() stop(); } +class TimerTaskCallbackScope { + TimerTask& tt; +public: + explicit TimerTaskCallbackScope(TimerTask& t) : + tt(t) + {} + + operator bool() { + return !tt.prepareToFire(); + } + + ~TimerTaskCallbackScope() { + tt.finishFiring(); + } +}; + // TODO AStitcher 21/08/09 The threshholds for emitting warnings are a little arbitrary void Timer::run() { @@ -112,8 +149,8 @@ void Timer::run() AbsTime start(AbsTime::now()); Duration delay(t->sortTime, start); { - ScopedLock<Mutex> l(t->callbackLock); - if (t->cancelled) { + TimerTaskCallbackScope s(*t); + if (s) { { Monitor::ScopedUnlock u(monitor); drop(t); diff --git a/cpp/src/qpid/sys/Timer.h b/cpp/src/qpid/sys/Timer.h index fccb17dbc2..5731b8d977 100644 --- a/cpp/src/qpid/sys/Timer.h +++ b/cpp/src/qpid/sys/Timer.h @@ -40,6 +40,7 @@ class Timer; class TimerTask : public RefCounted { friend class Timer; + friend class TimerTaskCallbackScope; friend bool operator<(const boost::intrusive_ptr<TimerTask>&, const boost::intrusive_ptr<TimerTask>&); @@ -47,9 +48,11 @@ class TimerTask : public RefCounted { AbsTime sortTime; Duration period; AbsTime nextFireTime; - Mutex callbackLock; - volatile bool cancelled; + qpid::sys::Monitor stateMonitor; + enum {WAITING, CALLING, CANCELLED} state; + bool prepareToFire(); + void finishFiring(); bool readyToFire() const; void fireTask(); diff --git a/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp b/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp index 249b769051..29b91f3e7a 100644 --- a/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp +++ b/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include <unistd.h> #include "qpid/sys/cyrus/CyrusSecurityLayer.h" #include <algorithm> #include "qpid/framing/reply_exceptions.h" diff --git a/cpp/src/qpid/sys/posix/LockFile.cpp b/cpp/src/qpid/sys/posix/LockFile.cpp index c1f1c37b47..9fdf83f1bd 100755 --- a/cpp/src/qpid/sys/posix/LockFile.cpp +++ b/cpp/src/qpid/sys/posix/LockFile.cpp @@ -46,7 +46,7 @@ LockFile::LockFile(const std::string& path_, bool create) errno = 0; int flags=create ? O_WRONLY|O_CREAT|O_NOFOLLOW : O_RDWR; int fd = ::open(path.c_str(), flags, 0644); - if (fd < 0) throw ErrnoException("Cannot open " + path, errno); + if (fd < 0) throw ErrnoException("Cannot open lock file " + path, errno); if (::lockf(fd, F_TLOCK, 0) < 0) { ::close(fd); throw ErrnoException("Cannot lock " + path, errno); diff --git a/cpp/src/qpid/sys/posix/MemStat.cpp b/cpp/src/qpid/sys/posix/MemStat.cpp index 72c53e5886..2fbf119cab 100644 --- a/cpp/src/qpid/sys/posix/MemStat.cpp +++ b/cpp/src/qpid/sys/posix/MemStat.cpp @@ -20,6 +20,7 @@ */ #include "qpid/sys/MemStat.h" + #include <malloc.h> void qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory* object) @@ -35,4 +36,3 @@ void qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory* obj object->set_malloc_keepcost(info.keepcost); } - diff --git a/cpp/src/qpid/sys/posix/SocketAddress.cpp b/cpp/src/qpid/sys/posix/SocketAddress.cpp index a7049c1851..344bd28669 100644 --- a/cpp/src/qpid/sys/posix/SocketAddress.cpp +++ b/cpp/src/qpid/sys/posix/SocketAddress.cpp @@ -35,14 +35,16 @@ namespace sys { SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) : host(host0), port(port0), - addrInfo(0) + addrInfo(0), + currentAddrInfo(0) { } SocketAddress::SocketAddress(const SocketAddress& sa) : host(sa.host), port(sa.port), - addrInfo(0) + addrInfo(0), + currentAddrInfo(0) { } diff --git a/cpp/src/qpid/sys/posix/SystemInfo.cpp b/cpp/src/qpid/sys/posix/SystemInfo.cpp index 540cc8bc91..2b1bbb97df 100755 --- a/cpp/src/qpid/sys/posix/SystemInfo.cpp +++ b/cpp/src/qpid/sys/posix/SystemInfo.cpp @@ -18,10 +18,11 @@ * */ +#include "qpid/log/Statement.h" #include "qpid/sys/SystemInfo.h" - #include "qpid/sys/posix/check.h" - +#include <set> +#include <arpa/inet.h> #include <sys/ioctl.h> #include <sys/utsname.h> #include <sys/types.h> // For FreeBSD @@ -33,6 +34,7 @@ #include <fstream> #include <sstream> #include <netdb.h> +#include <string.h> #ifndef HOST_NAME_MAX # define HOST_NAME_MAX 256 @@ -59,48 +61,100 @@ bool SystemInfo::getLocalHostname (Address &address) { return true; } -static const string LOCALHOST("127.0.0.1"); +static const string LOOPBACK("127.0.0.1"); static const string TCP("tcp"); +// Test IPv4 address for loopback +inline bool IN_IS_ADDR_LOOPBACK(const ::in_addr* a) { + return ((ntohl(a->s_addr) & 0xff000000) == 0x7f000000); +} + +inline bool isLoopback(const ::sockaddr* addr) { + switch (addr->sa_family) { + case AF_INET: return IN_IS_ADDR_LOOPBACK(&((const ::sockaddr_in*)(const void*)addr)->sin_addr); + case AF_INET6: return IN6_IS_ADDR_LOOPBACK(&((const ::sockaddr_in6*)(const void*)addr)->sin6_addr); + default: return false; + } +} + void SystemInfo::getLocalIpAddresses (uint16_t port, std::vector<Address> &addrList) { ::ifaddrs* ifaddr = 0; QPID_POSIX_CHECK(::getifaddrs(&ifaddr)); for (::ifaddrs* ifap = ifaddr; ifap != 0; ifap = ifap->ifa_next) { if (ifap->ifa_addr == 0) continue; - + if (isLoopback(ifap->ifa_addr)) continue; int family = ifap->ifa_addr->sa_family; switch (family) { - case AF_INET: { - char dispName[NI_MAXHOST]; - int rc = ::getnameinfo( - ifap->ifa_addr, - (family == AF_INET) - ? sizeof(struct sockaddr_in) - : sizeof(struct sockaddr_in6), - dispName, sizeof(dispName), - 0, 0, NI_NUMERICHOST); - if (rc != 0) { - throw QPID_POSIX_ERROR(rc); + case AF_INET6: { + // Ignore link local addresses as: + // * The scope id is illegal in URL syntax + // * Clients won't be able to use a link local address + // without adding their own (potentially different) scope id + sockaddr_in6* sa6 = (sockaddr_in6*)(ifap->ifa_addr); + if (IN6_IS_ADDR_LINKLOCAL(&sa6->sin6_addr)) break; + // Fallthrough } - string addr(dispName); - if (addr != LOCALHOST) { - addrList.push_back(Address(TCP, addr, port)); - } - break; - } - // TODO: Url parsing currently can't cope with IPv6 addresses so don't return them - // when it can cope move this line to above "case AF_INET:" - case AF_INET6: - default: + case AF_INET: { + char dispName[NI_MAXHOST]; + int rc = ::getnameinfo( + ifap->ifa_addr, + (family == AF_INET) + ? sizeof(struct sockaddr_in) + : sizeof(struct sockaddr_in6), + dispName, sizeof(dispName), + 0, 0, NI_NUMERICHOST); + if (rc != 0) { + throw QPID_POSIX_ERROR(rc); + } + string addr(dispName); + addrList.push_back(Address(TCP, addr, port)); + break; + } + default: continue; } } - freeifaddrs(ifaddr); + ::freeifaddrs(ifaddr); if (addrList.empty()) { - addrList.push_back(Address(TCP, LOCALHOST, port)); + addrList.push_back(Address(TCP, LOOPBACK, port)); + } +} + +namespace { +struct AddrInfo { + struct addrinfo* ptr; + AddrInfo(const std::string& host) : ptr(0) { + ::addrinfo hints; + ::memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6 + if (::getaddrinfo(host.c_str(), NULL, &hints, &ptr) != 0) + ptr = 0; + } + ~AddrInfo() { if (ptr) ::freeaddrinfo(ptr); } +}; +} + +bool SystemInfo::isLocalHost(const std::string& host) { + std::vector<Address> myAddrs; + getLocalIpAddresses(0, myAddrs); + std::set<string> localHosts; + for (std::vector<Address>::const_iterator i = myAddrs.begin(); i != myAddrs.end(); ++i) + localHosts.insert(i->host); + // Resolve host + AddrInfo ai(host); + if (!ai.ptr) return false; + for (struct addrinfo *res = ai.ptr; res != NULL; res = res->ai_next) { + if (isLoopback(res->ai_addr)) return true; + // Get string form of IP addr + char addr[NI_MAXHOST] = ""; + int error = ::getnameinfo(res->ai_addr, res->ai_addrlen, addr, NI_MAXHOST, NULL, 0, + NI_NUMERICHOST | NI_NUMERICSERV); + if (error) return false; + if (localHosts.find(addr) != localHosts.end()) return true; } + return false; } void SystemInfo::getSystemId (std::string &osName, diff --git a/cpp/src/qpid/sys/ssl/SslHandler.cpp b/cpp/src/qpid/sys/ssl/SslHandler.cpp index 67bf4ea893..8613059f28 100644 --- a/cpp/src/qpid/sys/ssl/SslHandler.cpp +++ b/cpp/src/qpid/sys/ssl/SslHandler.cpp @@ -19,9 +19,9 @@ * */ #include "qpid/sys/ssl/SslHandler.h" - #include "qpid/sys/ssl/SslIo.h" #include "qpid/sys/ssl/SslSocket.h" +#include "qpid/sys/Timer.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/log/Statement.h" @@ -42,6 +42,24 @@ struct Buff : public SslIO::BufferBase { { delete [] bytes;} }; +struct ProtocolTimeoutTask : public sys::TimerTask { + SslHandler& handler; + std::string id; + + ProtocolTimeoutTask(const std::string& i, const Duration& timeout, SslHandler& h) : + TimerTask(timeout, "ProtocolTimeout"), + handler(h), + id(i) + {} + + void fire() { + // If this fires it means that we didn't negotiate the connection in the timeout period + // Schedule closing the connection for the io thread + QPID_LOG(error, "Connection " << id << " No protocol received closing"); + handler.abort(); + } +}; + SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict) : identifier(id), aio(0), @@ -55,12 +73,18 @@ SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict SslHandler::~SslHandler() { if (codec) codec->closed(); + if (timeoutTimerTask) + timeoutTimerTask->cancel(); delete codec; } -void SslHandler::init(SslIO* a, int numBuffs) { +void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) { aio = a; + // Start timer for this connection + timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this); + timer.add(timeoutTimerTask); + // Give connection some buffers to use for (int i = 0; i < numBuffs; i++) { aio->queueReadBuffer(new Buff); @@ -80,8 +104,10 @@ void SslHandler::write(const framing::ProtocolInitiation& data) } void SslHandler::abort() { - // TODO: can't implement currently as underlying functionality not implemented - // aio->requestCallback(boost::bind(&SslHandler::eof, this, _1)); + // Don't disconnect if we're already disconnecting + if (!readError) { + aio->requestCallback(boost::bind(&SslHandler::eof, this, _1)); + } } void SslHandler::activateOutput() { aio->notifyPendingWrite(); @@ -109,6 +135,9 @@ void SslHandler::readbuff(SslIO& , SslIO::BufferBase* buff) { framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { + // We've just got the protocol negotiation so we can cancel the timeout for that + timeoutTimerTask->cancel(); + decoded = in.getPosition(); QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); try { @@ -169,6 +198,10 @@ void SslHandler::idle(SslIO&){ if (isClient && codec == 0) { codec = factory->create(*this, identifier, getSecuritySettings(aio)); write(framing::ProtocolInitiation(codec->getVersion())); + // We've just sent the protocol negotiation so we can cancel the timeout for that + // This is not ideal, because we've not received anything yet, but heartbeats will + // be active soon + timeoutTimerTask->cancel(); return; } if (codec == 0) return; diff --git a/cpp/src/qpid/sys/ssl/SslHandler.h b/cpp/src/qpid/sys/ssl/SslHandler.h index 400fa317fd..74df2b7fb0 100644 --- a/cpp/src/qpid/sys/ssl/SslHandler.h +++ b/cpp/src/qpid/sys/ssl/SslHandler.h @@ -25,6 +25,8 @@ #include "qpid/sys/ConnectionCodec.h" #include "qpid/sys/OutputControl.h" +#include <boost/intrusive_ptr.hpp> + namespace qpid { namespace framing { @@ -32,6 +34,10 @@ namespace framing { } namespace sys { + +class Timer; +class TimerTask; + namespace ssl { class SslIO; @@ -46,6 +52,7 @@ class SslHandler : public OutputControl { bool readError; bool isClient; bool nodict; + boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask; void write(const framing::ProtocolInitiation&); qpid::sys::SecuritySettings getSecuritySettings(SslIO* aio); @@ -53,7 +60,7 @@ class SslHandler : public OutputControl { public: SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict); ~SslHandler(); - void init(SslIO* a, int numBuffs); + void init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs); void setClient() { isClient = true; } diff --git a/cpp/src/qpid/sys/ssl/SslIo.cpp b/cpp/src/qpid/sys/ssl/SslIo.cpp index 2a7cf16923..789c205ead 100644 --- a/cpp/src/qpid/sys/ssl/SslIo.cpp +++ b/cpp/src/qpid/sys/ssl/SslIo.cpp @@ -257,6 +257,18 @@ void SslIO::queueWriteClose() { DispatchHandle::rewatchWrite(); } +void SslIO::requestCallback(RequestCallback callback) { + // TODO creating a function object every time isn't all that + // efficient - if this becomes heavily used do something better (what?) + assert(callback); + DispatchHandle::call(boost::bind(&SslIO::requestedCall, this, callback)); +} + +void SslIO::requestedCall(RequestCallback callback) { + assert(callback); + callback(*this); +} + /** Return a queued buffer if there are enough * to spare */ diff --git a/cpp/src/qpid/sys/ssl/SslIo.h b/cpp/src/qpid/sys/ssl/SslIo.h index c980d73831..b795594cd9 100644 --- a/cpp/src/qpid/sys/ssl/SslIo.h +++ b/cpp/src/qpid/sys/ssl/SslIo.h @@ -125,6 +125,7 @@ public: typedef boost::function2<void, SslIO&, const SslSocket&> ClosedCallback; typedef boost::function1<void, SslIO&> BuffersEmptyCallback; typedef boost::function1<void, SslIO&> IdleCallback; + typedef boost::function1<void, SslIO&> RequestCallback; private: @@ -159,6 +160,7 @@ public: void notifyPendingWrite(); void queueWriteClose(); bool writeQueueEmpty() { return writeQueue.empty(); } + void requestCallback(RequestCallback); BufferBase* getQueuedBuffer(); qpid::sys::SecuritySettings getSecuritySettings(); @@ -168,6 +170,7 @@ private: void readable(qpid::sys::DispatchHandle& handle); void writeable(qpid::sys::DispatchHandle& handle); void disconnected(qpid::sys::DispatchHandle& handle); + void requestedCall(RequestCallback); void close(qpid::sys::DispatchHandle& handle); }; diff --git a/cpp/src/qpid/sys/unordered_map.h b/cpp/src/qpid/sys/unordered_map.h index 5f7f9567c5..7ae71c3daa 100644 --- a/cpp/src/qpid/sys/unordered_map.h +++ b/cpp/src/qpid/sys/unordered_map.h @@ -23,6 +23,8 @@ #ifdef _MSC_VER # include <unordered_map> +#elif defined(__SUNPRO_CC) +# include <boost/tr1/unordered_map.hpp> #else # include <tr1/unordered_map> #endif /* _MSC_VER */ diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp index 1805dd2cd8..c81cef87b0 100755 --- a/cpp/src/qpid/sys/windows/IocpPoller.cpp +++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp @@ -96,6 +96,7 @@ void Poller::shutdown() { // Allow sloppy code to shut us down more than once. if (impl->isShutdown) return; + impl->isShutdown = true; ULONG_PTR key = 1; // Tell wait() it's a shutdown, not I/O PostQueuedCompletionStatus(impl->iocp, 0, key, 0); } @@ -110,7 +111,7 @@ bool Poller::interrupt(PollerHandle&) { } void Poller::run() { - do { + while (!impl->isShutdown) { Poller::Event event = this->wait(); // Handle shutdown @@ -124,7 +125,7 @@ void Poller::run() { // This should be impossible assert(false); } - } while (true); + } } void Poller::monitorHandle(PollerHandle& handle, Direction dir) { diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/Socket.cpp index b085f67539..a4374260cc 100644 --- a/cpp/src/qpid/sys/windows/Socket.cpp +++ b/cpp/src/qpid/sys/windows/Socket.cpp @@ -266,14 +266,17 @@ int Socket::getError() const void Socket::setTcpNoDelay() const { - int flag = 1; - int result = setsockopt(impl->fd, - IPPROTO_TCP, - TCP_NODELAY, - (char *)&flag, - sizeof(flag)); - QPID_WINSOCK_CHECK(result); + SOCKET& socket = impl->fd; nodelay = true; + if (socket != INVALID_SOCKET) { + int flag = 1; + int result = setsockopt(impl->fd, + IPPROTO_TCP, + TCP_NODELAY, + (char *)&flag, + sizeof(flag)); + QPID_WINSOCK_CHECK(result); + } } inline IOHandlePrivate* IOHandlePrivate::getImpl(const qpid::sys::IOHandle &h) diff --git a/cpp/src/qpid/sys/windows/SystemInfo.cpp b/cpp/src/qpid/sys/windows/SystemInfo.cpp index 4da440bdd4..cef78dcc60 100755 --- a/cpp/src/qpid/sys/windows/SystemInfo.cpp +++ b/cpp/src/qpid/sys/windows/SystemInfo.cpp @@ -23,9 +23,11 @@ # define _WIN32_WINNT 0x0501 #endif -#include "qpid/sys/IntegerTypes.h" #include "qpid/sys/SystemInfo.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/Exception.h"
+#include <assert.h> #include <winsock2.h> #include <ws2tcpip.h> #include <windows.h> @@ -93,6 +95,12 @@ void SystemInfo::getLocalIpAddresses (uint16_t port, } } +bool SystemInfo::isLocalHost(const std::string& candidateHost) { + // FIXME aconway 2012-05-03: not implemented. + assert(0); + throw Exception("Not implemented: isLocalHost"); +} + void SystemInfo::getSystemId (std::string &osName, std::string &nodeName, std::string &release, |
