summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2011-10-21 14:42:12 +0000
committerStephen D. Huston <shuston@apache.org>2011-10-21 14:42:12 +0000
commitf83677056891e436bf5ba99e79240df2a44528cd (patch)
tree625bfd644b948e89105630759cf6decb0435354d /cpp/src/qpid/sys
parentebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff)
downloadqpid-python-QPID-2519.tar.gz
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/AggregateOutput.h2
-rw-r--r--cpp/src/qpid/sys/AsynchIO.h4
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.h2
-rw-r--r--cpp/src/qpid/sys/AtomicValue.h7
-rw-r--r--cpp/src/qpid/sys/AtomicValue_gcc.h11
-rw-r--r--cpp/src/qpid/sys/ClusterSafe.cpp12
-rw-r--r--cpp/src/qpid/sys/ClusterSafe.h21
-rw-r--r--cpp/src/qpid/sys/CopyOnWriteArray.h6
-rw-r--r--cpp/src/qpid/sys/PollableQueue.h21
-rw-r--r--cpp/src/qpid/sys/Poller.h2
-rw-r--r--cpp/src/qpid/sys/ProtocolFactory.h3
-rw-r--r--cpp/src/qpid/sys/RdmaIOPlugin.cpp24
-rw-r--r--cpp/src/qpid/sys/Socket.h34
-rw-r--r--cpp/src/qpid/sys/SocketAddress.h10
-rw-r--r--cpp/src/qpid/sys/SslPlugin.cpp154
-rw-r--r--cpp/src/qpid/sys/StateMonitor.h14
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp105
-rw-r--r--cpp/src/qpid/sys/Timer.cpp22
-rw-r--r--cpp/src/qpid/sys/Timer.h8
-rw-r--r--cpp/src/qpid/sys/TimerWarnings.cpp16
-rw-r--r--cpp/src/qpid/sys/alloca.h25
-rw-r--r--cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp3
-rw-r--r--cpp/src/qpid/sys/epoll/EpollPoller.cpp7
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp36
-rwxr-xr-xcpp/src/qpid/sys/posix/LockFile.cpp3
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp171
-rw-r--r--cpp/src/qpid/sys/posix/SocketAddress.cpp80
-rw-r--r--cpp/src/qpid/sys/posix/Thread.cpp3
-rw-r--r--cpp/src/qpid/sys/posix/Time.cpp7
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp14
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.cpp15
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.h10
-rw-r--r--cpp/src/qpid/sys/ssl/SslHandler.h2
-rw-r--r--cpp/src/qpid/sys/ssl/SslIo.cpp22
-rw-r--r--cpp/src/qpid/sys/ssl/SslIo.h18
-rw-r--r--cpp/src/qpid/sys/ssl/SslSocket.cpp163
-rw-r--r--cpp/src/qpid/sys/ssl/SslSocket.h46
-rw-r--r--cpp/src/qpid/sys/windows/AsynchIO.cpp71
-rwxr-xr-xcpp/src/qpid/sys/windows/AsynchIoResult.h6
-rwxr-xr-xcpp/src/qpid/sys/windows/IocpPoller.cpp6
-rw-r--r--cpp/src/qpid/sys/windows/Shlib.cpp3
-rw-r--r--[-rwxr-xr-x]cpp/src/qpid/sys/windows/Socket.cpp188
-rw-r--r--cpp/src/qpid/sys/windows/SocketAddress.cpp120
-rw-r--r--cpp/src/qpid/sys/windows/SslAsynchIO.h3
-rwxr-xr-xcpp/src/qpid/sys/windows/StrError.cpp7
-rwxr-xr-xcpp/src/qpid/sys/windows/Thread.cpp285
-rw-r--r--cpp/src/qpid/sys/windows/Time.cpp36
-rw-r--r--cpp/src/qpid/sys/windows/mingw32_compat.h39
-rw-r--r--cpp/src/qpid/sys/windows/uuid.cpp6
49 files changed, 1243 insertions, 630 deletions
diff --git a/cpp/src/qpid/sys/AggregateOutput.h b/cpp/src/qpid/sys/AggregateOutput.h
index 6dad998bb0..d7c0ff29e3 100644
--- a/cpp/src/qpid/sys/AggregateOutput.h
+++ b/cpp/src/qpid/sys/AggregateOutput.h
@@ -41,7 +41,7 @@ namespace sys {
* doOutput is called in another.
*/
-class AggregateOutput : public OutputTask, public OutputControl
+class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public OutputControl
{
typedef std::deque<OutputTask*> TaskList;
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h
index 50da8fa4fc..41f74f7ed0 100644
--- a/cpp/src/qpid/sys/AsynchIO.h
+++ b/cpp/src/qpid/sys/AsynchIO.h
@@ -64,8 +64,8 @@ public:
// deletes. To correctly manage heaps when needed, the allocate and
// delete should both be done from the same class/library.
QPID_COMMON_EXTERN static AsynchConnector* create(const Socket& s,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb);
virtual void start(boost::shared_ptr<Poller> poller) = 0;
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h
index e1885bac79..b9867606c4 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -57,7 +57,7 @@ class AsynchIOHandler : public OutputControl {
QPID_COMMON_EXTERN ~AsynchIOHandler();
QPID_COMMON_EXTERN void init(AsynchIO* a, int numBuffs);
- QPID_COMMON_EXTERN void setClient() { isClient = true; }
+ QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
// Output side
QPID_COMMON_EXTERN void abort();
diff --git a/cpp/src/qpid/sys/AtomicValue.h b/cpp/src/qpid/sys/AtomicValue.h
index 6e90eafead..bf995f991e 100644
--- a/cpp/src/qpid/sys/AtomicValue.h
+++ b/cpp/src/qpid/sys/AtomicValue.h
@@ -22,7 +22,12 @@
*
*/
-#if defined( __GNUC__ ) && __GNUC__ >= 4 && ( defined( __i686__ ) || defined( __x86_64__ ) )
+// Have to check for clang before gcc as clang pretends to be gcc too
+#if defined( __clang__ )
+// Use the clang doesn't support atomic builtins for 64 bit values, so use the slow versions
+#include "qpid/sys/AtomicValue_mutex.h"
+
+#elif defined( __GNUC__ ) && __GNUC__ >= 4 && ( defined( __i686__ ) || defined( __x86_64__ ) )
// Use the Gnu C built-in atomic operations if compiling with gcc on a suitable platform.
#include "qpid/sys/AtomicValue_gcc.h"
diff --git a/cpp/src/qpid/sys/AtomicValue_gcc.h b/cpp/src/qpid/sys/AtomicValue_gcc.h
index d022b07c1d..724bae422e 100644
--- a/cpp/src/qpid/sys/AtomicValue_gcc.h
+++ b/cpp/src/qpid/sys/AtomicValue_gcc.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,6 +39,9 @@ class AtomicValue
public:
AtomicValue(T init=0) : value(init) {}
+ // Not atomic. Don't call concurrently with atomic ops.
+ AtomicValue<T>& operator=(T newValue) { value = newValue; return *this; }
+
// Update and return new value.
inline T operator+=(T n) { return __sync_add_and_fetch(&value, n); }
inline T operator-=(T n) { return __sync_sub_and_fetch(&value, n); }
@@ -54,11 +57,11 @@ class AtomicValue
/** If current value == testval then set to newval. Returns the old value. */
T valueCompareAndSwap(T testval, T newval) { return __sync_val_compare_and_swap(&value, testval, newval); }
- /** If current value == testval then set to newval. Returns true if the swap was performed. */
+ /** If current value == testval then set to newval. Returns true if the swap was performed. */
bool boolCompareAndSwap(T testval, T newval) { return __sync_bool_compare_and_swap(&value, testval, newval); }
T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(static_cast<T>(0)); }
-
+
private:
T value;
};
diff --git a/cpp/src/qpid/sys/ClusterSafe.cpp b/cpp/src/qpid/sys/ClusterSafe.cpp
index c6b527dfdf..dd37615145 100644
--- a/cpp/src/qpid/sys/ClusterSafe.cpp
+++ b/cpp/src/qpid/sys/ClusterSafe.cpp
@@ -34,8 +34,6 @@ QPID_TSS bool inContext = false;
bool isClusterSafe() { return !inCluster || inContext; }
-bool isCluster() { return inCluster; }
-
void assertClusterSafe() {
if (!isClusterSafe()) {
QPID_LOG(critical, "Modified cluster state outside of cluster context");
@@ -53,6 +51,16 @@ ClusterSafeScope::~ClusterSafeScope() {
inContext = save;
}
+ClusterUnsafeScope::ClusterUnsafeScope() {
+ save = inContext;
+ inContext = false;
+}
+
+ClusterUnsafeScope::~ClusterUnsafeScope() {
+ assert(!inContext);
+ inContext = save;
+}
+
void enableClusterSafe() { inCluster = true; }
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/ClusterSafe.h b/cpp/src/qpid/sys/ClusterSafe.h
index 15675e8cc5..27e4eb46a5 100644
--- a/cpp/src/qpid/sys/ClusterSafe.h
+++ b/cpp/src/qpid/sys/ClusterSafe.h
@@ -52,14 +52,9 @@ QPID_COMMON_EXTERN void assertClusterSafe();
*/
QPID_COMMON_EXTERN bool isClusterSafe();
-/** Return true in a clustered broker */
-QPID_COMMON_EXTERN bool isCluster();
-
/**
- * Base class for classes that encapsulate state which is replicated
- * to all members of a cluster. Acts as a marker for clustered state
- * and provides functions to assist detecting bugs in cluster
- * behavior.
+ * Mark a scope as cluster safe. Sets isClusterSafe in constructor and resets
+ * to previous value in destructor.
*/
class ClusterSafeScope {
public:
@@ -70,6 +65,18 @@ class ClusterSafeScope {
};
/**
+ * Mark a scope as cluster unsafe. Clears isClusterSafe in constructor and resets
+ * to previous value in destructor.
+ */
+class ClusterUnsafeScope {
+ public:
+ QPID_COMMON_EXTERN ClusterUnsafeScope();
+ QPID_COMMON_EXTERN ~ClusterUnsafeScope();
+ private:
+ bool save;
+};
+
+/**
* Enable cluster-safe assertions. By default they are no-ops.
* Called by cluster code.
*/
diff --git a/cpp/src/qpid/sys/CopyOnWriteArray.h b/cpp/src/qpid/sys/CopyOnWriteArray.h
index 45a231dfd8..41384fc5a4 100644
--- a/cpp/src/qpid/sys/CopyOnWriteArray.h
+++ b/cpp/src/qpid/sys/CopyOnWriteArray.h
@@ -43,6 +43,12 @@ public:
CopyOnWriteArray() {}
CopyOnWriteArray(const CopyOnWriteArray& c) : array(c.array) {}
+ bool empty()
+ {
+ Mutex::ScopedLock l(lock);
+ return array ? array->empty() : true;
+ }
+
void add(T& t)
{
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h
index 81c2301c1e..03b9d0084d 100644
--- a/cpp/src/qpid/sys/PollableQueue.h
+++ b/cpp/src/qpid/sys/PollableQueue.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,7 +28,8 @@
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <algorithm>
-#include <vector>
+#include <deque>
+#include "qpid/log/Statement.h" // FIXME aconway 2011-08-05:
namespace qpid {
namespace sys {
@@ -44,7 +45,7 @@ class Poller;
template <class T>
class PollableQueue {
public:
- typedef std::vector<T> Batch;
+ typedef std::deque<T> Batch;
typedef T value_type;
/**
@@ -68,11 +69,11 @@ class PollableQueue {
const boost::shared_ptr<sys::Poller>& poller);
~PollableQueue();
-
+
/** Push a value onto the queue. Thread safe */
void push(const T& t);
- /** Start polling. */
+ /** Start polling. */
void start();
/** Stop polling and wait for the current callback, if any, to complete. */
@@ -90,14 +91,14 @@ class PollableQueue {
* ensure clean shutdown with no events left on the queue.
*/
void shutdown();
-
+
private:
typedef sys::Monitor::ScopedLock ScopedLock;
typedef sys::Monitor::ScopedUnlock ScopedUnlock;
void dispatch(PollableCondition& cond);
void process();
-
+
mutable sys::Monitor lock;
Callback callback;
PollableCondition condition;
@@ -107,7 +108,7 @@ class PollableQueue {
};
template <class T> PollableQueue<T>::PollableQueue(
- const Callback& cb, const boost::shared_ptr<sys::Poller>& p)
+ const Callback& cb, const boost::shared_ptr<sys::Poller>& p)
: callback(cb),
condition(boost::bind(&PollableQueue<T>::dispatch, this, _1), p),
stopped(true)
@@ -151,7 +152,7 @@ template <class T> void PollableQueue<T>::process() {
putBack = callback(batch);
}
// put back unprocessed items.
- queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end()));
+ queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end()));
batch.clear();
}
}
diff --git a/cpp/src/qpid/sys/Poller.h b/cpp/src/qpid/sys/Poller.h
index ec53b79bad..01ee139ee6 100644
--- a/cpp/src/qpid/sys/Poller.h
+++ b/cpp/src/qpid/sys/Poller.h
@@ -120,7 +120,7 @@ class PollerHandle {
friend struct Poller::Event;
PollerHandlePrivate* const impl;
- QPID_COMMON_EXTERN virtual void processEvent(Poller::EventType) {};
+ QPID_COMMON_INLINE_EXTERN virtual void processEvent(Poller::EventType) {};
public:
QPID_COMMON_EXTERN PollerHandle(const IOHandle& h);
diff --git a/cpp/src/qpid/sys/ProtocolFactory.h b/cpp/src/qpid/sys/ProtocolFactory.h
index b233b2da1a..4d198a92da 100644
--- a/cpp/src/qpid/sys/ProtocolFactory.h
+++ b/cpp/src/qpid/sys/ProtocolFactory.h
@@ -39,11 +39,10 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
virtual ~ProtocolFactory() = 0;
virtual uint16_t getPort() const = 0;
- virtual std::string getHost() const = 0;
virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
virtual void connect(
boost::shared_ptr<Poller>,
- const std::string& host, int16_t port,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory* codec,
ConnectFailedCallback failed) = 0;
virtual bool supports(const std::string& /*capability*/) { return false; }
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
index d53db20598..6769e5383c 100644
--- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp
+++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -31,7 +31,6 @@
#include "qpid/sys/SecuritySettings.h"
#include <boost/bind.hpp>
-#include <boost/lexical_cast.hpp>
#include <memory>
#include <netdb.h>
@@ -212,10 +211,9 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
if (readError) {
return;
}
- size_t decoded = 0;
try {
if (codec) {
- decoded = codec->decode(buff->bytes(), buff->dataCount());
+ (void) codec->decode(buff->bytes(), buff->dataCount());
}else{
// Need to start protocol processing
initProtocolIn(buff);
@@ -230,9 +228,7 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) {
framing::Buffer in(buff->bytes(), buff->dataCount());
framing::ProtocolInitiation protocolInit;
- size_t decoded = 0;
if (protocolInit.decode(in)) {
- decoded = in.getPosition();
QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
@@ -254,10 +250,9 @@ class RdmaIOProtocolFactory : public ProtocolFactory {
public:
RdmaIOProtocolFactory(int16_t port, int backlog);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const string& host, int16_t port, ConnectionCodec::Factory*, ConnectFailedCallback);
+ void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
uint16_t getPort() const;
- string getHost() const;
private:
bool request(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*);
@@ -347,18 +342,7 @@ uint16_t RdmaIOProtocolFactory::getPort() const {
return listeningPort; // Immutable no need for lock.
}
-string RdmaIOProtocolFactory::getHost() const {
- //return listener.getSockname();
- return "";
-}
-
void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
- ::sockaddr_in sin;
-
- sin.sin_family = AF_INET;
- sin.sin_port = htons(listeningPort);
- sin.sin_addr.s_addr = INADDR_ANY;
-
listener.reset(
new Rdma::Listener(
Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES),
@@ -387,7 +371,7 @@ void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connectio
void RdmaIOProtocolFactory::connect(
Poller::shared_ptr poller,
- const std::string& host, int16_t port,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory* f,
ConnectFailedCallback failed)
{
@@ -399,7 +383,7 @@ void RdmaIOProtocolFactory::connect(
boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed));
- SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
c->start(poller, sa);
}
diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h
index 7d50afc59f..defec4879c 100644
--- a/cpp/src/qpid/sys/Socket.h
+++ b/cpp/src/qpid/sys/Socket.h
@@ -33,21 +33,21 @@ namespace sys {
class Duration;
class SocketAddress;
-class Socket : public IOHandle
+class QPID_COMMON_CLASS_EXTERN Socket : public IOHandle
{
public:
/** Create a socket wrapper for descriptor. */
QPID_COMMON_EXTERN Socket();
- /** Set timeout for read and write */
- void setTimeout(const Duration& interval) const;
+ /** Create a new Socket which is the same address family as this one */
+ QPID_COMMON_EXTERN Socket* createSameTypeSocket() const;
/** Set socket non blocking */
void setNonblocking() const;
QPID_COMMON_EXTERN void setTcpNoDelay() const;
- QPID_COMMON_EXTERN void connect(const std::string& host, uint16_t port) const;
+ QPID_COMMON_EXTERN void connect(const std::string& host, const std::string& port) const;
QPID_COMMON_EXTERN void connect(const SocketAddress&) const;
QPID_COMMON_EXTERN void close() const;
@@ -57,19 +57,9 @@ public:
*@param backlog maximum number of pending connections.
*@return The bound port.
*/
- QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const;
+ QPID_COMMON_EXTERN int listen(const std::string& host = "", const std::string& port = "0", int backlog = 10) const;
QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const;
- /** Returns the "socket name" ie the address bound to
- * the near end of the socket
- */
- QPID_COMMON_EXTERN std::string getSockname() const;
-
- /** Returns the "peer name" ie the address bound to
- * the remote end of the socket
- */
- std::string getPeername() const;
-
/**
* Returns an address (host and port) for the remote end of the
* socket
@@ -84,16 +74,13 @@ public:
/**
* Returns the full address of the connection: local and remote host and port.
*/
- QPID_COMMON_EXTERN std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
-
- QPID_COMMON_EXTERN uint16_t getLocalPort() const;
- uint16_t getRemotePort() const;
+ QPID_COMMON_INLINE_EXTERN std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
/**
* Returns the error code stored in the socket. This may be used
* to determine the result of a non-blocking connect.
*/
- int getError() const;
+ QPID_COMMON_EXTERN int getError() const;
/** Accept a connection from a socket that is already listening
* and has an incoming connection
@@ -108,8 +95,13 @@ private:
/** Create socket */
void createSocket(const SocketAddress&) const;
+public:
+ /** Construct socket with existing handle */
Socket(IOHandlePrivate*);
- mutable std::string connectname;
+
+protected:
+ mutable std::string localname;
+ mutable std::string peername;
mutable bool nonblocking;
mutable bool nodelay;
};
diff --git a/cpp/src/qpid/sys/SocketAddress.h b/cpp/src/qpid/sys/SocketAddress.h
index 27b9642f2c..dcca109d94 100644
--- a/cpp/src/qpid/sys/SocketAddress.h
+++ b/cpp/src/qpid/sys/SocketAddress.h
@@ -27,6 +27,7 @@
#include <string>
struct addrinfo;
+struct sockaddr;
namespace qpid {
namespace sys {
@@ -41,12 +42,19 @@ public:
QPID_COMMON_EXTERN SocketAddress& operator=(const SocketAddress&);
QPID_COMMON_EXTERN ~SocketAddress();
- std::string asString() const;
+ QPID_COMMON_EXTERN bool nextAddress();
+ QPID_COMMON_EXTERN std::string asString(bool numeric=true) const;
+ QPID_COMMON_EXTERN void setAddrInfoPort(uint16_t port);
+
+ QPID_COMMON_EXTERN static std::string asString(::sockaddr const * const addr, size_t addrlen);
+ QPID_COMMON_EXTERN static uint16_t getPort(::sockaddr const * const addr);
+
private:
std::string host;
std::string port;
mutable ::addrinfo* addrInfo;
+ mutable ::addrinfo* currentAddrInfo;
};
}}
diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp
index b0e791d60b..ab15785492 100644
--- a/cpp/src/qpid/sys/SslPlugin.cpp
+++ b/cpp/src/qpid/sys/SslPlugin.cpp
@@ -25,6 +25,8 @@
#include "qpid/sys/ssl/check.h"
#include "qpid/sys/ssl/util.h"
#include "qpid/sys/ssl/SslHandler.h"
+#include "qpid/sys/AsynchIOHandler.h"
+#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/ssl/SslIo.h"
#include "qpid/sys/ssl/SslSocket.h"
#include "qpid/broker/Broker.h"
@@ -37,15 +39,19 @@
namespace qpid {
namespace sys {
+using namespace qpid::sys::ssl;
+
struct SslServerOptions : ssl::SslOptions
{
uint16_t port;
bool clientAuth;
bool nodict;
+ bool multiplex;
SslServerOptions() : port(5671),
clientAuth(false),
- nodict(false)
+ nodict(false),
+ multiplex(false)
{
addOptions()
("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
@@ -56,29 +62,37 @@ struct SslServerOptions : ssl::SslOptions
}
};
-class SslProtocolFactory : public ProtocolFactory {
+template <class T>
+class SslProtocolFactoryTmpl : public ProtocolFactory {
+ private:
+
+ typedef SslAcceptorTmpl<T> SslAcceptor;
+
const bool tcpNoDelay;
- qpid::sys::ssl::SslSocket listener;
+ T listener;
const uint16_t listeningPort;
- std::auto_ptr<qpid::sys::ssl::SslAcceptor> acceptor;
+ std::auto_ptr<SslAcceptor> acceptor;
bool nodict;
public:
- SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
+ SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, int16_t port,
+ void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
boost::function2<void, int, std::string> failed);
uint16_t getPort() const;
- std::string getHost() const;
bool supports(const std::string& capability);
private:
- void established(Poller::shared_ptr, const qpid::sys::ssl::SslSocket&, ConnectionCodec::Factory*,
+ void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
bool isClient);
};
+typedef SslProtocolFactoryTmpl<SslSocket> SslProtocolFactory;
+typedef SslProtocolFactoryTmpl<SslMuxSocket> SslMuxProtocolFactory;
+
+
// Static instance to initialise plugin
static struct SslPlugin : public Plugin {
SslServerOptions options;
@@ -87,24 +101,48 @@ static struct SslPlugin : public Plugin {
~SslPlugin() { ssl::shutdownNSS(); }
- void earlyInitialize(Target&) {
+ void earlyInitialize(Target& target) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+ if (broker && !options.certDbPath.empty()) {
+ const broker::Broker::Options& opts = broker->getOptions();
+
+ if (opts.port == options.port && // AMQP & AMQPS ports are the same
+ opts.port != 0) {
+ // The presence of this option is used to signal to the TCP
+ // plugin not to start listening on the shared port. The actual
+ // value cannot be configured through the command line or config
+ // file (other than by setting the ports to the same value)
+ // because we are only adding it after option parsing.
+ options.multiplex = true;
+ options.addOptions()("ssl-multiplex", optValue(options.multiplex), "Allow SSL and non-SSL connections on the same port");
+ }
+ }
}
void initialize(Target& target) {
+ QPID_LOG(trace, "Initialising SSL plugin");
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker
if (broker) {
if (options.certDbPath.empty()) {
- QPID_LOG(info, "SSL plugin not enabled, you must set --ssl-cert-db to enable it.");
+ QPID_LOG(notice, "SSL plugin not enabled, you must set --ssl-cert-db to enable it.");
} else {
try {
ssl::initNSS(options, true);
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options,
- opts.connectionBacklog,
- opts.tcpNoDelay));
- QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
+
+ ProtocolFactory::shared_ptr protocol(options.multiplex ?
+ static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options,
+ opts.connectionBacklog,
+ opts.tcpNoDelay)) :
+ static_cast<ProtocolFactory*>(new SslProtocolFactory(options,
+ opts.connectionBacklog,
+ opts.tcpNoDelay)));
+ QPID_LOG(notice, "Listening for " <<
+ (options.multiplex ? "SSL or TCP" : "SSL") <<
+ " connections on TCP port " <<
+ protocol->getPort());
broker->registerProtocolFactory("ssl", protocol);
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what());
@@ -114,13 +152,15 @@ static struct SslPlugin : public Plugin {
}
} sslPlugin;
-SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, int backlog, bool nodelay) :
+template <class T>
+SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay) :
tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)),
nodict(options.nodict)
{}
-void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys::ssl::SslSocket& s,
- ConnectionCodec::Factory* f, bool isClient) {
+void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
+ ConnectionCodec::Factory* f, bool isClient,
+ bool tcpNoDelay, bool nodict) {
qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
if (tcpNoDelay) {
@@ -128,8 +168,10 @@ void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys:
QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
- if (isClient)
+ if (isClient) {
async->setClient();
+ }
+
qpid::sys::ssl::SslIO* aio = new qpid::sys::ssl::SslIO(s,
boost::bind(&qpid::sys::ssl::SslHandler::readbuff, async, _1, _2),
boost::bind(&qpid::sys::ssl::SslHandler::eof, async, _1),
@@ -142,25 +184,66 @@ void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys:
aio->start(poller);
}
-uint16_t SslProtocolFactory::getPort() const {
- return listeningPort; // Immutable no need for lock.
+template <>
+void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, bool isClient) {
+ const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
+
+ SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
}
-std::string SslProtocolFactory::getHost() const {
- return listener.getSockname();
+template <class T>
+uint16_t SslProtocolFactoryTmpl<T>::getPort() const {
+ return listeningPort; // Immutable no need for lock.
}
-void SslProtocolFactory::accept(Poller::shared_ptr poller,
- ConnectionCodec::Factory* fact) {
+template <class T>
+void SslProtocolFactoryTmpl<T>::accept(Poller::shared_ptr poller,
+ ConnectionCodec::Factory* fact) {
acceptor.reset(
- new qpid::sys::ssl::SslAcceptor(listener,
- boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
+ new SslAcceptor(listener,
+ boost::bind(&SslProtocolFactoryTmpl<T>::established,
+ this, poller, _1, fact, false)));
acceptor->start(poller);
}
-void SslProtocolFactory::connect(
+template <>
+void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, bool isClient) {
+ const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
+
+ if (sslSock) {
+ SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
+ return;
+ }
+
+ AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f);
+
+ if (tcpNoDelay) {
+ s.setTcpNoDelay();
+ QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+ }
+
+ if (isClient) {
+ async->setClient();
+ }
+ AsynchIO* aio = AsynchIO::create
+ (s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+
+ async->init(aio, 4);
+ aio->start(poller);
+}
+
+template <class T>
+void SslProtocolFactoryTmpl<T>::connect(
Poller::shared_ptr poller,
- const std::string& host, int16_t port,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
@@ -171,9 +254,9 @@ void SslProtocolFactory::connect(
// is no longer needed.
qpid::sys::ssl::SslSocket* socket = new qpid::sys::ssl::SslSocket();
- new qpid::sys::ssl::SslConnector (*socket, poller, host, port,
- boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, true),
- failed);
+ new SslConnector(*socket, poller, host, port,
+ boost::bind(&SslProtocolFactoryTmpl<T>::established, this, poller, _1, fact, true),
+ failed);
}
namespace
@@ -181,6 +264,7 @@ namespace
const std::string SSL = "ssl";
}
+template <>
bool SslProtocolFactory::supports(const std::string& capability)
{
std::string s = capability;
@@ -188,4 +272,12 @@ bool SslProtocolFactory::supports(const std::string& capability)
return s == SSL;
}
+template <>
+bool SslMuxProtocolFactory::supports(const std::string& capability)
+{
+ std::string s = capability;
+ transform(s.begin(), s.end(), s.begin(), tolower);
+ return s == SSL || s == "tcp";
+}
+
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/StateMonitor.h b/cpp/src/qpid/sys/StateMonitor.h
index 5a92756f3a..eac37a8543 100644
--- a/cpp/src/qpid/sys/StateMonitor.h
+++ b/cpp/src/qpid/sys/StateMonitor.h
@@ -41,9 +41,9 @@ class StateMonitor : public Waitable
struct Set : public std::bitset<MaxEnum + 1> {
Set() {}
Set(Enum s) { set(s); }
- Set(Enum s, Enum t) { set(s).set(t); }
- Set(Enum s, Enum t, Enum u) { set(s).set(t).set(u); }
- Set(Enum s, Enum t, Enum u, Enum v) { set(s).set(t).set(u).set(v); }
+ Set(Enum s, Enum t) { std::bitset<MaxEnum + 1>::set(s).set(t); }
+ Set(Enum s, Enum t, Enum u) { std::bitset<MaxEnum + 1>::set(s).set(t).set(u); }
+ Set(Enum s, Enum t, Enum u, Enum v) { std::bitset<MaxEnum + 1>::set(s).set(t).set(u).set(v); }
};
@@ -60,13 +60,13 @@ class StateMonitor : public Waitable
operator Enum() const { return state; }
/** @pre Caller holds a ScopedLock */
- void waitFor(Enum s) { ScopedWait(*this); while (s != state) wait(); }
+ void waitFor(Enum s) { ScopedWait w(*this); while (s != state) wait(); }
/** @pre Caller holds a ScopedLock */
- void waitFor(Set s) { ScopedWait(*this); while (!s.test(state)) wait(); }
+ void waitFor(Set s) { ScopedWait w(*this); while (!s.test(state)) wait(); }
/** @pre Caller holds a ScopedLock */
- void waitNot(Enum s) { ScopedWait(*this); while (s == state) wait(); }
+ void waitNot(Enum s) { ScopedWait w(*this); while (s == state) wait(); }
/** @pre Caller holds a ScopedLock */
- void waitNot(Set s) { ScopedWait(*this); while (s.test(state)) wait(); }
+ void waitNot(Set s) { ScopedWait w(*this); while (s.test(state)) wait(); }
private:
Enum state;
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index a6528f9ad9..8a99d8db71 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -25,31 +25,31 @@
#include "qpid/Plugin.h"
#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
-#include <memory>
+#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
namespace sys {
class AsynchIOProtocolFactory : public ProtocolFactory {
const bool tcpNoDelay;
- Socket listener;
- const uint16_t listeningPort;
- std::auto_ptr<AsynchAcceptor> acceptor;
+ boost::ptr_vector<Socket> listeners;
+ boost::ptr_vector<AsynchAcceptor> acceptors;
+ uint16_t listeningPort;
public:
- AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay);
+ AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, int16_t port,
+ void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
- std::string getHost() const;
private:
void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
@@ -57,27 +57,78 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
};
+static bool sslMultiplexEnabled(void)
+{
+ Options o;
+ Plugin::addOptions(o);
+
+ if (o.find_nothrow("ssl-multiplex", false)) {
+ // This option is added by the SSL plugin when the SSL port
+ // is configured to be the same as the main port.
+ QPID_LOG(notice, "SSL multiplexing enabled");
+ return true;
+ }
+ return false;
+}
+
// Static instance to initialise plugin
static class TCPIOPlugin : public Plugin {
void earlyInitialize(Target&) {
}
-
+
void initialize(Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker
if (broker) {
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog,
- opts.tcpNoDelay));
- QPID_LOG(notice, "Listening on TCP port " << protocol->getPort());
- broker->registerProtocolFactory("tcp", protocol);
+
+ // Check for SSL on the same port
+ bool shouldListen = !sslMultiplexEnabled();
+
+ ProtocolFactory::shared_ptr protocolt(
+ new AsynchIOProtocolFactory(
+ "", boost::lexical_cast<std::string>(opts.port),
+ opts.connectionBacklog,
+ opts.tcpNoDelay,
+ shouldListen));
+ if (shouldListen) {
+ QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
+ }
+ broker->registerProtocolFactory("tcp", protocolt);
}
}
} tcpPlugin;
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay) :
- tcpNoDelay(nodelay), listeningPort(listener.listen(port, backlog))
-{}
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen) :
+ tcpNoDelay(nodelay)
+{
+ if (!shouldListen) {
+ return;
+ }
+
+ SocketAddress sa(host, port);
+
+ // We must have at least one resolved address
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = new Socket;
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+
+ listeningPort = lport;
+
+ // Try any other resolved addresses
+ while (sa.nextAddress()) {
+ // Hack to ensure that all listening connections are on the same port
+ sa.setAddrInfoPort(listeningPort);
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = new Socket;
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+ }
+
+}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f, bool isClient) {
@@ -107,16 +158,14 @@ uint16_t AsynchIOProtocolFactory::getPort() const {
return listeningPort; // Immutable no need for lock.
}
-std::string AsynchIOProtocolFactory::getHost() const {
- return listener.getSockname();
-}
-
void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
- acceptor.reset(
- AsynchAcceptor::create(listener,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
- acceptor->start(poller);
+ for (unsigned i = 0; i<listeners.size(); ++i) {
+ acceptors.push_back(
+ AsynchAcceptor::create(listeners[i],
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+ acceptors[i].start(poller);
+ }
}
void AsynchIOProtocolFactory::connectFailed(
@@ -130,7 +179,7 @@ void AsynchIOProtocolFactory::connectFailed(
void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
- const std::string& host, int16_t port,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
@@ -139,8 +188,8 @@ void AsynchIOProtocolFactory::connect(
// upon connection failure or by the AsynchIO upon connection
// shutdown. The allocated AsynchConnector frees itself when it
// is no longer needed.
-
Socket* socket = new Socket();
+ try {
AsynchConnector* c = AsynchConnector::create(
*socket,
host,
@@ -150,6 +199,12 @@ void AsynchIOProtocolFactory::connect(
boost::bind(&AsynchIOProtocolFactory::connectFailed,
this, _1, _2, _3, failed));
c->start(poller);
+ } catch (std::exception&) {
+ // TODO: Design question - should we do the error callback and also throw?
+ int errCode = socket->getError();
+ connectFailed(*socket, errCode, strError(errCode), failed);
+ throw;
+ }
}
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/Timer.cpp b/cpp/src/qpid/sys/Timer.cpp
index a97ccd1bd1..47752e4584 100644
--- a/cpp/src/qpid/sys/Timer.cpp
+++ b/cpp/src/qpid/sys/Timer.cpp
@@ -75,6 +75,12 @@ void TimerTask::cancel() {
cancelled = true;
}
+void TimerTask::setFired() {
+ // Set nextFireTime to just before now, making readyToFire() true.
+ nextFireTime = AbsTime(sys::now(), Duration(-1));
+}
+
+
Timer::Timer() :
active(false),
late(50 * TIME_MSEC),
@@ -131,12 +137,14 @@ void Timer::run()
bool warningsEnabled;
QPID_LOG_TEST(warning, warningsEnabled);
if (warningsEnabled) {
- if (delay > late && overrun > overran)
- warn.lateAndOverran(t->name, delay, overrun, Duration(start, end));
+ if (overrun > overran) {
+ if (delay > overran) // if delay is significant to an overrun.
+ warn.lateAndOverran(t->name, delay, overrun, Duration(start, end));
+ else
+ warn.overran(t->name, overrun, Duration(start, end));
+ }
else if (delay > late)
warn.late(t->name, delay);
- else if (overrun > overran)
- warn.overran(t->name, overrun, Duration(start, end));
}
continue;
} else {
@@ -183,7 +191,11 @@ void Timer::stop()
// Allow subclasses to override behavior when firing a task.
void Timer::fire(boost::intrusive_ptr<TimerTask> t) {
- t->fireTask();
+ try {
+ t->fireTask();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Exception thrown by timer task " << t->getName() << ": " << e.what());
+ }
}
// Provided for subclasses: called when a task is droped.
diff --git a/cpp/src/qpid/sys/Timer.h b/cpp/src/qpid/sys/Timer.h
index 98ba39ce38..fccb17dbc2 100644
--- a/cpp/src/qpid/sys/Timer.h
+++ b/cpp/src/qpid/sys/Timer.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -64,6 +64,10 @@ class TimerTask : public RefCounted {
std::string getName() const { return name; }
+ // Move the nextFireTime so readyToFire is true.
+ // Used by the cluster, where tasks are fired on cluster events, not on local time.
+ QPID_COMMON_EXTERN void setFired();
+
protected:
// Must be overridden with callback
virtual void fire() = 0;
diff --git a/cpp/src/qpid/sys/TimerWarnings.cpp b/cpp/src/qpid/sys/TimerWarnings.cpp
index 48a56eb472..85e26da54a 100644
--- a/cpp/src/qpid/sys/TimerWarnings.cpp
+++ b/cpp/src/qpid/sys/TimerWarnings.cpp
@@ -56,20 +56,22 @@ void TimerWarnings::log() {
std::string task = i->first;
TaskStats& stats = i->second;
if (stats.lateDelay.count)
- QPID_LOG(warning, task << " task late "
+ QPID_LOG(info, task << " task late "
<< stats.lateDelay.count << " times by "
<< stats.lateDelay.average()/TIME_MSEC << "ms on average.");
+
if (stats.overranOverrun.count)
- QPID_LOG(warning, task << " task overran "
+ QPID_LOG(info, task << " task overran "
<< stats.overranOverrun.count << " times by "
<< stats.overranOverrun.average()/TIME_MSEC << "ms (taking "
<< stats.overranTime.average() << "ns) on average.");
- if (stats.lateAndOverranDelay.count)
- QPID_LOG(warning, task << " task overran "
- << stats.overranOverrun.count << " times by "
- << stats.overranOverrun.average()/TIME_MSEC << "ms (taking "
- << stats.overranTime.average() << "ns) on average.");
+ if (stats.lateAndOverranOverrun.count)
+ QPID_LOG(info, task << " task late and overran "
+ << stats.lateAndOverranOverrun.count << " times: late "
+ << stats.lateAndOverranDelay.average()/TIME_MSEC << "ms, overran "
+ << stats.lateAndOverranOverrun.average()/TIME_MSEC << "ms (taking "
+ << stats.lateAndOverranTime.average() << "ns) on average.");
}
nextReport = AbsTime(now(), interval);
diff --git a/cpp/src/qpid/sys/alloca.h b/cpp/src/qpid/sys/alloca.h
index e989670e4f..b3f59b7c3f 100644
--- a/cpp/src/qpid/sys/alloca.h
+++ b/cpp/src/qpid/sys/alloca.h
@@ -21,19 +21,22 @@
*
*/
-#if (defined(_WINDOWS) || defined (WIN32)) && defined(_MSC_VER)
-#include <malloc.h>
-#ifdef alloc
-# undef alloc
-#endif
-#define alloc _alloc
-#ifdef alloca
-# undef alloca
-#endif
-#define alloca _alloca
+#if (defined(_WINDOWS) || defined (WIN32))
+# include <malloc.h>
+
+# if defined(_MSC_VER)
+# ifdef alloc
+# undef alloc
+# endif
+# define alloc _alloc
+# ifdef alloca
+# undef alloca
+# endif
+# define alloca _alloca
+# endif
#endif
#if !defined _WINDOWS && !defined WIN32
-#include <alloca.h>
+# include <alloca.h>
#endif
#endif /*!QPID_SYS_ALLOCA_H*/
diff --git a/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp b/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
index 454ce62495..249b769051 100644
--- a/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
+++ b/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
@@ -57,6 +57,7 @@ size_t CyrusSecurityLayer::decode(const char* input, size_t size)
copied += count;
decodeBuffer.position += count;
size_t decodedSize = codec->decode(decodeBuffer.data, decodeBuffer.position);
+ if (decodedSize == 0) break;
if (decodedSize < decodeBuffer.position) {
::memmove(decodeBuffer.data, decodeBuffer.data + decodedSize, decodeBuffer.position - decodedSize);
}
@@ -106,7 +107,7 @@ size_t CyrusSecurityLayer::encode(const char* buffer, size_t size)
bool CyrusSecurityLayer::canEncode()
{
- return encrypted || codec->canEncode();
+ return codec && (encrypted || codec->canEncode());
}
void CyrusSecurityLayer::init(qpid::sys::Codec* c)
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index 9ad05c71a3..dcc9d9181c 100644
--- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -384,7 +384,12 @@ void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
+ int rc = ::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe);
+ // If something has closed the fd in the meantime try adding it back
+ if (rc ==-1 && errno == ENOENT) {
+ rc = ::epoll_ctl(epollFd, EPOLL_CTL_ADD, eh.fd(), &epe);
+ }
+ QPID_POSIX_CHECK(rc);
eh.setActive();
return;
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 119a6aa8a4..dab8bd09c6 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -149,11 +149,12 @@ private:
ConnectedCallback connCallback;
FailedCallback failCallback;
const Socket& socket;
+ SocketAddress sa;
public:
AsynchConnector(const Socket& socket,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb);
void start(Poller::shared_ptr poller);
@@ -161,8 +162,8 @@ public:
};
AsynchConnector::AsynchConnector(const Socket& s,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb) :
DispatchHandle(s,
@@ -171,11 +172,13 @@ AsynchConnector::AsynchConnector(const Socket& s,
boost::bind(&AsynchConnector::connComplete, this, _1)),
connCallback(connCb),
failCallback(failCb),
- socket(s)
+ socket(s),
+ sa(hostname, port)
{
socket.setNonblocking();
- SocketAddress sa(hostname, boost::lexical_cast<std::string>(port));
+
// Note, not catching any exceptions here, also has effect of destructing
+ QPID_LOG(info, "Connecting: " << sa.asString());
socket.connect(sa);
}
@@ -191,11 +194,26 @@ void AsynchConnector::stop()
void AsynchConnector::connComplete(DispatchHandle& h)
{
- h.stopWatch();
int errCode = socket.getError();
if (errCode == 0) {
+ h.stopWatch();
connCallback(socket);
} else {
+ // Retry while we cause an immediate exception
+ // (asynch failure will be handled by re-entering here at the top)
+ while (sa.nextAddress()) {
+ try {
+ // Try next address without deleting ourselves
+ QPID_LOG(debug, "Ignored socket connect error: " << strError(errCode));
+ QPID_LOG(info, "Retrying connect: " << sa.asString());
+ socket.connect(sa);
+ return;
+ } catch (const std::exception& e) {
+ QPID_LOG(debug, "Ignored socket connect exception: " << e.what());
+ }
+ errCode = socket.getError();
+ }
+ h.stopWatch();
failCallback(socket, errCode, strError(errCode));
}
DispatchHandle::doDelete();
@@ -589,8 +607,8 @@ AsynchAcceptor* AsynchAcceptor::create(const Socket& s,
}
AsynchConnector* AsynchConnector::create(const Socket& s,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb)
{
diff --git a/cpp/src/qpid/sys/posix/LockFile.cpp b/cpp/src/qpid/sys/posix/LockFile.cpp
index 1862ff6ac9..f5a6c292cb 100755
--- a/cpp/src/qpid/sys/posix/LockFile.cpp
+++ b/cpp/src/qpid/sys/posix/LockFile.cpp
@@ -58,8 +58,7 @@ LockFile::~LockFile() {
if (impl) {
int f = impl->fd;
if (f >= 0) {
- int unused_ret;
- unused_ret = ::lockf(f, F_ULOCK, 0); // Suppress warnings about ignoring return value.
+ (void) ::lockf(f, F_ULOCK, 0); // Suppress warnings about ignoring return value.
::close(f);
impl->fd = -1;
}
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index 7b906f33e8..4a6dc66f80 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,65 +34,35 @@
#include <netdb.h>
#include <cstdlib>
#include <string.h>
-#include <iostream>
-
-#include <boost/format.hpp>
-#include <boost/lexical_cast.hpp>
namespace qpid {
namespace sys {
namespace {
-std::string getName(int fd, bool local, bool includeService = false)
+std::string getName(int fd, bool local)
{
- ::sockaddr_storage name; // big enough for any socket address
- ::socklen_t namelen = sizeof(name);
-
- int result = -1;
+ ::sockaddr_storage name_s; // big enough for any socket address
+ ::sockaddr* name = (::sockaddr*)&name_s;
+ ::socklen_t namelen = sizeof(name_s);
+
if (local) {
- result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
+ QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) );
} else {
- result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
+ QPID_POSIX_CHECK( ::getpeername(fd, name, &namelen) );
}
- QPID_POSIX_CHECK(result);
-
- char servName[NI_MAXSERV];
- char dispName[NI_MAXHOST];
- if (includeService) {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw QPID_POSIX_ERROR(rc);
- return std::string(dispName) + ":" + std::string(servName);
-
- } else {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0)
- throw QPID_POSIX_ERROR(rc);
- return dispName;
- }
+ return SocketAddress::asString(name, namelen);
}
-std::string getService(int fd, bool local)
+uint16_t getLocalPort(int fd)
{
- ::sockaddr_storage name; // big enough for any socket address
- ::socklen_t namelen = sizeof(name);
-
- int result = -1;
- if (local) {
- result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
- } else {
- result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
- }
+ ::sockaddr_storage name_s; // big enough for any socket address
+ ::sockaddr* name = (::sockaddr*)&name_s;
+ ::socklen_t namelen = sizeof(name_s);
- QPID_POSIX_CHECK(result);
+ QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) );
- char servName[NI_MAXSERV];
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0,
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw QPID_POSIX_ERROR(rc);
- return servName;
+ return SocketAddress::getPort(name);
}
}
@@ -119,6 +89,11 @@ void Socket::createSocket(const SocketAddress& sa) const
try {
if (nonblocking) setNonblocking();
if (nodelay) setTcpNoDelay();
+ if (getAddrInfo(sa).ai_family == AF_INET6) {
+ int flag = 1;
+ int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag));
+ QPID_POSIX_CHECK(result);
+ }
} catch (std::exception&) {
::close(s);
socket = -1;
@@ -126,13 +101,18 @@ void Socket::createSocket(const SocketAddress& sa) const
}
}
-void Socket::setTimeout(const Duration& interval) const
-{
- const int& socket = impl->fd;
- struct timeval tv;
- toTimeval(tv, interval);
- setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
- setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
+Socket* Socket::createSameTypeSocket() const {
+ int& socket = impl->fd;
+ // Socket currently has no actual socket attached
+ if (socket == -1)
+ return new Socket;
+
+ ::sockaddr_storage sa;
+ ::socklen_t salen = sizeof(sa);
+ QPID_POSIX_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
+ int s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+ if (s < 0) throw QPID_POSIX_ERROR(errno);
+ return new Socket(new IOHandlePrivate(s));
}
void Socket::setNonblocking() const {
@@ -149,20 +129,27 @@ void Socket::setTcpNoDelay() const
nodelay = true;
if (socket != -1) {
int flag = 1;
- int result = setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
+ int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
QPID_POSIX_CHECK(result);
}
}
-void Socket::connect(const std::string& host, uint16_t port) const
+void Socket::connect(const std::string& host, const std::string& port) const
{
- SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
connect(sa);
}
void Socket::connect(const SocketAddress& addr) const
{
- connectname = addr.asString();
+ // The display name for an outbound connection needs to be the name that was specified
+ // for the address rather than a resolved IP address as we don't know which of
+ // the IP addresses is actually the one that will be connected to.
+ peername = addr.asString(false);
+
+ // However the string we compare with the local port must be numeric or it might not
+ // match when it should as getLocalAddress() will always be numeric
+ std::string connectname = addr.asString();
createSocket(addr);
@@ -170,7 +157,24 @@ void Socket::connect(const SocketAddress& addr) const
// TODO the correct thing to do here is loop on failure until you've used all the returned addresses
if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) &&
(errno != EINPROGRESS)) {
- throw Exception(QPID_MSG(strError(errno) << ": " << connectname));
+ throw Exception(QPID_MSG(strError(errno) << ": " << peername));
+ }
+ // When connecting to a port on the same host which no longer has
+ // a process associated with it, the OS occasionally chooses the
+ // remote port (which is unoccupied) as the port to bind the local
+ // end of the socket, resulting in a "circular" connection.
+ //
+ // This seems like something the OS should prevent but I have
+ // confirmed that sporadic hangs in
+ // cluster_tests.LongTests.test_failover on RHEL5 are caused by
+ // such a circular connection.
+ //
+ // Raise an error if we see such a connection, since we know there is
+ // no listener on the peer address.
+ //
+ if (getLocalAddress() == connectname) {
+ close();
+ throw Exception(QPID_MSG("Connection refused: " << peername));
}
}
@@ -183,9 +187,9 @@ Socket::close() const
socket = -1;
}
-int Socket::listen(uint16_t port, int backlog) const
+int Socket::listen(const std::string& host, const std::string& port, int backlog) const
{
- SocketAddress sa("", boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
return listen(sa, backlog);
}
@@ -195,26 +199,24 @@ int Socket::listen(const SocketAddress& sa, int backlog) const
const int& socket = impl->fd;
int yes=1;
- QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
+ QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno)));
if (::listen(socket, backlog) < 0)
throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno)));
- struct sockaddr_in name;
- socklen_t namelen = sizeof(name);
- if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
- throw QPID_POSIX_ERROR(errno);
-
- return ntohs(name.sin_port);
+ return getLocalPort(socket);
}
Socket* Socket::accept() const
{
int afd = ::accept(impl->fd, 0, 0);
- if ( afd >= 0)
- return new Socket(new IOHandlePrivate(afd));
+ if ( afd >= 0) {
+ Socket* s = new Socket(new IOHandlePrivate(afd));
+ s->localname = localname;
+ return s;
+ }
else if (errno == EAGAIN)
return 0;
else throw QPID_POSIX_ERROR(errno);
@@ -230,37 +232,20 @@ int Socket::write(const void *buf, size_t count) const
return ::write(impl->fd, buf, count);
}
-std::string Socket::getSockname() const
-{
- return getName(impl->fd, true);
-}
-
-std::string Socket::getPeername() const
-{
- return getName(impl->fd, false);
-}
-
std::string Socket::getPeerAddress() const
{
- if (connectname.empty()) {
- connectname = getName(impl->fd, false, true);
+ if (peername.empty()) {
+ peername = getName(impl->fd, false);
}
- return connectname;
+ return peername;
}
std::string Socket::getLocalAddress() const
{
- return getName(impl->fd, true, true);
-}
-
-uint16_t Socket::getLocalPort() const
-{
- return std::atoi(getService(impl->fd, true).c_str());
-}
-
-uint16_t Socket::getRemotePort() const
-{
- return std::atoi(getService(impl->fd, true).c_str());
+ if (localname.empty()) {
+ localname = getName(impl->fd, true);
+ }
+ return localname;
}
int Socket::getError() const
diff --git a/cpp/src/qpid/sys/posix/SocketAddress.cpp b/cpp/src/qpid/sys/posix/SocketAddress.cpp
index 8f5f29d793..077942ef2f 100644
--- a/cpp/src/qpid/sys/posix/SocketAddress.cpp
+++ b/cpp/src/qpid/sys/posix/SocketAddress.cpp
@@ -21,11 +21,13 @@
#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/posix/check.h"
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
#include <sys/socket.h>
-#include <string.h>
+#include <netinet/in.h>
#include <netdb.h>
+#include <string.h>
namespace qpid {
namespace sys {
@@ -46,15 +48,9 @@ SocketAddress::SocketAddress(const SocketAddress& sa) :
SocketAddress& SocketAddress::operator=(const SocketAddress& sa)
{
- if (&sa != this) {
- host = sa.host;
- port = sa.port;
+ SocketAddress temp(sa);
- if (addrInfo) {
- ::freeaddrinfo(addrInfo);
- addrInfo = 0;
- }
- }
+ std::swap(temp, *this);
return *this;
}
@@ -65,9 +61,61 @@ SocketAddress::~SocketAddress()
}
}
-std::string SocketAddress::asString() const
+std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen)
+{
+ char servName[NI_MAXSERV];
+ char dispName[NI_MAXHOST];
+ if (int rc=::getnameinfo(addr, addrlen,
+ dispName, sizeof(dispName),
+ servName, sizeof(servName),
+ NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+ throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+ std::string s;
+ switch (addr->sa_family) {
+ case AF_INET: s += dispName; break;
+ case AF_INET6: s += "["; s += dispName; s+= "]"; break;
+ default: throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+ s += ":";
+ s += servName;
+ return s;
+}
+
+uint16_t SocketAddress::getPort(::sockaddr const * const addr)
{
- return host + ":" + port;
+ switch (addr->sa_family) {
+ case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port);
+ case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port);
+ default:throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+}
+
+std::string SocketAddress::asString(bool numeric) const
+{
+ if (!numeric)
+ return host + ":" + port;
+ // Canonicalise into numeric id
+ const ::addrinfo& ai = getAddrInfo(*this);
+
+ return asString(ai.ai_addr, ai.ai_addrlen);
+}
+
+bool SocketAddress::nextAddress() {
+ bool r = currentAddrInfo->ai_next != 0;
+ if (r)
+ currentAddrInfo = currentAddrInfo->ai_next;
+ return r;
+}
+
+void SocketAddress::setAddrInfoPort(uint16_t port) {
+ if (!currentAddrInfo) return;
+
+ ::addrinfo& ai = *currentAddrInfo;
+ switch (ai.ai_family) {
+ case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return;
+ case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return;
+ default: throw Exception(QPID_MSG("Unexpected socket type"));
+ }
}
const ::addrinfo& getAddrInfo(const SocketAddress& sa)
@@ -75,7 +123,8 @@ const ::addrinfo& getAddrInfo(const SocketAddress& sa)
if (!sa.addrInfo) {
::addrinfo hints;
::memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_INET; // Change this to support IPv6
+ hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for
+ hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
hints.ai_socktype = SOCK_STREAM;
const char* node = 0;
@@ -88,10 +137,11 @@ const ::addrinfo& getAddrInfo(const SocketAddress& sa)
int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
if (n != 0)
- throw Exception(QPID_MSG("Cannot resolve " << sa.host << ": " << ::gai_strerror(n)));
+ throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
+ sa.currentAddrInfo = sa.addrInfo;
}
- return *sa.addrInfo;
+ return *sa.currentAddrInfo;
}
}}
diff --git a/cpp/src/qpid/sys/posix/Thread.cpp b/cpp/src/qpid/sys/posix/Thread.cpp
index b466733260..a1d6396763 100644
--- a/cpp/src/qpid/sys/posix/Thread.cpp
+++ b/cpp/src/qpid/sys/posix/Thread.cpp
@@ -37,7 +37,8 @@ void* runRunnable(void* p)
}
}
-struct ThreadPrivate {
+class ThreadPrivate {
+public:
pthread_t thread;
ThreadPrivate(Runnable* runnable) {
diff --git a/cpp/src/qpid/sys/posix/Time.cpp b/cpp/src/qpid/sys/posix/Time.cpp
index b3858279b4..9661f0c5e8 100644
--- a/cpp/src/qpid/sys/posix/Time.cpp
+++ b/cpp/src/qpid/sys/posix/Time.cpp
@@ -27,6 +27,7 @@
#include <stdio.h>
#include <sys/time.h>
#include <unistd.h>
+#include <iomanip>
namespace {
int64_t max_abstime() { return std::numeric_limits<int64_t>::max(); }
@@ -103,6 +104,12 @@ void outputFormattedNow(std::ostream& o) {
o << " ";
}
+void outputHiresNow(std::ostream& o) {
+ ::timespec time;
+ ::clock_gettime(CLOCK_REALTIME, &time);
+ o << time.tv_sec << "." << std::setw(9) << std::setfill('0') << time.tv_nsec << "s ";
+}
+
void sleep(int secs) {
::sleep(secs);
}
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index c80c94cba6..78bcdec68e 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -140,8 +140,8 @@ namespace Rdma {
// Prepost recv buffers before we go any further
qp->allocateRecvBuffers(recvBufferCount, bufferSize+FrameHeaderSize);
- // Create xmit buffers
- qp->createSendBuffers(xmitBufferCount, bufferSize+FrameHeaderSize);
+ // Create xmit buffers, reserve space for frame header.
+ qp->createSendBuffers(xmitBufferCount, bufferSize, FrameHeaderSize);
}
AsynchIO::~AsynchIO() {
@@ -210,12 +210,14 @@ namespace Rdma {
}
break;
case 1:
- Buffer* ob = buff ? buff : getSendBuffer();
+ if (!buff)
+ buff = getSendBuffer();
// Add FrameHeader after frame data
FrameHeader header(credit);
- ::memcpy(ob->bytes()+ob->dataCount(), &header, FrameHeaderSize);
- ob->dataCount(ob->dataCount()+FrameHeaderSize);
- qp->postSend(ob);
+ assert(buff->dataCount() <= buff->byteCount()); // ensure app data doesn't impinge on reserved space.
+ ::memcpy(buff->bytes()+buff->dataCount(), &header, FrameHeaderSize);
+ buff->dataCount(buff->dataCount()+FrameHeaderSize);
+ qp->postSend(buff);
break;
}
}
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
index 6d38c42502..efe454c5be 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
@@ -50,8 +50,9 @@ namespace Rdma {
return count;
}
- Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount) :
- bufferSize(byteCount)
+ Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount,
+ const int32_t reserve) :
+ bufferSize(byteCount + reserve), reserved(reserve)
{
sge.addr = (uintptr_t) bytes;
sge.length = 0;
@@ -163,21 +164,21 @@ namespace Rdma {
}
// Create buffers to use for writing
- void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize)
+ void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize, int reserved)
{
assert(!smr);
// Round up buffersize to cacheline (64 bytes)
- bufferSize = (bufferSize+63) & (~63);
+ int dataLength = (bufferSize+reserved+63) & (~63);
// Allocate memory block for all receive buffers
- char* mem = new char [sendBufferCount * bufferSize];
- smr = regMr(pd.get(), mem, sendBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE);
+ char* mem = new char [sendBufferCount * dataLength];
+ smr = regMr(pd.get(), mem, sendBufferCount * dataLength, ::IBV_ACCESS_LOCAL_WRITE);
sendBuffers.reserve(sendBufferCount);
freeBuffers.reserve(sendBufferCount);
for (int i = 0; i<sendBufferCount; ++i) {
// Allocate xmit buffer
- sendBuffers.push_back(Buffer(smr->lkey, &mem[i*bufferSize], bufferSize));
+ sendBuffers.push_back(Buffer(smr->lkey, &mem[i*dataLength], bufferSize, reserved));
freeBuffers.push_back(i);
}
}
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h
index 28bddd2165..8e3429027b 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.h
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h
@@ -57,8 +57,9 @@ namespace Rdma {
void dataCount(int32_t);
private:
- Buffer(uint32_t lkey, char* bytes, const int32_t byteCount);
+ Buffer(uint32_t lkey, char* bytes, const int32_t byteCount, const int32_t reserve=0);
int32_t bufferSize;
+ int32_t reserved; // for framing header
::ibv_sge sge;
};
@@ -66,8 +67,9 @@ namespace Rdma {
return (char*) sge.addr;
}
+ /** return the number of bytes available for application data */
inline int32_t Buffer::byteCount() const {
- return bufferSize;
+ return bufferSize - reserved;
}
inline int32_t Buffer::dataCount() const {
@@ -75,6 +77,8 @@ namespace Rdma {
}
inline void Buffer::dataCount(int32_t s) {
+ // catch any attempt to overflow a buffer
+ assert(s <= bufferSize + reserved);
sge.length = s;
}
@@ -136,7 +140,7 @@ namespace Rdma {
typedef boost::intrusive_ptr<QueuePair> intrusive_ptr;
// Create a buffers to use for writing
- void createSendBuffers(int sendBufferCount, int bufferSize);
+ void createSendBuffers(int sendBufferCount, int dataSize, int headerSize);
// Get a send buffer
Buffer* getSendBuffer();
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.h b/cpp/src/qpid/sys/ssl/SslHandler.h
index a340109966..400fa317fd 100644
--- a/cpp/src/qpid/sys/ssl/SslHandler.h
+++ b/cpp/src/qpid/sys/ssl/SslHandler.h
@@ -35,7 +35,7 @@ namespace sys {
namespace ssl {
class SslIO;
-class SslIOBufferBase;
+struct SslIOBufferBase;
class SslSocket;
class SslHandler : public OutputControl {
diff --git a/cpp/src/qpid/sys/ssl/SslIo.cpp b/cpp/src/qpid/sys/ssl/SslIo.cpp
index a58a137473..4a59819183 100644
--- a/cpp/src/qpid/sys/ssl/SslIo.cpp
+++ b/cpp/src/qpid/sys/ssl/SslIo.cpp
@@ -68,29 +68,33 @@ __thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
* Asynch Acceptor
*/
-SslAcceptor::SslAcceptor(const SslSocket& s, Callback callback) :
+template <class T>
+SslAcceptorTmpl<T>::SslAcceptorTmpl(const T& s, Callback callback) :
acceptedCallback(callback),
- handle(s, boost::bind(&SslAcceptor::readable, this, _1), 0, 0),
+ handle(s, boost::bind(&SslAcceptorTmpl<T>::readable, this, _1), 0, 0),
socket(s) {
s.setNonblocking();
ignoreSigpipe();
}
-SslAcceptor::~SslAcceptor()
+template <class T>
+SslAcceptorTmpl<T>::~SslAcceptorTmpl()
{
handle.stopWatch();
}
-void SslAcceptor::start(Poller::shared_ptr poller) {
+template <class T>
+void SslAcceptorTmpl<T>::start(Poller::shared_ptr poller) {
handle.startWatch(poller);
}
/*
* We keep on accepting as long as there is something to accept
*/
-void SslAcceptor::readable(DispatchHandle& h) {
- SslSocket* s;
+template <class T>
+void SslAcceptorTmpl<T>::readable(DispatchHandle& h) {
+ Socket* s;
do {
errno = 0;
// TODO: Currently we ignore the peers address, perhaps we should
@@ -110,6 +114,10 @@ void SslAcceptor::readable(DispatchHandle& h) {
h.rewatch();
}
+// Explicitly instantiate the templates we need
+template class SslAcceptorTmpl<SslSocket>;
+template class SslAcceptorTmpl<SslMuxSocket>;
+
/*
* Asynch Connector
*/
@@ -117,7 +125,7 @@ void SslAcceptor::readable(DispatchHandle& h) {
SslConnector::SslConnector(const SslSocket& s,
Poller::shared_ptr poller,
std::string hostname,
- uint16_t port,
+ std::string port,
ConnectedCallback connCb,
FailedCallback failCb) :
DispatchHandle(s,
diff --git a/cpp/src/qpid/sys/ssl/SslIo.h b/cpp/src/qpid/sys/ssl/SslIo.h
index 53ac69d8d6..c980d73831 100644
--- a/cpp/src/qpid/sys/ssl/SslIo.h
+++ b/cpp/src/qpid/sys/ssl/SslIo.h
@@ -29,26 +29,30 @@
namespace qpid {
namespace sys {
+
+class Socket;
+
namespace ssl {
-
+
class SslSocket;
/*
* Asynchronous ssl acceptor: accepts connections then does a callback
* with the accepted fd
*/
-class SslAcceptor {
+template <class T>
+class SslAcceptorTmpl {
public:
- typedef boost::function1<void, const SslSocket&> Callback;
+ typedef boost::function1<void, const Socket&> Callback;
private:
Callback acceptedCallback;
qpid::sys::DispatchHandle handle;
- const SslSocket& socket;
+ const T& socket;
public:
- SslAcceptor(const SslSocket& s, Callback callback);
- ~SslAcceptor();
+ SslAcceptorTmpl(const T& s, Callback callback);
+ ~SslAcceptorTmpl();
void start(qpid::sys::Poller::shared_ptr poller);
private:
@@ -73,7 +77,7 @@ public:
SslConnector(const SslSocket& socket,
Poller::shared_ptr poller,
std::string hostname,
- uint16_t port,
+ std::string port,
ConnectedCallback connCb,
FailedCallback failCb = 0);
diff --git a/cpp/src/qpid/sys/ssl/SslSocket.cpp b/cpp/src/qpid/sys/ssl/SslSocket.cpp
index 01e2658877..30234bb686 100644
--- a/cpp/src/qpid/sys/ssl/SslSocket.cpp
+++ b/cpp/src/qpid/sys/ssl/SslSocket.cpp
@@ -25,11 +25,13 @@
#include "qpid/Exception.h"
#include "qpid/sys/posix/check.h"
#include "qpid/sys/posix/PrivatePosix.h"
+#include "qpid/log/Statement.h"
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/errno.h>
+#include <poll.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
@@ -50,36 +52,6 @@ namespace sys {
namespace ssl {
namespace {
-std::string getName(int fd, bool local, bool includeService = false)
-{
- ::sockaddr_storage name; // big enough for any socket address
- ::socklen_t namelen = sizeof(name);
-
- int result = -1;
- if (local) {
- result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
- } else {
- result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
- }
-
- QPID_POSIX_CHECK(result);
-
- char servName[NI_MAXSERV];
- char dispName[NI_MAXHOST];
- if (includeService) {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw QPID_POSIX_ERROR(rc);
- return std::string(dispName) + ":" + std::string(servName);
-
- } else {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0)
- throw QPID_POSIX_ERROR(rc);
- return dispName;
- }
-}
-
std::string getService(int fd, bool local)
{
::sockaddr_storage name; // big enough for any socket address
@@ -132,7 +104,7 @@ std::string getDomainFromSubject(std::string subject)
}
-SslSocket::SslSocket() : IOHandle(new IOHandlePrivate()), socket(0), prototype(0)
+SslSocket::SslSocket() : socket(0), prototype(0)
{
impl->fd = ::socket (PF_INET, SOCK_STREAM, 0);
if (impl->fd < 0) throw QPID_POSIX_ERROR(errno);
@@ -144,7 +116,7 @@ SslSocket::SslSocket() : IOHandle(new IOHandlePrivate()), socket(0), prototype(0
* returned from accept. Because we use posix accept rather than
* PR_Accept, we have to reset the handshake.
*/
-SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : IOHandle(ioph), socket(0), prototype(0)
+SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : Socket(ioph), socket(0), prototype(0)
{
socket = SSL_ImportFD(model, PR_ImportTCPSocket(impl->fd));
NSS_CHECK(SSL_ResetHandshake(socket, true));
@@ -158,7 +130,7 @@ void SslSocket::setNonblocking() const
PR_SetSocketOption(socket, &option);
}
-void SslSocket::connect(const std::string& host, uint16_t port) const
+void SslSocket::connect(const std::string& host, const std::string& port) const
{
std::stringstream namestream;
namestream << host << ":" << port;
@@ -180,7 +152,7 @@ void SslSocket::connect(const std::string& host, uint16_t port) const
PRHostEnt hostEntry;
PR_CHECK(PR_GetHostByName(host.data(), hostBuffer, PR_NETDB_BUF_SIZE, &hostEntry));
PRNetAddr address;
- int value = PR_EnumerateHostEnt(0, &hostEntry, port, &address);
+ int value = PR_EnumerateHostEnt(0, &hostEntry, boost::lexical_cast<PRUint16>(port), &address);
if (value < 0) {
throw Exception(QPID_MSG("Error getting address for host: " << ErrorString()));
} else if (value == 0) {
@@ -238,6 +210,7 @@ int SslSocket::listen(uint16_t port, int backlog, const std::string& certName, b
SslSocket* SslSocket::accept() const
{
+ QPID_LOG(trace, "Accepting SSL connection.");
int afd = ::accept(impl->fd, 0, 0);
if ( afd >= 0) {
return new SslSocket(new IOHandlePrivate(afd), prototype);
@@ -248,36 +221,109 @@ SslSocket* SslSocket::accept() const
}
}
-int SslSocket::read(void *buf, size_t count) const
-{
- return PR_Read(socket, buf, count);
-}
+#define SSL_STREAM_MAX_WAIT_ms 20
+#define SSL_STREAM_MAX_RETRIES 2
-int SslSocket::write(const void *buf, size_t count) const
-{
- return PR_Write(socket, buf, count);
-}
+static bool isSslStream(int afd) {
+ int retries = SSL_STREAM_MAX_RETRIES;
+ unsigned char buf[5] = {};
-std::string SslSocket::getSockname() const
-{
- return getName(impl->fd, true);
+ do {
+ struct pollfd fd = {afd, POLLIN, 0};
+
+ /*
+ * Note that this is blocking the accept thread, so connections that
+ * send no data can limit the rate at which we can accept new
+ * connections.
+ */
+ if (::poll(&fd, 1, SSL_STREAM_MAX_WAIT_ms) > 0) {
+ errno = 0;
+ int result = recv(afd, buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT);
+ if (result == sizeof(buf)) {
+ break;
+ }
+ if (errno && errno != EAGAIN) {
+ int err = errno;
+ ::close(afd);
+ throw QPID_POSIX_ERROR(err);
+ }
+ }
+ } while (retries-- > 0);
+
+ if (retries < 0) {
+ return false;
+ }
+
+ /*
+ * SSLv2 Client Hello format
+ * http://www.mozilla.org/projects/security/pki/nss/ssl/draft02.html
+ *
+ * Bytes 0-1: RECORD-LENGTH
+ * Byte 2: MSG-CLIENT-HELLO (1)
+ * Byte 3: CLIENT-VERSION-MSB
+ * Byte 4: CLIENT-VERSION-LSB
+ *
+ * Allowed versions:
+ * 2.0 - SSLv2
+ * 3.0 - SSLv3
+ * 3.1 - TLS 1.0
+ * 3.2 - TLS 1.1
+ * 3.3 - TLS 1.2
+ *
+ * The version sent in the Client-Hello is the latest version supported by
+ * the client. NSS may send version 3.x in an SSLv2 header for
+ * maximum compatibility.
+ */
+ bool isSSL2Handshake = buf[2] == 1 && // MSG-CLIENT-HELLO
+ ((buf[3] == 3 && buf[4] <= 3) || // SSL 3.0 & TLS 1.0-1.2 (v3.1-3.3)
+ (buf[3] == 2 && buf[4] == 0)); // SSL 2
+
+ /*
+ * SSLv3/TLS Client Hello format
+ * RFC 2246
+ *
+ * Byte 0: ContentType (handshake - 22)
+ * Bytes 1-2: ProtocolVersion {major, minor}
+ *
+ * Allowed versions:
+ * 3.0 - SSLv3
+ * 3.1 - TLS 1.0
+ * 3.2 - TLS 1.1
+ * 3.3 - TLS 1.2
+ */
+ bool isSSL3Handshake = buf[0] == 22 && // handshake
+ (buf[1] == 3 && buf[2] <= 3); // SSL 3.0 & TLS 1.0-1.2 (v3.1-3.3)
+
+ return isSSL2Handshake || isSSL3Handshake;
}
-std::string SslSocket::getPeername() const
+Socket* SslMuxSocket::accept() const
{
- return getName(impl->fd, false);
+ int afd = ::accept(impl->fd, 0, 0);
+ if (afd >= 0) {
+ QPID_LOG(trace, "Accepting connection with optional SSL wrapper.");
+ if (isSslStream(afd)) {
+ QPID_LOG(trace, "Accepted SSL connection.");
+ return new SslSocket(new IOHandlePrivate(afd), prototype);
+ } else {
+ QPID_LOG(trace, "Accepted Plaintext connection.");
+ return new Socket(new IOHandlePrivate(afd));
+ }
+ } else if (errno == EAGAIN) {
+ return 0;
+ } else {
+ throw QPID_POSIX_ERROR(errno);
+ }
}
-std::string SslSocket::getPeerAddress() const
+int SslSocket::read(void *buf, size_t count) const
{
- if (!connectname.empty())
- return connectname;
- return getName(impl->fd, false, true);
+ return PR_Read(socket, buf, count);
}
-std::string SslSocket::getLocalAddress() const
+int SslSocket::write(const void *buf, size_t count) const
{
- return getName(impl->fd, true, true);
+ return PR_Write(socket, buf, count);
}
uint16_t SslSocket::getLocalPort() const
@@ -290,17 +336,6 @@ uint16_t SslSocket::getRemotePort() const
return atoi(getService(impl->fd, true).c_str());
}
-int SslSocket::getError() const
-{
- int result;
- socklen_t rSize = sizeof (result);
-
- if (::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0)
- throw QPID_POSIX_ERROR(errno);
-
- return result;
-}
-
void SslSocket::setTcpNoDelay(bool nodelay) const
{
if (nodelay) {
diff --git a/cpp/src/qpid/sys/ssl/SslSocket.h b/cpp/src/qpid/sys/ssl/SslSocket.h
index 25712c98d5..eabadcbe23 100644
--- a/cpp/src/qpid/sys/ssl/SslSocket.h
+++ b/cpp/src/qpid/sys/ssl/SslSocket.h
@@ -23,6 +23,7 @@
*/
#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Socket.h"
#include <nspr.h>
#include <string>
@@ -36,7 +37,7 @@ class Duration;
namespace ssl {
-class SslSocket : public qpid::sys::IOHandle
+class SslSocket : public qpid::sys::Socket
{
public:
/** Create a socket wrapper for descriptor. */
@@ -53,7 +54,7 @@ public:
* NSSInit().*/
void setCertName(const std::string& certName);
- void connect(const std::string& host, uint16_t port) const;
+ void connect(const std::string& host, const std::string& port) const;
void close() const;
@@ -75,45 +76,13 @@ public:
int read(void *buf, size_t count) const;
int write(const void *buf, size_t count) const;
- /** Returns the "socket name" ie the address bound to
- * the near end of the socket
- */
- std::string getSockname() const;
-
- /** Returns the "peer name" ie the address bound to
- * the remote end of the socket
- */
- std::string getPeername() const;
-
- /**
- * Returns an address (host and port) for the remote end of the
- * socket
- */
- std::string getPeerAddress() const;
- /**
- * Returns an address (host and port) for the local end of the
- * socket
- */
- std::string getLocalAddress() const;
-
- /**
- * Returns the full address of the connection: local and remote host and port.
- */
- std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
-
uint16_t getLocalPort() const;
uint16_t getRemotePort() const;
- /**
- * Returns the error code stored in the socket. This may be used
- * to determine the result of a non-blocking connect.
- */
- int getError() const;
-
int getKeyLen() const;
std::string getClientAuthId() const;
-private:
+protected:
mutable std::string connectname;
mutable PRFileDesc* socket;
std::string certname;
@@ -126,6 +95,13 @@ private:
mutable PRFileDesc* prototype;
SslSocket(IOHandlePrivate* ioph, PRFileDesc* model);
+ friend class SslMuxSocket;
+};
+
+class SslMuxSocket : public SslSocket
+{
+public:
+ Socket* accept() const;
};
}}}
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 38d8842521..30378d4c5f 100644
--- a/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -30,6 +30,7 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/windows/check.h"
+#include "qpid/sys/windows/mingw32_compat.h"
#include <boost/thread/once.hpp>
@@ -46,16 +47,13 @@ namespace {
/*
* The function pointers for AcceptEx and ConnectEx need to be looked up
- * at run time. Make sure this is done only once.
+ * at run time.
*/
-boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT;
-LPFN_ACCEPTEX fnAcceptEx = 0;
-typedef void (*lookUpFunc)(const qpid::sys::Socket &);
-
-void lookUpAcceptEx() {
- SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) {
+ SOCKET h = toSocketHandle(s);
GUID guidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes = 0;
+ LPFN_ACCEPTEX fnAcceptEx;
WSAIoctl(h,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidAcceptEx,
@@ -65,9 +63,9 @@ void lookUpAcceptEx() {
&dwBytes,
NULL,
NULL);
- closesocket(h);
if (fnAcceptEx == 0)
throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx"));
+ return fnAcceptEx;
}
}
@@ -94,18 +92,15 @@ private:
AsynchAcceptor::Callback acceptedCallback;
const Socket& socket;
+ const LPFN_ACCEPTEX fnAcceptEx;
};
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
: acceptedCallback(callback),
- socket(s) {
+ socket(s),
+ fnAcceptEx(lookUpAcceptEx(s)) {
s.setNonblocking();
-#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */
- boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx);
-#else
- boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce);
-#endif
}
AsynchAcceptor::~AsynchAcceptor()
@@ -114,7 +109,8 @@ AsynchAcceptor::~AsynchAcceptor()
}
void AsynchAcceptor::start(Poller::shared_ptr poller) {
- poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
+ PollerHandle ph = PollerHandle(socket);
+ poller->monitorHandle(ph, Poller::INPUT);
restart ();
}
@@ -122,25 +118,26 @@ void AsynchAcceptor::restart(void) {
DWORD bytesReceived = 0; // Not used, needed for AcceptEx API
AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
this,
- toSocketHandle(socket));
+ socket);
BOOL status;
- status = ::fnAcceptEx(toSocketHandle(socket),
- toSocketHandle(*result->newSocket),
- result->addressBuffer,
- 0,
- AsynchAcceptResult::SOCKADDRMAXLEN,
- AsynchAcceptResult::SOCKADDRMAXLEN,
- &bytesReceived,
- result->overlapped());
+ status = fnAcceptEx(toSocketHandle(socket),
+ toSocketHandle(*result->newSocket),
+ result->addressBuffer,
+ 0,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ &bytesReceived,
+ result->overlapped());
QPID_WINDOWS_CHECK_ASYNC_START(status);
}
AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
AsynchAcceptor *acceptor,
- SOCKET listener)
- : callback(cb), acceptor(acceptor), listener(listener) {
- newSocket.reset (new Socket());
+ const Socket& listener)
+ : callback(cb), acceptor(acceptor),
+ listener(toSocketHandle(listener)),
+ newSocket(listener.createSameTypeSocket()) {
}
void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
@@ -154,7 +151,7 @@ void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
delete this;
}
-void AsynchAcceptResult::failure(int status) {
+void AsynchAcceptResult::failure(int /*status*/) {
//if (status != WSA_OPERATION_ABORTED)
// Can there be anything else? ;
delete this;
@@ -173,20 +170,20 @@ private:
FailedCallback failCallback;
const Socket& socket;
const std::string hostname;
- const uint16_t port;
+ const std::string port;
public:
AsynchConnector(const Socket& socket,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb = 0);
void start(Poller::shared_ptr poller);
};
AsynchConnector::AsynchConnector(const Socket& sock,
- std::string hname,
- uint16_t p,
+ const std::string& hname,
+ const std::string& p,
ConnectedCallback connCb,
FailedCallback failCb) :
connCallback(connCb), failCallback(failCb), socket(sock),
@@ -216,8 +213,8 @@ AsynchAcceptor* AsynchAcceptor::create(const Socket& s,
}
AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb)
{
@@ -410,8 +407,9 @@ void AsynchIO::queueForDeletion() {
}
void AsynchIO::start(Poller::shared_ptr poller0) {
+ PollerHandle ph = PollerHandle(socket);
poller = poller0;
- poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
+ poller->monitorHandle(ph, Poller::INPUT);
if (writeQueue.size() > 0) // Already have data queued for write
notifyPendingWrite();
startReading();
@@ -584,7 +582,6 @@ void AsynchIO::notifyIdle(void) {
void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
writeInProgress = true;
InterlockedIncrement(&opsInProgress);
- int writeCount = buff->byteCount-buff->dataCount;
AsynchWriteResult *result =
new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1),
buff,
diff --git a/cpp/src/qpid/sys/windows/AsynchIoResult.h b/cpp/src/qpid/sys/windows/AsynchIoResult.h
index 66c89efc11..27e4c22138 100755
--- a/cpp/src/qpid/sys/windows/AsynchIoResult.h
+++ b/cpp/src/qpid/sys/windows/AsynchIoResult.h
@@ -83,22 +83,22 @@ class AsynchAcceptResult : public AsynchResult {
public:
AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb,
AsynchAcceptor *acceptor,
- SOCKET listener);
+ const qpid::sys::Socket& listener);
virtual void success (size_t bytesTransferred);
virtual void failure (int error);
private:
virtual void complete(void) {} // No-op for this class.
- std::auto_ptr<qpid::sys::Socket> newSocket;
qpid::sys::AsynchAcceptor::Callback callback;
AsynchAcceptor *acceptor;
SOCKET listener;
+ std::auto_ptr<qpid::sys::Socket> newSocket;
// AcceptEx needs a place to write the local and remote addresses
// when accepting the connection. Place those here; get enough for
// IPv6 addresses, even if the socket is IPv4.
- enum { SOCKADDRMAXLEN = sizeof sockaddr_in6 + 16,
+ enum { SOCKADDRMAXLEN = sizeof(sockaddr_in6) + 16,
SOCKADDRBUFLEN = 2 * SOCKADDRMAXLEN };
char addressBuffer[SOCKADDRBUFLEN];
};
diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp
index d326ab02ac..1805dd2cd8 100755
--- a/cpp/src/qpid/sys/windows/IocpPoller.cpp
+++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp
@@ -152,9 +152,9 @@ void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
}
// All no-ops...
-void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {}
-void Poller::registerHandle(PollerHandle& handle) {}
-void Poller::unregisterHandle(PollerHandle& handle) {}
+void Poller::unmonitorHandle(PollerHandle& /*handle*/, Direction /*dir*/) {}
+void Poller::registerHandle(PollerHandle& /*handle*/) {}
+void Poller::unregisterHandle(PollerHandle& /*handle*/) {}
Poller::Event Poller::wait(Duration timeout) {
DWORD timeoutMs = 0;
diff --git a/cpp/src/qpid/sys/windows/Shlib.cpp b/cpp/src/qpid/sys/windows/Shlib.cpp
index 38027de93f..ba18747eb4 100644
--- a/cpp/src/qpid/sys/windows/Shlib.cpp
+++ b/cpp/src/qpid/sys/windows/Shlib.cpp
@@ -44,7 +44,8 @@ void Shlib::unload() {
}
void* Shlib::getSymbol(const char* name) {
- void* sym = GetProcAddress(static_cast<HMODULE>(handle), name);
+ // Double cast avoids warning about casting function pointer to object
+ void *sym = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(GetProcAddress(static_cast<HMODULE>(handle), name)));
if (sym == NULL)
throw QPID_WINDOWS_ERROR(GetLastError());
return sym;
diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/Socket.cpp
index 11fb8b4133..1fa4768329 100755..100644
--- a/cpp/src/qpid/sys/windows/Socket.cpp
+++ b/cpp/src/qpid/sys/windows/Socket.cpp
@@ -20,19 +20,18 @@
*/
#include "qpid/sys/Socket.h"
+
#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/windows/IoHandlePrivate.h"
#include "qpid/sys/windows/check.h"
-#include "qpid/sys/Time.h"
+#include "qpid/sys/windows/IoHandlePrivate.h"
-#include <cstdlib>
-#include <string.h>
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
#include <winsock2.h>
-#include <boost/format.hpp>
-#include <boost/lexical_cast.hpp>
-
// Need to initialize WinSock. Ideally, this would be a singleton or embedded
// in some one-time initialization function. I tried boost singleton and could
// not get it to compile (and others located in google had the same problem).
@@ -84,53 +83,30 @@ namespace sys {
namespace {
-std::string getName(SOCKET fd, bool local, bool includeService = false)
+std::string getName(SOCKET fd, bool local)
{
- sockaddr_in name; // big enough for any socket address
- socklen_t namelen = sizeof(name);
+ ::sockaddr_storage name_s; // big enough for any socket address
+ ::sockaddr* name = (::sockaddr*)&name_s;
+ ::socklen_t namelen = sizeof(name_s);
+
if (local) {
- QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen));
+ QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen));
} else {
- QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen));
+ QPID_WINSOCK_CHECK(::getpeername(fd, name, &namelen));
}
- char servName[NI_MAXSERV];
- char dispName[NI_MAXHOST];
- if (includeService) {
- if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
- dispName, sizeof(dispName),
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
- return std::string(dispName) + ":" + std::string(servName);
- } else {
- if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
- dispName, sizeof(dispName),
- 0, 0,
- NI_NUMERICHOST) != 0)
- throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
- return dispName;
- }
+ return SocketAddress::asString(name, namelen);
}
-std::string getService(SOCKET fd, bool local)
+uint16_t getLocalPort(int fd)
{
- sockaddr_in name; // big enough for any socket address
- socklen_t namelen = sizeof(name);
-
- if (local) {
- QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen));
- } else {
- QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen));
- }
+ ::sockaddr_storage name_s; // big enough for any socket address
+ ::sockaddr* name = (::sockaddr*)&name_s;
+ ::socklen_t namelen = sizeof(name_s);
+
+ QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen));
- char servName[NI_MAXSERV];
- if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
- 0, 0,
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
- return servName;
+ return SocketAddress::getPort(name);
}
} // namespace
@@ -138,13 +114,7 @@ Socket::Socket() :
IOHandle(new IOHandlePrivate),
nonblocking(false),
nodelay(false)
-{
- SOCKET& socket = impl->fd;
- if (socket != INVALID_SOCKET) Socket::close();
- SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
- if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
- socket = s;
-}
+{}
Socket::Socket(IOHandlePrivate* h) :
IOHandle(h),
@@ -152,8 +122,7 @@ Socket::Socket(IOHandlePrivate* h) :
nodelay(false)
{}
-void
-Socket::createSocket(const SocketAddress& sa) const
+void Socket::createSocket(const SocketAddress& sa) const
{
SOCKET& socket = impl->fd;
if (socket != INVALID_SOCKET) Socket::close();
@@ -168,24 +137,24 @@ Socket::createSocket(const SocketAddress& sa) const
if (nonblocking) setNonblocking();
if (nodelay) setTcpNoDelay();
} catch (std::exception&) {
- closesocket(s);
+ ::closesocket(s);
socket = INVALID_SOCKET;
throw;
}
}
-void Socket::setTimeout(const Duration& interval) const
-{
- const SOCKET& socket = impl->fd;
- int64_t nanosecs = interval;
- nanosecs /= (1000 * 1000); // nsecs -> usec -> msec
- int msec = 0;
- if (nanosecs > std::numeric_limits<int>::max())
- msec = std::numeric_limits<int>::max();
- else
- msec = static_cast<int>(nanosecs);
- setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&msec, sizeof(msec));
- setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&msec, sizeof(msec));
+Socket* Socket::createSameTypeSocket() const {
+ SOCKET& socket = impl->fd;
+ // Socket currently has no actual socket attached
+ if (socket == INVALID_SOCKET)
+ return new Socket;
+
+ ::sockaddr_storage sa;
+ ::socklen_t salen = sizeof(sa);
+ QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
+ SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+ if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ return new Socket(new IOHandlePrivate(s));
}
void Socket::setNonblocking() const {
@@ -193,30 +162,25 @@ void Socket::setNonblocking() const {
QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock));
}
-void Socket::connect(const std::string& host, uint16_t port) const
+void Socket::connect(const std::string& host, const std::string& port) const
{
- SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
connect(sa);
}
void
Socket::connect(const SocketAddress& addr) const
{
+ peername = addr.asString(false);
+
+ createSocket(addr);
+
const SOCKET& socket = impl->fd;
- const addrinfo *addrs = &(getAddrInfo(addr));
- int error = 0;
+ int err;
WSASetLastError(0);
- while (addrs != 0) {
- if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) ||
- (WSAGetLastError() == WSAEWOULDBLOCK))
- break;
- // Error... save this error code and see if there are other address
- // to try before throwing the exception.
- error = WSAGetLastError();
- addrs = addrs->ai_next;
- }
- if (error)
- throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname));
+ if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) &&
+ ((err = ::WSAGetLastError()) != WSAEWOULDBLOCK))
+ throw qpid::Exception(QPID_MSG(strError(err) << ": " << peername));
}
void
@@ -247,24 +211,26 @@ int Socket::read(void *buf, size_t count) const
return received;
}
-int Socket::listen(uint16_t port, int backlog) const
+int Socket::listen(const std::string& host, const std::string& port, int backlog) const
+{
+ SocketAddress sa(host, port);
+ return listen(sa, backlog);
+}
+
+int Socket::listen(const SocketAddress& addr, int backlog) const
{
+ createSocket(addr);
+
const SOCKET& socket = impl->fd;
BOOL yes=1;
QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes)));
- struct sockaddr_in name;
- memset(&name, 0, sizeof(name));
- name.sin_family = AF_INET;
- name.sin_port = htons(port);
- name.sin_addr.s_addr = 0;
- if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR)
- throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError())));
+
+ if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR)
+ throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError())));
if (::listen(socket, backlog) == SOCKET_ERROR)
- throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError())));
-
- socklen_t namelen = sizeof(name);
- QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen));
- return ntohs(name.sin_port);
+ throw Exception(QPID_MSG("Can't listen on " <<addr.asString() << ": " << strError(WSAGetLastError())));
+
+ return getLocalPort(socket);
}
Socket* Socket::accept() const
@@ -277,36 +243,20 @@ Socket* Socket::accept() const
else throw QPID_WINDOWS_ERROR(WSAGetLastError());
}
-std::string Socket::getSockname() const
-{
- return getName(impl->fd, true);
-}
-
-std::string Socket::getPeername() const
-{
- return getName(impl->fd, false);
-}
-
std::string Socket::getPeerAddress() const
{
- if (!connectname.empty())
- return std::string (connectname);
- return getName(impl->fd, false, true);
+ if (peername.empty()) {
+ peername = getName(impl->fd, false);
+ }
+ return peername;
}
std::string Socket::getLocalAddress() const
{
- return getName(impl->fd, true, true);
-}
-
-uint16_t Socket::getLocalPort() const
-{
- return atoi(getService(impl->fd, true).c_str());
-}
-
-uint16_t Socket::getRemotePort() const
-{
- return atoi(getService(impl->fd, true).c_str());
+ if (localname.empty()) {
+ localname = getName(impl->fd, true);
+ }
+ return localname;
}
int Socket::getError() const
diff --git a/cpp/src/qpid/sys/windows/SocketAddress.cpp b/cpp/src/qpid/sys/windows/SocketAddress.cpp
index 501cff1297..77bbf85810 100644
--- a/cpp/src/qpid/sys/windows/SocketAddress.cpp
+++ b/cpp/src/qpid/sys/windows/SocketAddress.cpp
@@ -21,7 +21,13 @@
#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/windows/check.h"
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
+
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
#include <winsock2.h>
#include <ws2tcpip.h>
@@ -35,37 +41,111 @@ SocketAddress::SocketAddress(const std::string& host0, const std::string& port0)
port(port0),
addrInfo(0)
{
- ::addrinfo hints;
- ::memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well
- hints.ai_socktype = SOCK_STREAM;
-
- const char* node = 0;
- if (host.empty()) {
- hints.ai_flags |= AI_PASSIVE;
- } else {
- node = host.c_str();
- }
- const char* service = port.empty() ? "0" : port.c_str();
+}
- int n = ::getaddrinfo(node, service, &hints, &addrInfo);
- if (n != 0)
- throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+SocketAddress::SocketAddress(const SocketAddress& sa) :
+ host(sa.host),
+ port(sa.port),
+ addrInfo(0)
+{
+}
+
+SocketAddress& SocketAddress::operator=(const SocketAddress& sa)
+{
+ SocketAddress temp(sa);
+
+ std::swap(temp, *this);
+ return *this;
}
SocketAddress::~SocketAddress()
{
- ::freeaddrinfo(addrInfo);
+ if (addrInfo) {
+ ::freeaddrinfo(addrInfo);
+ }
}
-std::string SocketAddress::asString() const
+std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen)
{
- return host + ":" + port;
+ char servName[NI_MAXSERV];
+ char dispName[NI_MAXHOST];
+ if (int rc=::getnameinfo(addr, addrlen,
+ dispName, sizeof(dispName),
+ servName, sizeof(servName),
+ NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+ throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+ std::string s;
+ switch (addr->sa_family) {
+ case AF_INET: s += dispName; break;
+ case AF_INET6: s += "["; s += dispName; s+= "]"; break;
+ default: throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+ s += ":";
+ s += servName;
+ return s;
+}
+
+uint16_t SocketAddress::getPort(::sockaddr const * const addr)
+{
+ switch (addr->sa_family) {
+ case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port);
+ case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port);
+ default:throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+}
+
+std::string SocketAddress::asString(bool numeric) const
+{
+ if (!numeric)
+ return host + ":" + port;
+ // Canonicalise into numeric id
+ const ::addrinfo& ai = getAddrInfo(*this);
+
+ return asString(ai.ai_addr, ai.ai_addrlen);
+}
+
+bool SocketAddress::nextAddress() {
+ bool r = currentAddrInfo->ai_next != 0;
+ if (r)
+ currentAddrInfo = currentAddrInfo->ai_next;
+ return r;
+}
+
+void SocketAddress::setAddrInfoPort(uint16_t port) {
+ if (!currentAddrInfo) return;
+
+ ::addrinfo& ai = *currentAddrInfo;
+ switch (ai.ai_family) {
+ case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return;
+ case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return;
+ default: throw Exception(QPID_MSG("Unexpected socket type"));
+ }
}
const ::addrinfo& getAddrInfo(const SocketAddress& sa)
{
- return *sa.addrInfo;
+ if (!sa.addrInfo) {
+ ::addrinfo hints;
+ ::memset(&hints, 0, sizeof(hints));
+ hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for
+ hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
+ hints.ai_socktype = SOCK_STREAM;
+
+ const char* node = 0;
+ if (sa.host.empty()) {
+ hints.ai_flags |= AI_PASSIVE;
+ } else {
+ node = sa.host.c_str();
+ }
+ const char* service = sa.port.empty() ? "0" : sa.port.c_str();
+
+ int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
+ if (n != 0)
+ throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
+ sa.currentAddrInfo = sa.addrInfo;
+ }
+
+ return *sa.currentAddrInfo;
}
}}
diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.h b/cpp/src/qpid/sys/windows/SslAsynchIO.h
index 3cdf2c8f08..edec081ced 100644
--- a/cpp/src/qpid/sys/windows/SslAsynchIO.h
+++ b/cpp/src/qpid/sys/windows/SslAsynchIO.h
@@ -39,9 +39,6 @@ namespace qpid {
namespace sys {
namespace windows {
-class Socket;
-class Poller;
-
/*
* SSL/Schannel shim between the frame-handling and AsynchIO layers.
* SslAsynchIO creates a regular AsynchIO object to handle I/O and this class
diff --git a/cpp/src/qpid/sys/windows/StrError.cpp b/cpp/src/qpid/sys/windows/StrError.cpp
index 9c1bfcd79c..546d399d16 100755
--- a/cpp/src/qpid/sys/windows/StrError.cpp
+++ b/cpp/src/qpid/sys/windows/StrError.cpp
@@ -30,6 +30,7 @@ namespace sys {
std::string strError(int err) {
const size_t bufsize = 512;
char buf[bufsize];
+ buf[0] = 0;
if (0 == FormatMessage (FORMAT_MESSAGE_MAX_WIDTH_MASK
| FORMAT_MESSAGE_FROM_SYSTEM,
0,
@@ -39,7 +40,11 @@ std::string strError(int err) {
bufsize,
0))
{
- strerror_s (buf, bufsize, err);
+#ifdef _MSC_VER
+ strerror_s(buf, bufsize, err);
+#else
+ return std::string(strerror(err));
+#endif
}
return std::string(buf);
}
diff --git a/cpp/src/qpid/sys/windows/Thread.cpp b/cpp/src/qpid/sys/windows/Thread.cpp
index 583a9613a3..23b0033be4 100755
--- a/cpp/src/qpid/sys/windows/Thread.cpp
+++ b/cpp/src/qpid/sys/windows/Thread.cpp
@@ -19,6 +19,11 @@
*
*/
+// Ensure definition of OpenThread in mingw
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/windows/check.h"
@@ -26,50 +31,204 @@
#include <process.h>
#include <windows.h>
-namespace {
-unsigned __stdcall runRunnable(void* p)
-{
- static_cast<qpid::sys::Runnable*>(p)->run();
- _endthreadex(0);
- return 0;
-}
-}
+/*
+ * This implementation distinguishes between two types of thread: Qpid
+ * threads (based on qpid::sys::Runnable) and the rest. It provides a
+ * join() that will not deadlock against the Windows loader lock for
+ * Qpid threads.
+ *
+ * System thread identifiers are unique per Windows thread; thread
+ * handles are not. Thread identifiers can be recycled, but keeping a
+ * handle open against the thread prevents recycling as long as
+ * shared_ptr references to a ThreadPrivate structure remain.
+ *
+ * There is a 1-1 relationship between Qpid threads and their
+ * ThreadPrivate structure. Non-Qpid threads do not need to find the
+ * qpidThreadDone handle, so there may be a 1-many relationship for
+ * them.
+ *
+ * TLS storage is used for a lockless solution for static library
+ * builds. The special case of LoadLibrary/FreeLibrary requires
+ * additional synchronization variables and resource cleanup in
+ * DllMain. _DLL marks the dynamic case.
+ */
namespace qpid {
namespace sys {
class ThreadPrivate {
+public:
friend class Thread;
+ friend unsigned __stdcall runThreadPrivate(void*);
+ typedef boost::shared_ptr<ThreadPrivate> shared_ptr;
+ ~ThreadPrivate();
- HANDLE threadHandle;
+private:
unsigned threadId;
-
- ThreadPrivate(Runnable* runnable) {
- uintptr_t h = _beginthreadex(0,
- 0,
- runRunnable,
- runnable,
- 0,
- &threadId);
- QPID_WINDOWS_CHECK_CRT_NZ(h);
- threadHandle = reinterpret_cast<HANDLE>(h);
+ HANDLE threadHandle;
+ HANDLE initCompleted;
+ HANDLE qpidThreadDone;
+ Runnable* runnable;
+ shared_ptr keepAlive;
+
+ ThreadPrivate() : threadId(GetCurrentThreadId()), initCompleted(NULL),
+ qpidThreadDone(NULL), runnable(NULL) {
+ threadHandle = OpenThread (SYNCHRONIZE, FALSE, threadId);
+ QPID_WINDOWS_CHECK_CRT_NZ(threadHandle);
}
-
- ThreadPrivate()
- : threadHandle(GetCurrentThread()), threadId(GetCurrentThreadId()) {}
+
+ ThreadPrivate(Runnable* r) : threadHandle(NULL), initCompleted(NULL),
+ qpidThreadDone(NULL), runnable(r) {}
+
+ void start(shared_ptr& p);
+ static shared_ptr createThread(Runnable* r);
};
+}} // namespace qpid::sys
+
+
+namespace {
+using namespace qpid::sys;
+
+#ifdef _DLL
+class ScopedCriticalSection
+{
+ public:
+ ScopedCriticalSection(CRITICAL_SECTION& cs) : criticalSection(cs) { EnterCriticalSection(&criticalSection); }
+ ~ScopedCriticalSection() { LeaveCriticalSection(&criticalSection); }
+ private:
+ CRITICAL_SECTION& criticalSection;
+};
+
+CRITICAL_SECTION threadLock;
+long runningThreads = 0;
+HANDLE threadsDone;
+bool terminating = false;
+#endif
+
+
+DWORD volatile tlsIndex = TLS_OUT_OF_INDEXES;
+
+DWORD getTlsIndex() {
+ if (tlsIndex != TLS_OUT_OF_INDEXES)
+ return tlsIndex; // already set
+
+ DWORD trialIndex = TlsAlloc();
+ QPID_WINDOWS_CHECK_NOT(trialIndex, TLS_OUT_OF_INDEXES); // No OS resource
+
+ // only one thread gets to set the value
+ DWORD actualIndex = (DWORD) InterlockedCompareExchange((LONG volatile *) &tlsIndex, (LONG) trialIndex, (LONG) TLS_OUT_OF_INDEXES);
+ if (actualIndex == TLS_OUT_OF_INDEXES)
+ return trialIndex; // we won the race
+ else {
+ TlsFree(trialIndex);
+ return actualIndex;
+ }
+}
+
+} // namespace
+
+namespace qpid {
+namespace sys {
+
+unsigned __stdcall runThreadPrivate(void* p)
+{
+ ThreadPrivate* threadPrivate = static_cast<ThreadPrivate*>(p);
+ TlsSetValue(getTlsIndex(), threadPrivate);
+
+ WaitForSingleObject (threadPrivate->initCompleted, INFINITE);
+ CloseHandle (threadPrivate->initCompleted);
+ threadPrivate->initCompleted = NULL;
+
+ try {
+ threadPrivate->runnable->run();
+ } catch (...) {
+ // not our concern
+ }
+
+ SetEvent (threadPrivate->qpidThreadDone); // allow join()
+ threadPrivate->keepAlive.reset(); // may run ThreadPrivate destructor
+
+#ifdef _DLL
+ {
+ ScopedCriticalSection l(threadLock);
+ if (--runningThreads == 0)
+ SetEvent(threadsDone);
+ }
+#endif
+ return 0;
+}
+
+
+ThreadPrivate::shared_ptr ThreadPrivate::createThread(Runnable* runnable) {
+ ThreadPrivate::shared_ptr tp(new ThreadPrivate(runnable));
+ tp->start(tp);
+ return tp;
+}
+
+void ThreadPrivate::start(ThreadPrivate::shared_ptr& tp) {
+ getTlsIndex(); // fail here if OS problem, not in new thread
+
+ initCompleted = CreateEvent (NULL, TRUE, FALSE, NULL);
+ QPID_WINDOWS_CHECK_CRT_NZ(initCompleted);
+ qpidThreadDone = CreateEvent (NULL, TRUE, FALSE, NULL);
+ QPID_WINDOWS_CHECK_CRT_NZ(qpidThreadDone);
+
+#ifdef _DLL
+ {
+ ScopedCriticalSection l(threadLock);
+ if (terminating)
+ throw qpid::Exception(QPID_MSG("creating thread after exit/FreeLibrary"));
+ runningThreads++;
+ }
+#endif
+
+ uintptr_t h = _beginthreadex(0,
+ 0,
+ runThreadPrivate,
+ (void *)this,
+ 0,
+ &threadId);
+
+#ifdef _DLL
+ if (h == NULL) {
+ ScopedCriticalSection l(threadLock);
+ if (--runningThreads == 0)
+ SetEvent(threadsDone);
+ }
+#endif
+
+ QPID_WINDOWS_CHECK_CRT_NZ(h);
+
+ // Success
+ keepAlive = tp;
+ threadHandle = reinterpret_cast<HANDLE>(h);
+ SetEvent (initCompleted);
+}
+
+ThreadPrivate::~ThreadPrivate() {
+ if (threadHandle)
+ CloseHandle (threadHandle);
+ if (initCompleted)
+ CloseHandle (initCompleted);
+ if (qpidThreadDone)
+ CloseHandle (qpidThreadDone);
+}
+
+
Thread::Thread() {}
-Thread::Thread(Runnable* runnable) : impl(new ThreadPrivate(runnable)) {}
+Thread::Thread(Runnable* runnable) : impl(ThreadPrivate::createThread(runnable)) {}
-Thread::Thread(Runnable& runnable) : impl(new ThreadPrivate(&runnable)) {}
+Thread::Thread(Runnable& runnable) : impl(ThreadPrivate::createThread(&runnable)) {}
Thread::operator bool() {
return impl;
}
bool Thread::operator==(const Thread& t) const {
+ if (!impl || !t.impl)
+ return false;
return impl->threadId == t.impl->threadId;
}
@@ -79,10 +238,17 @@ bool Thread::operator!=(const Thread& t) const {
void Thread::join() {
if (impl) {
- DWORD status = WaitForSingleObject (impl->threadHandle, INFINITE);
+ DWORD status;
+ if (impl->runnable) {
+ HANDLE handles[2] = {impl->qpidThreadDone, impl->threadHandle};
+ // wait for either. threadHandle not signalled if loader
+ // lock held (FreeLibrary). qpidThreadDone not signalled
+ // if thread terminated by exit().
+ status = WaitForMultipleObjects (2, handles, false, INFINITE);
+ }
+ else
+ status = WaitForSingleObject (impl->threadHandle, INFINITE);
QPID_WINDOWS_CHECK_NOT(status, WAIT_FAILED);
- CloseHandle (impl->threadHandle);
- impl->threadHandle = 0;
}
}
@@ -92,9 +258,70 @@ unsigned long Thread::logId() {
/* static */
Thread Thread::current() {
+ ThreadPrivate* tlsValue = (ThreadPrivate *) TlsGetValue(getTlsIndex());
Thread t;
- t.impl.reset(new ThreadPrivate());
+ if (tlsValue != NULL) {
+ // called from within Runnable->run(), so keepAlive has positive use count
+ t.impl = tlsValue->keepAlive;
+ }
+ else
+ t.impl.reset(new ThreadPrivate());
return t;
}
-}} /* qpid::sys */
+}} // namespace qpid::sys
+
+
+#ifdef _DLL
+
+// DllMain: called possibly many times in a process lifetime if dll
+// loaded and freed repeatedly . Be mindful of Windows loader lock
+// and other DllMain restrictions.
+
+BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) {
+ switch (reason) {
+ case DLL_PROCESS_ATTACH:
+ InitializeCriticalSection(&threadLock);
+ threadsDone = CreateEvent(NULL, TRUE, FALSE, NULL);
+ break;
+
+ case DLL_PROCESS_DETACH:
+ terminating = true;
+ if (reserved != NULL) {
+ // process exit(): threads are stopped arbitrarily and
+ // possibly in an inconsistent state. Not even threadLock
+ // can be trusted. All static destructors have been
+ // called at this point and any resources this unit knows
+ // about will be released as part of process tear down by
+ // the OS. Accordingly, do nothing.
+ return TRUE;
+ }
+ else {
+ // FreeLibrary(): threads are still running and we are
+ // encouraged to clean up to avoid leaks. Mostly we just
+ // want any straggler threads to finish and notify
+ // threadsDone as the last thing they do.
+ while (1) {
+ {
+ ScopedCriticalSection l(threadLock);
+ if (runningThreads == 0)
+ break;
+ ResetEvent(threadsDone);
+ }
+ WaitForSingleObject(threadsDone, INFINITE);
+ }
+ if (tlsIndex != TLS_OUT_OF_INDEXES)
+ TlsFree(getTlsIndex());
+ CloseHandle(threadsDone);
+ DeleteCriticalSection(&threadLock);
+ }
+ break;
+
+ case DLL_THREAD_ATTACH:
+ case DLL_THREAD_DETACH:
+ break;
+ }
+ return TRUE;
+}
+
+#endif
diff --git a/cpp/src/qpid/sys/windows/Time.cpp b/cpp/src/qpid/sys/windows/Time.cpp
index 16d09fcdc0..25c50819cd 100644
--- a/cpp/src/qpid/sys/windows/Time.cpp
+++ b/cpp/src/qpid/sys/windows/Time.cpp
@@ -27,6 +27,17 @@
using namespace boost::posix_time;
+namespace {
+
+// High-res timing support. This will display times since program start,
+// more or less. Keep track of the start value and the conversion factor to
+// seconds.
+bool timeInitialized = false;
+LARGE_INTEGER start;
+double freq = 1.0;
+
+}
+
namespace qpid {
namespace sys {
@@ -91,10 +102,35 @@ void outputFormattedNow(std::ostream& o) {
char time_string[100];
::time( &rawtime );
+#ifdef _MSC_VER
::localtime_s(&timeinfo, &rawtime);
+#else
+ timeinfo = *(::localtime(&rawtime));
+#endif
::strftime(time_string, 100,
"%Y-%m-%d %H:%M:%S",
&timeinfo);
o << time_string << " ";
}
+
+void outputHiresNow(std::ostream& o) {
+ if (!timeInitialized) {
+ start.QuadPart = 0;
+ LARGE_INTEGER iFreq;
+ iFreq.QuadPart = 1;
+ QueryPerformanceCounter(&start);
+ QueryPerformanceFrequency(&iFreq);
+ freq = static_cast<double>(iFreq.QuadPart);
+ timeInitialized = true;
+ }
+ LARGE_INTEGER iNow;
+ iNow.QuadPart = 0;
+ QueryPerformanceCounter(&iNow);
+ iNow.QuadPart -= start.QuadPart;
+ if (iNow.QuadPart < 0)
+ iNow.QuadPart = 0;
+ double now = static_cast<double>(iNow.QuadPart);
+ now /= freq; // now is seconds after this
+ o << std::fixed << std::setprecision(8) << std::setw(16) << std::setfill('0') << now << "s ";
+}
}}
diff --git a/cpp/src/qpid/sys/windows/mingw32_compat.h b/cpp/src/qpid/sys/windows/mingw32_compat.h
new file mode 100644
index 0000000000..51f613cc25
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/mingw32_compat.h
@@ -0,0 +1,39 @@
+#ifndef _sys_windows_mingw32_compat
+#define _sys_windows_mingw32_compat
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifdef WIN32
+#ifndef _MSC_VER
+
+//
+// The following definitions for extension function GUIDs and signatures are taken from
+// MswSock.h in the Windows32 SDK. These rightfully belong in the mingw32 version of
+// mswsock.h, but are not included presently.
+//
+
+#define WSAID_ACCEPTEX {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}}
+typedef BOOL (PASCAL *LPFN_ACCEPTEX)(SOCKET,SOCKET,PVOID,DWORD,DWORD,DWORD,LPDWORD,LPOVERLAPPED);
+
+#endif
+#endif
+
+#endif
diff --git a/cpp/src/qpid/sys/windows/uuid.cpp b/cpp/src/qpid/sys/windows/uuid.cpp
index b5360622dc..3316ecbc00 100644
--- a/cpp/src/qpid/sys/windows/uuid.cpp
+++ b/cpp/src/qpid/sys/windows/uuid.cpp
@@ -19,7 +19,7 @@
*
*/
-#include <Rpc.h>
+#include <rpc.h>
#ifdef uuid_t /* Done in rpcdce.h */
# undef uuid_t
#endif
@@ -52,7 +52,11 @@ int uuid_parse (const char *in, uuid_t uu) {
void uuid_unparse (const uuid_t uu, char *out) {
unsigned char *formatted;
if (UuidToString((UUID*)uu, &formatted) == RPC_S_OK) {
+#ifdef _MSC_VER
strncpy_s (out, 36+1, (char*)formatted, _TRUNCATE);
+#else
+ strncpy (out, (char*)formatted, 36+1);
+#endif
RpcStringFree(&formatted);
}
}