summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
commitd43d1912b376322e27fdcda551a73f9ff5487972 (patch)
treece493e10baa95f44be8beb5778ce51783463196d /cpp/src/qpid/sys
parent04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
downloadqpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp43
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.h10
-rw-r--r--cpp/src/qpid/sys/MemStat.cpp (renamed from cpp/src/qpid/sys/windows/MemStat.cpp)4
-rw-r--r--cpp/src/qpid/sys/SslPlugin.cpp28
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp21
-rw-r--r--cpp/src/qpid/sys/Timer.cpp55
-rw-r--r--cpp/src/qpid/sys/Timer.h7
-rw-r--r--cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp1
-rwxr-xr-xcpp/src/qpid/sys/posix/LockFile.cpp2
-rw-r--r--cpp/src/qpid/sys/posix/MemStat.cpp2
-rw-r--r--cpp/src/qpid/sys/posix/SocketAddress.cpp6
-rwxr-xr-xcpp/src/qpid/sys/posix/SystemInfo.cpp108
-rw-r--r--cpp/src/qpid/sys/ssl/SslHandler.cpp41
-rw-r--r--cpp/src/qpid/sys/ssl/SslHandler.h9
-rw-r--r--cpp/src/qpid/sys/ssl/SslIo.cpp12
-rw-r--r--cpp/src/qpid/sys/ssl/SslIo.h3
-rw-r--r--cpp/src/qpid/sys/unordered_map.h2
-rwxr-xr-xcpp/src/qpid/sys/windows/IocpPoller.cpp5
-rw-r--r--cpp/src/qpid/sys/windows/Socket.cpp17
-rwxr-xr-xcpp/src/qpid/sys/windows/SystemInfo.cpp10
20 files changed, 310 insertions, 76 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index 5233002850..8a485db72d 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -23,6 +23,7 @@
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
@@ -41,11 +42,30 @@ struct Buff : public AsynchIO::BufferBase {
{ delete [] bytes;}
};
-AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
+struct ProtocolTimeoutTask : public sys::TimerTask {
+ AsynchIOHandler& handler;
+ std::string id;
+
+ ProtocolTimeoutTask(const std::string& i, const Duration& timeout, AsynchIOHandler& h) :
+ TimerTask(timeout, "ProtocolTimeout"),
+ handler(h),
+ id(i)
+ {}
+
+ void fire() {
+ // If this fires it means that we didn't negotiate the connection in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection " << id << " No protocol received closing");
+ handler.abort();
+ }
+};
+
+AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) :
identifier(id),
aio(0),
factory(f),
codec(0),
+ reads(0),
readError(false),
isClient(false),
readCredit(InfiniteCredit)
@@ -54,12 +74,18 @@ AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
AsynchIOHandler::~AsynchIOHandler() {
if (codec)
codec->closed();
+ if (timeoutTimerTask)
+ timeoutTimerTask->cancel();
delete codec;
}
-void AsynchIOHandler::init(AsynchIO* a, int numBuffs) {
+void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) {
aio = a;
+ // Start timer for this connection
+ timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this);
+ timer.add(timeoutTimerTask);
+
// Give connection some buffers to use
for (int i = 0; i < numBuffs; i++) {
aio->queueReadBuffer(new Buff);
@@ -129,10 +155,18 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
}
}
+ ++reads;
size_t decoded = 0;
if (codec) { // Already initiated
try {
decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+ // When we've decoded 3 reads (probably frames) we will have authenticated and
+ // started heartbeats, if specified, in many (but not all) cases so now we will cancel
+ // the idle connection timeout - this is really hacky, and would be better implemented
+ // in the connection, but that isn't actually created until the first decode.
+ if (reads == 3) {
+ timeoutTimerTask->cancel();
+ }
}catch(const std::exception& e){
QPID_LOG(error, e.what());
readError = true;
@@ -143,6 +177,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
decoded = in.getPosition();
+
QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
try {
codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
@@ -202,6 +237,10 @@ void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
codec = factory->create(*this, identifier, SecuritySettings());
write(framing::ProtocolInitiation(codec->getVersion()));
+ // We've just sent the protocol negotiation so we can cancel the timeout for that
+ // This is not ideal, because we've not received anything yet, but heartbeats will
+ // be active soon
+ timeoutTimerTask->cancel();
return;
}
if (codec == 0) return;
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h
index b9867606c4..307aad5b85 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -27,6 +27,8 @@
#include "qpid/sys/Mutex.h"
#include "qpid/CommonImportExport.h"
+#include <boost/intrusive_ptr.hpp>
+
namespace qpid {
namespace framing {
@@ -38,24 +40,28 @@ namespace sys {
class AsynchIO;
struct AsynchIOBufferBase;
class Socket;
+class Timer;
+class TimerTask;
class AsynchIOHandler : public OutputControl {
std::string identifier;
AsynchIO* aio;
ConnectionCodec::Factory* factory;
ConnectionCodec* codec;
+ uint32_t reads;
bool readError;
bool isClient;
AtomicValue<int32_t> readCredit;
static const int32_t InfiniteCredit = -1;
Mutex creditLock;
+ boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
void write(const framing::ProtocolInitiation&);
public:
- QPID_COMMON_EXTERN AsynchIOHandler(std::string id, ConnectionCodec::Factory* f);
+ QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f );
QPID_COMMON_EXTERN ~AsynchIOHandler();
- QPID_COMMON_EXTERN void init(AsynchIO* a, int numBuffs);
+ QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
diff --git a/cpp/src/qpid/sys/windows/MemStat.cpp b/cpp/src/qpid/sys/MemStat.cpp
index b1d25c5fc5..c71fba785c 100644
--- a/cpp/src/qpid/sys/windows/MemStat.cpp
+++ b/cpp/src/qpid/sys/MemStat.cpp
@@ -21,9 +21,11 @@
#include "qpid/sys/MemStat.h"
+// Null memory stats provider:
+// This is for platforms that do not have a way to get allocated
+// memory status
void qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory*)
{
- // TODO: Add Windows-specific memory stats to the object and load them here.
}
diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp
index 48baef9042..3b50527c0a 100644
--- a/cpp/src/qpid/sys/SslPlugin.cpp
+++ b/cpp/src/qpid/sys/SslPlugin.cpp
@@ -39,6 +39,8 @@
namespace qpid {
namespace sys {
+class Timer;
+
using namespace qpid::sys::ssl;
struct SslServerOptions : ssl::SslOptions
@@ -68,6 +70,8 @@ class SslProtocolFactoryTmpl : public ProtocolFactory {
typedef SslAcceptorTmpl<T> SslAcceptor;
+ Timer& brokerTimer;
+ uint32_t maxNegotiateTime;
const bool tcpNoDelay;
T listener;
const uint16_t listeningPort;
@@ -75,7 +79,7 @@ class SslProtocolFactoryTmpl : public ProtocolFactory {
bool nodict;
public:
- SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay);
+ SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay, Timer& timer, uint32_t maxTime);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
@@ -132,16 +136,18 @@ static struct SslPlugin : public Plugin {
try {
ssl::initNSS(options, true);
nssInitialized = true;
-
+
const broker::Broker::Options& opts = broker->getOptions();
ProtocolFactory::shared_ptr protocol(options.multiplex ?
static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options,
opts.connectionBacklog,
- opts.tcpNoDelay)) :
+ opts.tcpNoDelay,
+ broker->getTimer(), opts.maxNegotiateTime)) :
static_cast<ProtocolFactory*>(new SslProtocolFactory(options,
opts.connectionBacklog,
- opts.tcpNoDelay)));
+ opts.tcpNoDelay,
+ broker->getTimer(), opts.maxNegotiateTime)));
QPID_LOG(notice, "Listening for " <<
(options.multiplex ? "SSL or TCP" : "SSL") <<
" connections on TCP port " <<
@@ -156,14 +162,16 @@ static struct SslPlugin : public Plugin {
} sslPlugin;
template <class T>
-SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay) :
+SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay, Timer& timer, uint32_t maxTime) :
+ brokerTimer(timer),
+ maxNegotiateTime(maxTime),
tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)),
nodict(options.nodict)
{}
void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
ConnectionCodec::Factory* f, bool isClient,
- bool tcpNoDelay, bool nodict) {
+ Timer& timer, uint32_t maxTime, bool tcpNoDelay, bool nodict) {
qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
if (tcpNoDelay) {
@@ -183,7 +191,7 @@ void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1),
boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1));
- async->init(aio, 4);
+ async->init(aio,timer, maxTime, 4);
aio->start(poller);
}
@@ -192,7 +200,7 @@ void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f, bool isClient) {
const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
- SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
+ SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict);
}
template <class T>
@@ -216,7 +224,7 @@ void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket&
const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
if (sslSock) {
- SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
+ SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict);
return;
}
@@ -239,7 +247,7 @@ void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket&
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, 4);
+ async->init(aio, brokerTimer, maxNegotiateTime, 4);
aio->start(poller);
}
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index bd10a5555a..551440f954 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -36,14 +36,21 @@
namespace qpid {
namespace sys {
+class Timer;
+
class AsynchIOProtocolFactory : public ProtocolFactory {
- const bool tcpNoDelay;
boost::ptr_vector<Socket> listeners;
boost::ptr_vector<AsynchAcceptor> acceptors;
+ Timer& brokerTimer;
+ uint32_t maxNegotiateTime;
uint16_t listeningPort;
+ const bool tcpNoDelay;
public:
- AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen);
+ AsynchIOProtocolFactory(const std::string& host, const std::string& port,
+ int backlog, bool nodelay,
+ Timer& timer, uint32_t maxTime,
+ bool shouldListen);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
@@ -90,6 +97,7 @@ static class TCPIOPlugin : public Plugin {
"", boost::lexical_cast<std::string>(opts.port),
opts.connectionBacklog,
opts.tcpNoDelay,
+ broker->getTimer(), opts.maxNegotiateTime,
shouldListen));
if (shouldListen) {
@@ -101,7 +109,12 @@ static class TCPIOPlugin : public Plugin {
}
} tcpPlugin;
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen) :
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port,
+ int backlog, bool nodelay,
+ Timer& timer, uint32_t maxTime,
+ bool shouldListen) :
+ brokerTimer(timer),
+ maxNegotiateTime(maxTime),
tcpNoDelay(nodelay)
{
if (!shouldListen) {
@@ -153,7 +166,7 @@ void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socke
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, 4);
+ async->init(aio, brokerTimer, maxNegotiateTime, 4);
aio->start(poller);
}
diff --git a/cpp/src/qpid/sys/Timer.cpp b/cpp/src/qpid/sys/Timer.cpp
index 47752e4584..973c6bd8b7 100644
--- a/cpp/src/qpid/sys/Timer.cpp
+++ b/cpp/src/qpid/sys/Timer.cpp
@@ -35,7 +35,7 @@ TimerTask::TimerTask(Duration timeout, const std::string& n) :
sortTime(AbsTime::FarFuture()),
period(timeout),
nextFireTime(AbsTime::now(), timeout),
- cancelled(false)
+ state(WAITING)
{}
TimerTask::TimerTask(AbsTime time, const std::string& n) :
@@ -43,7 +43,7 @@ TimerTask::TimerTask(AbsTime time, const std::string& n) :
sortTime(AbsTime::FarFuture()),
period(0),
nextFireTime(time),
- cancelled(false)
+ state(WAITING)
{}
TimerTask::~TimerTask() {}
@@ -52,27 +52,48 @@ bool TimerTask::readyToFire() const {
return !(nextFireTime > AbsTime::now());
}
+bool TimerTask::prepareToFire() {
+ Monitor::ScopedLock l(stateMonitor);
+ if (state != CANCELLED) {
+ state = CALLING;
+ return true;
+ } else {
+ return false;
+ }
+}
+
void TimerTask::fireTask() {
- cancelled = true;
fire();
}
+void TimerTask::finishFiring() {
+ Monitor::ScopedLock l(stateMonitor);
+ if (state != CANCELLED) {
+ state = WAITING;
+ stateMonitor.notifyAll();
+ }
+}
+
// This can only be used to setup the next fire time. After the Timer has already fired
void TimerTask::setupNextFire() {
if (period && readyToFire()) {
nextFireTime = max(AbsTime::now(), AbsTime(nextFireTime, period));
- cancelled = false;
} else {
QPID_LOG(error, name << " couldn't setup next timer firing: " << Duration(nextFireTime, AbsTime::now()) << "[" << period << "]");
}
}
// Only allow tasks to be delayed
-void TimerTask::restart() { nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period)); }
+void TimerTask::restart() {
+ nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period));
+}
void TimerTask::cancel() {
- ScopedLock<Mutex> l(callbackLock);
- cancelled = true;
+ Monitor::ScopedLock l(stateMonitor);
+ while (state == CALLING) {
+ stateMonitor.wait();
+ }
+ state = CANCELLED;
}
void TimerTask::setFired() {
@@ -96,6 +117,22 @@ Timer::~Timer()
stop();
}
+class TimerTaskCallbackScope {
+ TimerTask& tt;
+public:
+ explicit TimerTaskCallbackScope(TimerTask& t) :
+ tt(t)
+ {}
+
+ operator bool() {
+ return !tt.prepareToFire();
+ }
+
+ ~TimerTaskCallbackScope() {
+ tt.finishFiring();
+ }
+};
+
// TODO AStitcher 21/08/09 The threshholds for emitting warnings are a little arbitrary
void Timer::run()
{
@@ -112,8 +149,8 @@ void Timer::run()
AbsTime start(AbsTime::now());
Duration delay(t->sortTime, start);
{
- ScopedLock<Mutex> l(t->callbackLock);
- if (t->cancelled) {
+ TimerTaskCallbackScope s(*t);
+ if (s) {
{
Monitor::ScopedUnlock u(monitor);
drop(t);
diff --git a/cpp/src/qpid/sys/Timer.h b/cpp/src/qpid/sys/Timer.h
index fccb17dbc2..5731b8d977 100644
--- a/cpp/src/qpid/sys/Timer.h
+++ b/cpp/src/qpid/sys/Timer.h
@@ -40,6 +40,7 @@ class Timer;
class TimerTask : public RefCounted {
friend class Timer;
+ friend class TimerTaskCallbackScope;
friend bool operator<(const boost::intrusive_ptr<TimerTask>&,
const boost::intrusive_ptr<TimerTask>&);
@@ -47,9 +48,11 @@ class TimerTask : public RefCounted {
AbsTime sortTime;
Duration period;
AbsTime nextFireTime;
- Mutex callbackLock;
- volatile bool cancelled;
+ qpid::sys::Monitor stateMonitor;
+ enum {WAITING, CALLING, CANCELLED} state;
+ bool prepareToFire();
+ void finishFiring();
bool readyToFire() const;
void fireTask();
diff --git a/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp b/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
index 249b769051..29b91f3e7a 100644
--- a/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
+++ b/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include <unistd.h>
#include "qpid/sys/cyrus/CyrusSecurityLayer.h"
#include <algorithm>
#include "qpid/framing/reply_exceptions.h"
diff --git a/cpp/src/qpid/sys/posix/LockFile.cpp b/cpp/src/qpid/sys/posix/LockFile.cpp
index c1f1c37b47..9fdf83f1bd 100755
--- a/cpp/src/qpid/sys/posix/LockFile.cpp
+++ b/cpp/src/qpid/sys/posix/LockFile.cpp
@@ -46,7 +46,7 @@ LockFile::LockFile(const std::string& path_, bool create)
errno = 0;
int flags=create ? O_WRONLY|O_CREAT|O_NOFOLLOW : O_RDWR;
int fd = ::open(path.c_str(), flags, 0644);
- if (fd < 0) throw ErrnoException("Cannot open " + path, errno);
+ if (fd < 0) throw ErrnoException("Cannot open lock file " + path, errno);
if (::lockf(fd, F_TLOCK, 0) < 0) {
::close(fd);
throw ErrnoException("Cannot lock " + path, errno);
diff --git a/cpp/src/qpid/sys/posix/MemStat.cpp b/cpp/src/qpid/sys/posix/MemStat.cpp
index 72c53e5886..2fbf119cab 100644
--- a/cpp/src/qpid/sys/posix/MemStat.cpp
+++ b/cpp/src/qpid/sys/posix/MemStat.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/sys/MemStat.h"
+
#include <malloc.h>
void qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory* object)
@@ -35,4 +36,3 @@ void qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory* obj
object->set_malloc_keepcost(info.keepcost);
}
-
diff --git a/cpp/src/qpid/sys/posix/SocketAddress.cpp b/cpp/src/qpid/sys/posix/SocketAddress.cpp
index a7049c1851..344bd28669 100644
--- a/cpp/src/qpid/sys/posix/SocketAddress.cpp
+++ b/cpp/src/qpid/sys/posix/SocketAddress.cpp
@@ -35,14 +35,16 @@ namespace sys {
SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) :
host(host0),
port(port0),
- addrInfo(0)
+ addrInfo(0),
+ currentAddrInfo(0)
{
}
SocketAddress::SocketAddress(const SocketAddress& sa) :
host(sa.host),
port(sa.port),
- addrInfo(0)
+ addrInfo(0),
+ currentAddrInfo(0)
{
}
diff --git a/cpp/src/qpid/sys/posix/SystemInfo.cpp b/cpp/src/qpid/sys/posix/SystemInfo.cpp
index 540cc8bc91..2b1bbb97df 100755
--- a/cpp/src/qpid/sys/posix/SystemInfo.cpp
+++ b/cpp/src/qpid/sys/posix/SystemInfo.cpp
@@ -18,10 +18,11 @@
*
*/
+#include "qpid/log/Statement.h"
#include "qpid/sys/SystemInfo.h"
-
#include "qpid/sys/posix/check.h"
-
+#include <set>
+#include <arpa/inet.h>
#include <sys/ioctl.h>
#include <sys/utsname.h>
#include <sys/types.h> // For FreeBSD
@@ -33,6 +34,7 @@
#include <fstream>
#include <sstream>
#include <netdb.h>
+#include <string.h>
#ifndef HOST_NAME_MAX
# define HOST_NAME_MAX 256
@@ -59,48 +61,100 @@ bool SystemInfo::getLocalHostname (Address &address) {
return true;
}
-static const string LOCALHOST("127.0.0.1");
+static const string LOOPBACK("127.0.0.1");
static const string TCP("tcp");
+// Test IPv4 address for loopback
+inline bool IN_IS_ADDR_LOOPBACK(const ::in_addr* a) {
+ return ((ntohl(a->s_addr) & 0xff000000) == 0x7f000000);
+}
+
+inline bool isLoopback(const ::sockaddr* addr) {
+ switch (addr->sa_family) {
+ case AF_INET: return IN_IS_ADDR_LOOPBACK(&((const ::sockaddr_in*)(const void*)addr)->sin_addr);
+ case AF_INET6: return IN6_IS_ADDR_LOOPBACK(&((const ::sockaddr_in6*)(const void*)addr)->sin6_addr);
+ default: return false;
+ }
+}
+
void SystemInfo::getLocalIpAddresses (uint16_t port,
std::vector<Address> &addrList) {
::ifaddrs* ifaddr = 0;
QPID_POSIX_CHECK(::getifaddrs(&ifaddr));
for (::ifaddrs* ifap = ifaddr; ifap != 0; ifap = ifap->ifa_next) {
if (ifap->ifa_addr == 0) continue;
-
+ if (isLoopback(ifap->ifa_addr)) continue;
int family = ifap->ifa_addr->sa_family;
switch (family) {
- case AF_INET: {
- char dispName[NI_MAXHOST];
- int rc = ::getnameinfo(
- ifap->ifa_addr,
- (family == AF_INET)
- ? sizeof(struct sockaddr_in)
- : sizeof(struct sockaddr_in6),
- dispName, sizeof(dispName),
- 0, 0, NI_NUMERICHOST);
- if (rc != 0) {
- throw QPID_POSIX_ERROR(rc);
+ case AF_INET6: {
+ // Ignore link local addresses as:
+ // * The scope id is illegal in URL syntax
+ // * Clients won't be able to use a link local address
+ // without adding their own (potentially different) scope id
+ sockaddr_in6* sa6 = (sockaddr_in6*)(ifap->ifa_addr);
+ if (IN6_IS_ADDR_LINKLOCAL(&sa6->sin6_addr)) break;
+ // Fallthrough
}
- string addr(dispName);
- if (addr != LOCALHOST) {
- addrList.push_back(Address(TCP, addr, port));
- }
- break;
- }
- // TODO: Url parsing currently can't cope with IPv6 addresses so don't return them
- // when it can cope move this line to above "case AF_INET:"
- case AF_INET6:
- default:
+ case AF_INET: {
+ char dispName[NI_MAXHOST];
+ int rc = ::getnameinfo(
+ ifap->ifa_addr,
+ (family == AF_INET)
+ ? sizeof(struct sockaddr_in)
+ : sizeof(struct sockaddr_in6),
+ dispName, sizeof(dispName),
+ 0, 0, NI_NUMERICHOST);
+ if (rc != 0) {
+ throw QPID_POSIX_ERROR(rc);
+ }
+ string addr(dispName);
+ addrList.push_back(Address(TCP, addr, port));
+ break;
+ }
+ default:
continue;
}
}
- freeifaddrs(ifaddr);
+ ::freeifaddrs(ifaddr);
if (addrList.empty()) {
- addrList.push_back(Address(TCP, LOCALHOST, port));
+ addrList.push_back(Address(TCP, LOOPBACK, port));
+ }
+}
+
+namespace {
+struct AddrInfo {
+ struct addrinfo* ptr;
+ AddrInfo(const std::string& host) : ptr(0) {
+ ::addrinfo hints;
+ ::memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
+ if (::getaddrinfo(host.c_str(), NULL, &hints, &ptr) != 0)
+ ptr = 0;
+ }
+ ~AddrInfo() { if (ptr) ::freeaddrinfo(ptr); }
+};
+}
+
+bool SystemInfo::isLocalHost(const std::string& host) {
+ std::vector<Address> myAddrs;
+ getLocalIpAddresses(0, myAddrs);
+ std::set<string> localHosts;
+ for (std::vector<Address>::const_iterator i = myAddrs.begin(); i != myAddrs.end(); ++i)
+ localHosts.insert(i->host);
+ // Resolve host
+ AddrInfo ai(host);
+ if (!ai.ptr) return false;
+ for (struct addrinfo *res = ai.ptr; res != NULL; res = res->ai_next) {
+ if (isLoopback(res->ai_addr)) return true;
+ // Get string form of IP addr
+ char addr[NI_MAXHOST] = "";
+ int error = ::getnameinfo(res->ai_addr, res->ai_addrlen, addr, NI_MAXHOST, NULL, 0,
+ NI_NUMERICHOST | NI_NUMERICSERV);
+ if (error) return false;
+ if (localHosts.find(addr) != localHosts.end()) return true;
}
+ return false;
}
void SystemInfo::getSystemId (std::string &osName,
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.cpp b/cpp/src/qpid/sys/ssl/SslHandler.cpp
index 67bf4ea893..8613059f28 100644
--- a/cpp/src/qpid/sys/ssl/SslHandler.cpp
+++ b/cpp/src/qpid/sys/ssl/SslHandler.cpp
@@ -19,9 +19,9 @@
*
*/
#include "qpid/sys/ssl/SslHandler.h"
-
#include "qpid/sys/ssl/SslIo.h"
#include "qpid/sys/ssl/SslSocket.h"
+#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
@@ -42,6 +42,24 @@ struct Buff : public SslIO::BufferBase {
{ delete [] bytes;}
};
+struct ProtocolTimeoutTask : public sys::TimerTask {
+ SslHandler& handler;
+ std::string id;
+
+ ProtocolTimeoutTask(const std::string& i, const Duration& timeout, SslHandler& h) :
+ TimerTask(timeout, "ProtocolTimeout"),
+ handler(h),
+ id(i)
+ {}
+
+ void fire() {
+ // If this fires it means that we didn't negotiate the connection in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection " << id << " No protocol received closing");
+ handler.abort();
+ }
+};
+
SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict) :
identifier(id),
aio(0),
@@ -55,12 +73,18 @@ SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict
SslHandler::~SslHandler() {
if (codec)
codec->closed();
+ if (timeoutTimerTask)
+ timeoutTimerTask->cancel();
delete codec;
}
-void SslHandler::init(SslIO* a, int numBuffs) {
+void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) {
aio = a;
+ // Start timer for this connection
+ timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this);
+ timer.add(timeoutTimerTask);
+
// Give connection some buffers to use
for (int i = 0; i < numBuffs; i++) {
aio->queueReadBuffer(new Buff);
@@ -80,8 +104,10 @@ void SslHandler::write(const framing::ProtocolInitiation& data)
}
void SslHandler::abort() {
- // TODO: can't implement currently as underlying functionality not implemented
- // aio->requestCallback(boost::bind(&SslHandler::eof, this, _1));
+ // Don't disconnect if we're already disconnecting
+ if (!readError) {
+ aio->requestCallback(boost::bind(&SslHandler::eof, this, _1));
+ }
}
void SslHandler::activateOutput() {
aio->notifyPendingWrite();
@@ -109,6 +135,9 @@ void SslHandler::readbuff(SslIO& , SslIO::BufferBase* buff) {
framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
+ // We've just got the protocol negotiation so we can cancel the timeout for that
+ timeoutTimerTask->cancel();
+
decoded = in.getPosition();
QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
try {
@@ -169,6 +198,10 @@ void SslHandler::idle(SslIO&){
if (isClient && codec == 0) {
codec = factory->create(*this, identifier, getSecuritySettings(aio));
write(framing::ProtocolInitiation(codec->getVersion()));
+ // We've just sent the protocol negotiation so we can cancel the timeout for that
+ // This is not ideal, because we've not received anything yet, but heartbeats will
+ // be active soon
+ timeoutTimerTask->cancel();
return;
}
if (codec == 0) return;
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.h b/cpp/src/qpid/sys/ssl/SslHandler.h
index 400fa317fd..74df2b7fb0 100644
--- a/cpp/src/qpid/sys/ssl/SslHandler.h
+++ b/cpp/src/qpid/sys/ssl/SslHandler.h
@@ -25,6 +25,8 @@
#include "qpid/sys/ConnectionCodec.h"
#include "qpid/sys/OutputControl.h"
+#include <boost/intrusive_ptr.hpp>
+
namespace qpid {
namespace framing {
@@ -32,6 +34,10 @@ namespace framing {
}
namespace sys {
+
+class Timer;
+class TimerTask;
+
namespace ssl {
class SslIO;
@@ -46,6 +52,7 @@ class SslHandler : public OutputControl {
bool readError;
bool isClient;
bool nodict;
+ boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
void write(const framing::ProtocolInitiation&);
qpid::sys::SecuritySettings getSecuritySettings(SslIO* aio);
@@ -53,7 +60,7 @@ class SslHandler : public OutputControl {
public:
SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict);
~SslHandler();
- void init(SslIO* a, int numBuffs);
+ void init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
void setClient() { isClient = true; }
diff --git a/cpp/src/qpid/sys/ssl/SslIo.cpp b/cpp/src/qpid/sys/ssl/SslIo.cpp
index 2a7cf16923..789c205ead 100644
--- a/cpp/src/qpid/sys/ssl/SslIo.cpp
+++ b/cpp/src/qpid/sys/ssl/SslIo.cpp
@@ -257,6 +257,18 @@ void SslIO::queueWriteClose() {
DispatchHandle::rewatchWrite();
}
+void SslIO::requestCallback(RequestCallback callback) {
+ // TODO creating a function object every time isn't all that
+ // efficient - if this becomes heavily used do something better (what?)
+ assert(callback);
+ DispatchHandle::call(boost::bind(&SslIO::requestedCall, this, callback));
+}
+
+void SslIO::requestedCall(RequestCallback callback) {
+ assert(callback);
+ callback(*this);
+}
+
/** Return a queued buffer if there are enough
* to spare
*/
diff --git a/cpp/src/qpid/sys/ssl/SslIo.h b/cpp/src/qpid/sys/ssl/SslIo.h
index c980d73831..b795594cd9 100644
--- a/cpp/src/qpid/sys/ssl/SslIo.h
+++ b/cpp/src/qpid/sys/ssl/SslIo.h
@@ -125,6 +125,7 @@ public:
typedef boost::function2<void, SslIO&, const SslSocket&> ClosedCallback;
typedef boost::function1<void, SslIO&> BuffersEmptyCallback;
typedef boost::function1<void, SslIO&> IdleCallback;
+ typedef boost::function1<void, SslIO&> RequestCallback;
private:
@@ -159,6 +160,7 @@ public:
void notifyPendingWrite();
void queueWriteClose();
bool writeQueueEmpty() { return writeQueue.empty(); }
+ void requestCallback(RequestCallback);
BufferBase* getQueuedBuffer();
qpid::sys::SecuritySettings getSecuritySettings();
@@ -168,6 +170,7 @@ private:
void readable(qpid::sys::DispatchHandle& handle);
void writeable(qpid::sys::DispatchHandle& handle);
void disconnected(qpid::sys::DispatchHandle& handle);
+ void requestedCall(RequestCallback);
void close(qpid::sys::DispatchHandle& handle);
};
diff --git a/cpp/src/qpid/sys/unordered_map.h b/cpp/src/qpid/sys/unordered_map.h
index 5f7f9567c5..7ae71c3daa 100644
--- a/cpp/src/qpid/sys/unordered_map.h
+++ b/cpp/src/qpid/sys/unordered_map.h
@@ -23,6 +23,8 @@
#ifdef _MSC_VER
# include <unordered_map>
+#elif defined(__SUNPRO_CC)
+# include <boost/tr1/unordered_map.hpp>
#else
# include <tr1/unordered_map>
#endif /* _MSC_VER */
diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp
index 1805dd2cd8..c81cef87b0 100755
--- a/cpp/src/qpid/sys/windows/IocpPoller.cpp
+++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp
@@ -96,6 +96,7 @@ void Poller::shutdown() {
// Allow sloppy code to shut us down more than once.
if (impl->isShutdown)
return;
+ impl->isShutdown = true;
ULONG_PTR key = 1; // Tell wait() it's a shutdown, not I/O
PostQueuedCompletionStatus(impl->iocp, 0, key, 0);
}
@@ -110,7 +111,7 @@ bool Poller::interrupt(PollerHandle&) {
}
void Poller::run() {
- do {
+ while (!impl->isShutdown) {
Poller::Event event = this->wait();
// Handle shutdown
@@ -124,7 +125,7 @@ void Poller::run() {
// This should be impossible
assert(false);
}
- } while (true);
+ }
}
void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/Socket.cpp
index b085f67539..a4374260cc 100644
--- a/cpp/src/qpid/sys/windows/Socket.cpp
+++ b/cpp/src/qpid/sys/windows/Socket.cpp
@@ -266,14 +266,17 @@ int Socket::getError() const
void Socket::setTcpNoDelay() const
{
- int flag = 1;
- int result = setsockopt(impl->fd,
- IPPROTO_TCP,
- TCP_NODELAY,
- (char *)&flag,
- sizeof(flag));
- QPID_WINSOCK_CHECK(result);
+ SOCKET& socket = impl->fd;
nodelay = true;
+ if (socket != INVALID_SOCKET) {
+ int flag = 1;
+ int result = setsockopt(impl->fd,
+ IPPROTO_TCP,
+ TCP_NODELAY,
+ (char *)&flag,
+ sizeof(flag));
+ QPID_WINSOCK_CHECK(result);
+ }
}
inline IOHandlePrivate* IOHandlePrivate::getImpl(const qpid::sys::IOHandle &h)
diff --git a/cpp/src/qpid/sys/windows/SystemInfo.cpp b/cpp/src/qpid/sys/windows/SystemInfo.cpp
index 4da440bdd4..cef78dcc60 100755
--- a/cpp/src/qpid/sys/windows/SystemInfo.cpp
+++ b/cpp/src/qpid/sys/windows/SystemInfo.cpp
@@ -23,9 +23,11 @@
# define _WIN32_WINNT 0x0501
#endif
-#include "qpid/sys/IntegerTypes.h"
#include "qpid/sys/SystemInfo.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/Exception.h"
+#include <assert.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <windows.h>
@@ -93,6 +95,12 @@ void SystemInfo::getLocalIpAddresses (uint16_t port,
}
}
+bool SystemInfo::isLocalHost(const std::string& candidateHost) {
+ // FIXME aconway 2012-05-03: not implemented.
+ assert(0);
+ throw Exception("Not implemented: isLocalHost");
+}
+
void SystemInfo::getSystemId (std::string &osName,
std::string &nodeName,
std::string &release,