diff options
| author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 01:19:00 +0000 |
|---|---|---|
| committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 01:19:00 +0000 |
| commit | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (patch) | |
| tree | dcfb94e75656c6c239fc3dcb754cd2015126424d /cpp/src/qpid/sys | |
| parent | 5eb354b338bb8d8fcd35b6ac3fb33f8103e757c3 (diff) | |
| download | qpid-python-ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5.tar.gz | |
Undo bad merge from trunk - merged at wrong level.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187150 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
51 files changed, 499 insertions, 1408 deletions
diff --git a/cpp/src/qpid/sys/AggregateOutput.h b/cpp/src/qpid/sys/AggregateOutput.h index d7c0ff29e3..6dad998bb0 100644 --- a/cpp/src/qpid/sys/AggregateOutput.h +++ b/cpp/src/qpid/sys/AggregateOutput.h @@ -41,7 +41,7 @@ namespace sys { * doOutput is called in another. */ -class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public OutputControl +class AggregateOutput : public OutputTask, public OutputControl { typedef std::deque<OutputTask*> TaskList; diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h index 41f74f7ed0..50da8fa4fc 100644 --- a/cpp/src/qpid/sys/AsynchIO.h +++ b/cpp/src/qpid/sys/AsynchIO.h @@ -64,8 +64,8 @@ public: // deletes. To correctly manage heaps when needed, the allocate and // delete should both be done from the same class/library. QPID_COMMON_EXTERN static AsynchConnector* create(const Socket& s, - const std::string& hostname, - const std::string& port, + std::string hostname, + uint16_t port, ConnectedCallback connCb, FailedCallback failCb); virtual void start(boost::shared_ptr<Poller> poller) = 0; diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h index b9867606c4..e1885bac79 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/cpp/src/qpid/sys/AsynchIOHandler.h @@ -57,7 +57,7 @@ class AsynchIOHandler : public OutputControl { QPID_COMMON_EXTERN ~AsynchIOHandler(); QPID_COMMON_EXTERN void init(AsynchIO* a, int numBuffs); - QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; } + QPID_COMMON_EXTERN void setClient() { isClient = true; } // Output side QPID_COMMON_EXTERN void abort(); diff --git a/cpp/src/qpid/sys/AtomicValue.h b/cpp/src/qpid/sys/AtomicValue.h index bf995f991e..6e90eafead 100644 --- a/cpp/src/qpid/sys/AtomicValue.h +++ b/cpp/src/qpid/sys/AtomicValue.h @@ -22,12 +22,7 @@ * */ -// Have to check for clang before gcc as clang pretends to be gcc too -#if defined( __clang__ ) -// Use the clang doesn't support atomic builtins for 64 bit values, so use the slow versions -#include "qpid/sys/AtomicValue_mutex.h" - -#elif defined( __GNUC__ ) && __GNUC__ >= 4 && ( defined( __i686__ ) || defined( __x86_64__ ) ) +#if defined( __GNUC__ ) && __GNUC__ >= 4 && ( defined( __i686__ ) || defined( __x86_64__ ) ) // Use the Gnu C built-in atomic operations if compiling with gcc on a suitable platform. #include "qpid/sys/AtomicValue_gcc.h" diff --git a/cpp/src/qpid/sys/AtomicValue_gcc.h b/cpp/src/qpid/sys/AtomicValue_gcc.h index 724bae422e..d022b07c1d 100644 --- a/cpp/src/qpid/sys/AtomicValue_gcc.h +++ b/cpp/src/qpid/sys/AtomicValue_gcc.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,9 +39,6 @@ class AtomicValue public: AtomicValue(T init=0) : value(init) {} - // Not atomic. Don't call concurrently with atomic ops. - AtomicValue<T>& operator=(T newValue) { value = newValue; return *this; } - // Update and return new value. inline T operator+=(T n) { return __sync_add_and_fetch(&value, n); } inline T operator-=(T n) { return __sync_sub_and_fetch(&value, n); } @@ -57,11 +54,11 @@ class AtomicValue /** If current value == testval then set to newval. Returns the old value. */ T valueCompareAndSwap(T testval, T newval) { return __sync_val_compare_and_swap(&value, testval, newval); } - /** If current value == testval then set to newval. Returns true if the swap was performed. */ + /** If current value == testval then set to newval. Returns true if the swap was performed. */ bool boolCompareAndSwap(T testval, T newval) { return __sync_bool_compare_and_swap(&value, testval, newval); } T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(static_cast<T>(0)); } - + private: T value; }; diff --git a/cpp/src/qpid/sys/ClusterSafe.cpp b/cpp/src/qpid/sys/ClusterSafe.cpp index dd37615145..c6b527dfdf 100644 --- a/cpp/src/qpid/sys/ClusterSafe.cpp +++ b/cpp/src/qpid/sys/ClusterSafe.cpp @@ -34,6 +34,8 @@ QPID_TSS bool inContext = false; bool isClusterSafe() { return !inCluster || inContext; } +bool isCluster() { return inCluster; } + void assertClusterSafe() { if (!isClusterSafe()) { QPID_LOG(critical, "Modified cluster state outside of cluster context"); @@ -51,16 +53,6 @@ ClusterSafeScope::~ClusterSafeScope() { inContext = save; } -ClusterUnsafeScope::ClusterUnsafeScope() { - save = inContext; - inContext = false; -} - -ClusterUnsafeScope::~ClusterUnsafeScope() { - assert(!inContext); - inContext = save; -} - void enableClusterSafe() { inCluster = true; } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/ClusterSafe.h b/cpp/src/qpid/sys/ClusterSafe.h index 27e4eb46a5..15675e8cc5 100644 --- a/cpp/src/qpid/sys/ClusterSafe.h +++ b/cpp/src/qpid/sys/ClusterSafe.h @@ -52,9 +52,14 @@ QPID_COMMON_EXTERN void assertClusterSafe(); */ QPID_COMMON_EXTERN bool isClusterSafe(); +/** Return true in a clustered broker */ +QPID_COMMON_EXTERN bool isCluster(); + /** - * Mark a scope as cluster safe. Sets isClusterSafe in constructor and resets - * to previous value in destructor. + * Base class for classes that encapsulate state which is replicated + * to all members of a cluster. Acts as a marker for clustered state + * and provides functions to assist detecting bugs in cluster + * behavior. */ class ClusterSafeScope { public: @@ -65,18 +70,6 @@ class ClusterSafeScope { }; /** - * Mark a scope as cluster unsafe. Clears isClusterSafe in constructor and resets - * to previous value in destructor. - */ -class ClusterUnsafeScope { - public: - QPID_COMMON_EXTERN ClusterUnsafeScope(); - QPID_COMMON_EXTERN ~ClusterUnsafeScope(); - private: - bool save; -}; - -/** * Enable cluster-safe assertions. By default they are no-ops. * Called by cluster code. */ diff --git a/cpp/src/qpid/sys/CopyOnWriteArray.h b/cpp/src/qpid/sys/CopyOnWriteArray.h index 41384fc5a4..45a231dfd8 100644 --- a/cpp/src/qpid/sys/CopyOnWriteArray.h +++ b/cpp/src/qpid/sys/CopyOnWriteArray.h @@ -43,12 +43,6 @@ public: CopyOnWriteArray() {} CopyOnWriteArray(const CopyOnWriteArray& c) : array(c.array) {} - bool empty() - { - Mutex::ScopedLock l(lock); - return array ? array->empty() : true; - } - void add(T& t) { Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index 03b9d0084d..81c2301c1e 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,8 +28,7 @@ #include <boost/function.hpp> #include <boost/bind.hpp> #include <algorithm> -#include <deque> -#include "qpid/log/Statement.h" // FIXME aconway 2011-08-05: +#include <vector> namespace qpid { namespace sys { @@ -45,7 +44,7 @@ class Poller; template <class T> class PollableQueue { public: - typedef std::deque<T> Batch; + typedef std::vector<T> Batch; typedef T value_type; /** @@ -69,11 +68,11 @@ class PollableQueue { const boost::shared_ptr<sys::Poller>& poller); ~PollableQueue(); - + /** Push a value onto the queue. Thread safe */ void push(const T& t); - /** Start polling. */ + /** Start polling. */ void start(); /** Stop polling and wait for the current callback, if any, to complete. */ @@ -91,14 +90,14 @@ class PollableQueue { * ensure clean shutdown with no events left on the queue. */ void shutdown(); - + private: typedef sys::Monitor::ScopedLock ScopedLock; typedef sys::Monitor::ScopedUnlock ScopedUnlock; void dispatch(PollableCondition& cond); void process(); - + mutable sys::Monitor lock; Callback callback; PollableCondition condition; @@ -108,7 +107,7 @@ class PollableQueue { }; template <class T> PollableQueue<T>::PollableQueue( - const Callback& cb, const boost::shared_ptr<sys::Poller>& p) + const Callback& cb, const boost::shared_ptr<sys::Poller>& p) : callback(cb), condition(boost::bind(&PollableQueue<T>::dispatch, this, _1), p), stopped(true) @@ -152,7 +151,7 @@ template <class T> void PollableQueue<T>::process() { putBack = callback(batch); } // put back unprocessed items. - queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end())); + queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end())); batch.clear(); } } diff --git a/cpp/src/qpid/sys/Poller.h b/cpp/src/qpid/sys/Poller.h index 01ee139ee6..ec53b79bad 100644 --- a/cpp/src/qpid/sys/Poller.h +++ b/cpp/src/qpid/sys/Poller.h @@ -120,7 +120,7 @@ class PollerHandle { friend struct Poller::Event; PollerHandlePrivate* const impl; - QPID_COMMON_INLINE_EXTERN virtual void processEvent(Poller::EventType) {}; + QPID_COMMON_EXTERN virtual void processEvent(Poller::EventType) {}; public: QPID_COMMON_EXTERN PollerHandle(const IOHandle& h); diff --git a/cpp/src/qpid/sys/ProtocolFactory.h b/cpp/src/qpid/sys/ProtocolFactory.h index 4d198a92da..b233b2da1a 100644 --- a/cpp/src/qpid/sys/ProtocolFactory.h +++ b/cpp/src/qpid/sys/ProtocolFactory.h @@ -39,10 +39,11 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory> virtual ~ProtocolFactory() = 0; virtual uint16_t getPort() const = 0; + virtual std::string getHost() const = 0; virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0; virtual void connect( boost::shared_ptr<Poller>, - const std::string& host, const std::string& port, + const std::string& host, int16_t port, ConnectionCodec::Factory* codec, ConnectFailedCallback failed) = 0; virtual bool supports(const std::string& /*capability*/) { return false; } diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 6769e5383c..d53db20598 100644 --- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -31,6 +31,7 @@ #include "qpid/sys/SecuritySettings.h" #include <boost/bind.hpp> +#include <boost/lexical_cast.hpp> #include <memory> #include <netdb.h> @@ -211,9 +212,10 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { if (readError) { return; } + size_t decoded = 0; try { if (codec) { - (void) codec->decode(buff->bytes(), buff->dataCount()); + decoded = codec->decode(buff->bytes(), buff->dataCount()); }else{ // Need to start protocol processing initProtocolIn(buff); @@ -228,7 +230,9 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) { framing::Buffer in(buff->bytes(), buff->dataCount()); framing::ProtocolInitiation protocolInit; + size_t decoded = 0; if (protocolInit.decode(in)) { + decoded = in.getPosition(); QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")"); codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings()); @@ -250,9 +254,10 @@ class RdmaIOProtocolFactory : public ProtocolFactory { public: RdmaIOProtocolFactory(int16_t port, int backlog); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback); + void connect(Poller::shared_ptr, const string& host, int16_t port, ConnectionCodec::Factory*, ConnectFailedCallback); uint16_t getPort() const; + string getHost() const; private: bool request(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*); @@ -342,7 +347,18 @@ uint16_t RdmaIOProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } +string RdmaIOProtocolFactory::getHost() const { + //return listener.getSockname(); + return ""; +} + void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { + ::sockaddr_in sin; + + sin.sin_family = AF_INET; + sin.sin_port = htons(listeningPort); + sin.sin_addr.s_addr = INADDR_ANY; + listener.reset( new Rdma::Listener( Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES), @@ -371,7 +387,7 @@ void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connectio void RdmaIOProtocolFactory::connect( Poller::shared_ptr poller, - const std::string& host, const std::string& port, + const std::string& host, int16_t port, ConnectionCodec::Factory* f, ConnectFailedCallback failed) { @@ -383,7 +399,7 @@ void RdmaIOProtocolFactory::connect( boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1), boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed)); - SocketAddress sa(host, port); + SocketAddress sa(host, boost::lexical_cast<std::string>(port)); c->start(poller, sa); } diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h index 25f1c5fb9d..7d50afc59f 100644 --- a/cpp/src/qpid/sys/Socket.h +++ b/cpp/src/qpid/sys/Socket.h @@ -33,21 +33,21 @@ namespace sys { class Duration; class SocketAddress; -class QPID_COMMON_CLASS_EXTERN Socket : public IOHandle +class Socket : public IOHandle { public: /** Create a socket wrapper for descriptor. */ QPID_COMMON_EXTERN Socket(); - /** Create a new Socket which is the same address family as this one */ - QPID_COMMON_EXTERN Socket* createSameTypeSocket() const; + /** Set timeout for read and write */ + void setTimeout(const Duration& interval) const; /** Set socket non blocking */ void setNonblocking() const; QPID_COMMON_EXTERN void setTcpNoDelay() const; - QPID_COMMON_EXTERN void connect(const std::string& host, const std::string& port) const; + QPID_COMMON_EXTERN void connect(const std::string& host, uint16_t port) const; QPID_COMMON_EXTERN void connect(const SocketAddress&) const; QPID_COMMON_EXTERN void close() const; @@ -57,9 +57,19 @@ public: *@param backlog maximum number of pending connections. *@return The bound port. */ - QPID_COMMON_EXTERN int listen(const std::string& host = "", const std::string& port = "0", int backlog = 10) const; + QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const; QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const; + /** Returns the "socket name" ie the address bound to + * the near end of the socket + */ + QPID_COMMON_EXTERN std::string getSockname() const; + + /** Returns the "peer name" ie the address bound to + * the remote end of the socket + */ + std::string getPeername() const; + /** * Returns an address (host and port) for the remote end of the * socket @@ -74,13 +84,16 @@ public: /** * Returns the full address of the connection: local and remote host and port. */ - QPID_COMMON_INLINE_EXTERN std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); } + QPID_COMMON_EXTERN std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); } + + QPID_COMMON_EXTERN uint16_t getLocalPort() const; + uint16_t getRemotePort() const; /** * Returns the error code stored in the socket. This may be used * to determine the result of a non-blocking connect. */ - QPID_COMMON_EXTERN int getError() const; + int getError() const; /** Accept a connection from a socket that is already listening * and has an incoming connection @@ -95,11 +108,8 @@ private: /** Create socket */ void createSocket(const SocketAddress&) const; - /** Construct socket with existing handle */ Socket(IOHandlePrivate*); - - mutable std::string localname; - mutable std::string peername; + mutable std::string connectname; mutable bool nonblocking; mutable bool nodelay; }; diff --git a/cpp/src/qpid/sys/SocketAddress.h b/cpp/src/qpid/sys/SocketAddress.h index dcca109d94..27b9642f2c 100644 --- a/cpp/src/qpid/sys/SocketAddress.h +++ b/cpp/src/qpid/sys/SocketAddress.h @@ -27,7 +27,6 @@ #include <string> struct addrinfo; -struct sockaddr; namespace qpid { namespace sys { @@ -42,19 +41,12 @@ public: QPID_COMMON_EXTERN SocketAddress& operator=(const SocketAddress&); QPID_COMMON_EXTERN ~SocketAddress(); - QPID_COMMON_EXTERN bool nextAddress(); - QPID_COMMON_EXTERN std::string asString(bool numeric=true) const; - QPID_COMMON_EXTERN void setAddrInfoPort(uint16_t port); - - QPID_COMMON_EXTERN static std::string asString(::sockaddr const * const addr, size_t addrlen); - QPID_COMMON_EXTERN static uint16_t getPort(::sockaddr const * const addr); - + std::string asString() const; private: std::string host; std::string port; mutable ::addrinfo* addrInfo; - mutable ::addrinfo* currentAddrInfo; }; }} diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp index 471a0cef60..b0e791d60b 100644 --- a/cpp/src/qpid/sys/SslPlugin.cpp +++ b/cpp/src/qpid/sys/SslPlugin.cpp @@ -66,11 +66,12 @@ class SslProtocolFactory : public ProtocolFactory { public: SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, const std::string& port, + void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*, boost::function2<void, int, std::string> failed); uint16_t getPort() const; + std::string getHost() const; bool supports(const std::string& capability); private: @@ -94,7 +95,7 @@ static struct SslPlugin : public Plugin { // Only provide to a Broker if (broker) { if (options.certDbPath.empty()) { - QPID_LOG(notice, "SSL plugin not enabled, you must set --ssl-cert-db to enable it."); + QPID_LOG(info, "SSL plugin not enabled, you must set --ssl-cert-db to enable it."); } else { try { ssl::initNSS(options, true); @@ -145,6 +146,10 @@ uint16_t SslProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } +std::string SslProtocolFactory::getHost() const { + return listener.getSockname(); +} + void SslProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { acceptor.reset( @@ -155,7 +160,7 @@ void SslProtocolFactory::accept(Poller::shared_ptr poller, void SslProtocolFactory::connect( Poller::shared_ptr poller, - const std::string& host, const std::string& port, + const std::string& host, int16_t port, ConnectionCodec::Factory* fact, ConnectFailedCallback failed) { diff --git a/cpp/src/qpid/sys/StateMonitor.h b/cpp/src/qpid/sys/StateMonitor.h index eac37a8543..5a92756f3a 100644 --- a/cpp/src/qpid/sys/StateMonitor.h +++ b/cpp/src/qpid/sys/StateMonitor.h @@ -41,9 +41,9 @@ class StateMonitor : public Waitable struct Set : public std::bitset<MaxEnum + 1> { Set() {} Set(Enum s) { set(s); } - Set(Enum s, Enum t) { std::bitset<MaxEnum + 1>::set(s).set(t); } - Set(Enum s, Enum t, Enum u) { std::bitset<MaxEnum + 1>::set(s).set(t).set(u); } - Set(Enum s, Enum t, Enum u, Enum v) { std::bitset<MaxEnum + 1>::set(s).set(t).set(u).set(v); } + Set(Enum s, Enum t) { set(s).set(t); } + Set(Enum s, Enum t, Enum u) { set(s).set(t).set(u); } + Set(Enum s, Enum t, Enum u, Enum v) { set(s).set(t).set(u).set(v); } }; @@ -60,13 +60,13 @@ class StateMonitor : public Waitable operator Enum() const { return state; } /** @pre Caller holds a ScopedLock */ - void waitFor(Enum s) { ScopedWait w(*this); while (s != state) wait(); } + void waitFor(Enum s) { ScopedWait(*this); while (s != state) wait(); } /** @pre Caller holds a ScopedLock */ - void waitFor(Set s) { ScopedWait w(*this); while (!s.test(state)) wait(); } + void waitFor(Set s) { ScopedWait(*this); while (!s.test(state)) wait(); } /** @pre Caller holds a ScopedLock */ - void waitNot(Enum s) { ScopedWait w(*this); while (s == state) wait(); } + void waitNot(Enum s) { ScopedWait(*this); while (s == state) wait(); } /** @pre Caller holds a ScopedLock */ - void waitNot(Set s) { ScopedWait w(*this); while (s.test(state)) wait(); } + void waitNot(Set s) { ScopedWait(*this); while (s.test(state)) wait(); } private: Enum state; diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index 85d8c1db87..a6528f9ad9 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -25,31 +25,31 @@ #include "qpid/Plugin.h" #include "qpid/sys/Socket.h" -#include "qpid/sys/SocketAddress.h" #include "qpid/sys/Poller.h" #include "qpid/broker/Broker.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> -#include <boost/ptr_container/ptr_vector.hpp> +#include <memory> namespace qpid { namespace sys { class AsynchIOProtocolFactory : public ProtocolFactory { const bool tcpNoDelay; - boost::ptr_vector<Socket> listeners; - boost::ptr_vector<AsynchAcceptor> acceptors; - uint16_t listeningPort; + Socket listener; + const uint16_t listeningPort; + std::auto_ptr<AsynchAcceptor> acceptor; public: - AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay); + AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, const std::string& port, + void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*, ConnectFailedCallback); uint16_t getPort() const; + std::string getHost() const; private: void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, @@ -61,49 +61,23 @@ class AsynchIOProtocolFactory : public ProtocolFactory { static class TCPIOPlugin : public Plugin { void earlyInitialize(Target&) { } - + void initialize(Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); // Only provide to a Broker if (broker) { const broker::Broker::Options& opts = broker->getOptions(); - ProtocolFactory::shared_ptr protocolt( - new AsynchIOProtocolFactory( - "", boost::lexical_cast<std::string>(opts.port), - opts.connectionBacklog, - opts.tcpNoDelay)); - QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort()); - broker->registerProtocolFactory("tcp", protocolt); + ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog, + opts.tcpNoDelay)); + QPID_LOG(notice, "Listening on TCP port " << protocol->getPort()); + broker->registerProtocolFactory("tcp", protocol); } } } tcpPlugin; -AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) : - tcpNoDelay(nodelay) -{ - SocketAddress sa(host, port); - - // We must have at least one resolved address - QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = new Socket; - uint16_t lport = s->listen(sa, backlog); - QPID_LOG(debug, "Listened to: " << lport); - listeners.push_back(s); - - listeningPort = lport; - - // Try any other resolved addresses - while (sa.nextAddress()) { - // Hack to ensure that all listening connections are on the same port - sa.setAddrInfoPort(listeningPort); - QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = new Socket; - uint16_t lport = s->listen(sa, backlog); - QPID_LOG(debug, "Listened to: " << lport); - listeners.push_back(s); - } - -} +AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay) : + tcpNoDelay(nodelay), listeningPort(listener.listen(port, backlog)) +{} void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f, bool isClient) { @@ -133,14 +107,16 @@ uint16_t AsynchIOProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } +std::string AsynchIOProtocolFactory::getHost() const { + return listener.getSockname(); +} + void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { - for (unsigned i = 0; i<listeners.size(); ++i) { - acceptors.push_back( - AsynchAcceptor::create(listeners[i], - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); - acceptors[i].start(poller); - } + acceptor.reset( + AsynchAcceptor::create(listener, + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); + acceptor->start(poller); } void AsynchIOProtocolFactory::connectFailed( @@ -154,7 +130,7 @@ void AsynchIOProtocolFactory::connectFailed( void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, - const std::string& host, const std::string& port, + const std::string& host, int16_t port, ConnectionCodec::Factory* fact, ConnectFailedCallback failed) { @@ -163,8 +139,8 @@ void AsynchIOProtocolFactory::connect( // upon connection failure or by the AsynchIO upon connection // shutdown. The allocated AsynchConnector frees itself when it // is no longer needed. + Socket* socket = new Socket(); - try { AsynchConnector* c = AsynchConnector::create( *socket, host, @@ -174,12 +150,6 @@ void AsynchIOProtocolFactory::connect( boost::bind(&AsynchIOProtocolFactory::connectFailed, this, _1, _2, _3, failed)); c->start(poller); - } catch (std::exception&) { - // TODO: Design question - should we do the error callback and also throw? - int errCode = socket->getError(); - connectFailed(*socket, errCode, strError(errCode), failed); - throw; - } } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/Timer.cpp b/cpp/src/qpid/sys/Timer.cpp index 47752e4584..a97ccd1bd1 100644 --- a/cpp/src/qpid/sys/Timer.cpp +++ b/cpp/src/qpid/sys/Timer.cpp @@ -75,12 +75,6 @@ void TimerTask::cancel() { cancelled = true; } -void TimerTask::setFired() { - // Set nextFireTime to just before now, making readyToFire() true. - nextFireTime = AbsTime(sys::now(), Duration(-1)); -} - - Timer::Timer() : active(false), late(50 * TIME_MSEC), @@ -137,14 +131,12 @@ void Timer::run() bool warningsEnabled; QPID_LOG_TEST(warning, warningsEnabled); if (warningsEnabled) { - if (overrun > overran) { - if (delay > overran) // if delay is significant to an overrun. - warn.lateAndOverran(t->name, delay, overrun, Duration(start, end)); - else - warn.overran(t->name, overrun, Duration(start, end)); - } + if (delay > late && overrun > overran) + warn.lateAndOverran(t->name, delay, overrun, Duration(start, end)); else if (delay > late) warn.late(t->name, delay); + else if (overrun > overran) + warn.overran(t->name, overrun, Duration(start, end)); } continue; } else { @@ -191,11 +183,7 @@ void Timer::stop() // Allow subclasses to override behavior when firing a task. void Timer::fire(boost::intrusive_ptr<TimerTask> t) { - try { - t->fireTask(); - } catch (const std::exception& e) { - QPID_LOG(error, "Exception thrown by timer task " << t->getName() << ": " << e.what()); - } + t->fireTask(); } // Provided for subclasses: called when a task is droped. diff --git a/cpp/src/qpid/sys/Timer.h b/cpp/src/qpid/sys/Timer.h index fccb17dbc2..98ba39ce38 100644 --- a/cpp/src/qpid/sys/Timer.h +++ b/cpp/src/qpid/sys/Timer.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -64,10 +64,6 @@ class TimerTask : public RefCounted { std::string getName() const { return name; } - // Move the nextFireTime so readyToFire is true. - // Used by the cluster, where tasks are fired on cluster events, not on local time. - QPID_COMMON_EXTERN void setFired(); - protected: // Must be overridden with callback virtual void fire() = 0; diff --git a/cpp/src/qpid/sys/TimerWarnings.cpp b/cpp/src/qpid/sys/TimerWarnings.cpp index 85e26da54a..48a56eb472 100644 --- a/cpp/src/qpid/sys/TimerWarnings.cpp +++ b/cpp/src/qpid/sys/TimerWarnings.cpp @@ -56,22 +56,20 @@ void TimerWarnings::log() { std::string task = i->first; TaskStats& stats = i->second; if (stats.lateDelay.count) - QPID_LOG(info, task << " task late " + QPID_LOG(warning, task << " task late " << stats.lateDelay.count << " times by " << stats.lateDelay.average()/TIME_MSEC << "ms on average."); - if (stats.overranOverrun.count) - QPID_LOG(info, task << " task overran " + QPID_LOG(warning, task << " task overran " << stats.overranOverrun.count << " times by " << stats.overranOverrun.average()/TIME_MSEC << "ms (taking " << stats.overranTime.average() << "ns) on average."); - if (stats.lateAndOverranOverrun.count) - QPID_LOG(info, task << " task late and overran " - << stats.lateAndOverranOverrun.count << " times: late " - << stats.lateAndOverranDelay.average()/TIME_MSEC << "ms, overran " - << stats.lateAndOverranOverrun.average()/TIME_MSEC << "ms (taking " - << stats.lateAndOverranTime.average() << "ns) on average."); + if (stats.lateAndOverranDelay.count) + QPID_LOG(warning, task << " task overran " + << stats.overranOverrun.count << " times by " + << stats.overranOverrun.average()/TIME_MSEC << "ms (taking " + << stats.overranTime.average() << "ns) on average."); } nextReport = AbsTime(now(), interval); diff --git a/cpp/src/qpid/sys/alloca.h b/cpp/src/qpid/sys/alloca.h index b3f59b7c3f..e989670e4f 100644 --- a/cpp/src/qpid/sys/alloca.h +++ b/cpp/src/qpid/sys/alloca.h @@ -21,22 +21,19 @@ * */ -#if (defined(_WINDOWS) || defined (WIN32)) -# include <malloc.h> - -# if defined(_MSC_VER) -# ifdef alloc -# undef alloc -# endif -# define alloc _alloc -# ifdef alloca -# undef alloca -# endif -# define alloca _alloca -# endif +#if (defined(_WINDOWS) || defined (WIN32)) && defined(_MSC_VER) +#include <malloc.h> +#ifdef alloc +# undef alloc +#endif +#define alloc _alloc +#ifdef alloca +# undef alloca +#endif +#define alloca _alloca #endif #if !defined _WINDOWS && !defined WIN32 -# include <alloca.h> +#include <alloca.h> #endif #endif /*!QPID_SYS_ALLOCA_H*/ diff --git a/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp b/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp index 249b769051..454ce62495 100644 --- a/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp +++ b/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp @@ -57,7 +57,6 @@ size_t CyrusSecurityLayer::decode(const char* input, size_t size) copied += count; decodeBuffer.position += count; size_t decodedSize = codec->decode(decodeBuffer.data, decodeBuffer.position); - if (decodedSize == 0) break; if (decodedSize < decodeBuffer.position) { ::memmove(decodeBuffer.data, decodeBuffer.data + decodedSize, decodeBuffer.position - decodedSize); } @@ -107,7 +106,7 @@ size_t CyrusSecurityLayer::encode(const char* buffer, size_t size) bool CyrusSecurityLayer::canEncode() { - return codec && (encrypted || codec->canEncode()); + return encrypted || codec->canEncode(); } void CyrusSecurityLayer::init(qpid::sys::Codec* c) diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index dcc9d9181c..9ad05c71a3 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -384,12 +384,7 @@ void PollerPrivate::resetMode(PollerHandlePrivate& eh) { epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - int rc = ::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe); - // If something has closed the fd in the meantime try adding it back - if (rc ==-1 && errno == ENOENT) { - rc = ::epoll_ctl(epollFd, EPOLL_CTL_ADD, eh.fd(), &epe); - } - QPID_POSIX_CHECK(rc); + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe)); eh.setActive(); return; diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index dab8bd09c6..119a6aa8a4 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -149,12 +149,11 @@ private: ConnectedCallback connCallback; FailedCallback failCallback; const Socket& socket; - SocketAddress sa; public: AsynchConnector(const Socket& socket, - const std::string& hostname, - const std::string& port, + std::string hostname, + uint16_t port, ConnectedCallback connCb, FailedCallback failCb); void start(Poller::shared_ptr poller); @@ -162,8 +161,8 @@ public: }; AsynchConnector::AsynchConnector(const Socket& s, - const std::string& hostname, - const std::string& port, + std::string hostname, + uint16_t port, ConnectedCallback connCb, FailedCallback failCb) : DispatchHandle(s, @@ -172,13 +171,11 @@ AsynchConnector::AsynchConnector(const Socket& s, boost::bind(&AsynchConnector::connComplete, this, _1)), connCallback(connCb), failCallback(failCb), - socket(s), - sa(hostname, port) + socket(s) { socket.setNonblocking(); - + SocketAddress sa(hostname, boost::lexical_cast<std::string>(port)); // Note, not catching any exceptions here, also has effect of destructing - QPID_LOG(info, "Connecting: " << sa.asString()); socket.connect(sa); } @@ -194,26 +191,11 @@ void AsynchConnector::stop() void AsynchConnector::connComplete(DispatchHandle& h) { + h.stopWatch(); int errCode = socket.getError(); if (errCode == 0) { - h.stopWatch(); connCallback(socket); } else { - // Retry while we cause an immediate exception - // (asynch failure will be handled by re-entering here at the top) - while (sa.nextAddress()) { - try { - // Try next address without deleting ourselves - QPID_LOG(debug, "Ignored socket connect error: " << strError(errCode)); - QPID_LOG(info, "Retrying connect: " << sa.asString()); - socket.connect(sa); - return; - } catch (const std::exception& e) { - QPID_LOG(debug, "Ignored socket connect exception: " << e.what()); - } - errCode = socket.getError(); - } - h.stopWatch(); failCallback(socket, errCode, strError(errCode)); } DispatchHandle::doDelete(); @@ -607,8 +589,8 @@ AsynchAcceptor* AsynchAcceptor::create(const Socket& s, } AsynchConnector* AsynchConnector::create(const Socket& s, - const std::string& hostname, - const std::string& port, + std::string hostname, + uint16_t port, ConnectedCallback connCb, FailedCallback failCb) { diff --git a/cpp/src/qpid/sys/posix/LockFile.cpp b/cpp/src/qpid/sys/posix/LockFile.cpp index f5a6c292cb..1862ff6ac9 100755 --- a/cpp/src/qpid/sys/posix/LockFile.cpp +++ b/cpp/src/qpid/sys/posix/LockFile.cpp @@ -58,7 +58,8 @@ LockFile::~LockFile() { if (impl) { int f = impl->fd; if (f >= 0) { - (void) ::lockf(f, F_ULOCK, 0); // Suppress warnings about ignoring return value. + int unused_ret; + unused_ret = ::lockf(f, F_ULOCK, 0); // Suppress warnings about ignoring return value. ::close(f); impl->fd = -1; } diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index 4a6dc66f80..7b906f33e8 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,35 +34,65 @@ #include <netdb.h> #include <cstdlib> #include <string.h> +#include <iostream> + +#include <boost/format.hpp> +#include <boost/lexical_cast.hpp> namespace qpid { namespace sys { namespace { -std::string getName(int fd, bool local) +std::string getName(int fd, bool local, bool includeService = false) { - ::sockaddr_storage name_s; // big enough for any socket address - ::sockaddr* name = (::sockaddr*)&name_s; - ::socklen_t namelen = sizeof(name_s); - + ::sockaddr_storage name; // big enough for any socket address + ::socklen_t namelen = sizeof(name); + + int result = -1; if (local) { - QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); + result = ::getsockname(fd, (::sockaddr*)&name, &namelen); } else { - QPID_POSIX_CHECK( ::getpeername(fd, name, &namelen) ); + result = ::getpeername(fd, (::sockaddr*)&name, &namelen); } - return SocketAddress::asString(name, namelen); + QPID_POSIX_CHECK(result); + + char servName[NI_MAXSERV]; + char dispName[NI_MAXHOST]; + if (includeService) { + if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw QPID_POSIX_ERROR(rc); + return std::string(dispName) + ":" + std::string(servName); + + } else { + if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0) + throw QPID_POSIX_ERROR(rc); + return dispName; + } } -uint16_t getLocalPort(int fd) +std::string getService(int fd, bool local) { - ::sockaddr_storage name_s; // big enough for any socket address - ::sockaddr* name = (::sockaddr*)&name_s; - ::socklen_t namelen = sizeof(name_s); + ::sockaddr_storage name; // big enough for any socket address + ::socklen_t namelen = sizeof(name); + + int result = -1; + if (local) { + result = ::getsockname(fd, (::sockaddr*)&name, &namelen); + } else { + result = ::getpeername(fd, (::sockaddr*)&name, &namelen); + } - QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); + QPID_POSIX_CHECK(result); - return SocketAddress::getPort(name); + char servName[NI_MAXSERV]; + if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0, + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw QPID_POSIX_ERROR(rc); + return servName; } } @@ -89,11 +119,6 @@ void Socket::createSocket(const SocketAddress& sa) const try { if (nonblocking) setNonblocking(); if (nodelay) setTcpNoDelay(); - if (getAddrInfo(sa).ai_family == AF_INET6) { - int flag = 1; - int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag)); - QPID_POSIX_CHECK(result); - } } catch (std::exception&) { ::close(s); socket = -1; @@ -101,18 +126,13 @@ void Socket::createSocket(const SocketAddress& sa) const } } -Socket* Socket::createSameTypeSocket() const { - int& socket = impl->fd; - // Socket currently has no actual socket attached - if (socket == -1) - return new Socket; - - ::sockaddr_storage sa; - ::socklen_t salen = sizeof(sa); - QPID_POSIX_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); - int s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM - if (s < 0) throw QPID_POSIX_ERROR(errno); - return new Socket(new IOHandlePrivate(s)); +void Socket::setTimeout(const Duration& interval) const +{ + const int& socket = impl->fd; + struct timeval tv; + toTimeval(tv, interval); + setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); } void Socket::setNonblocking() const { @@ -129,27 +149,20 @@ void Socket::setTcpNoDelay() const nodelay = true; if (socket != -1) { int flag = 1; - int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); + int result = setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); QPID_POSIX_CHECK(result); } } -void Socket::connect(const std::string& host, const std::string& port) const +void Socket::connect(const std::string& host, uint16_t port) const { - SocketAddress sa(host, port); + SocketAddress sa(host, boost::lexical_cast<std::string>(port)); connect(sa); } void Socket::connect(const SocketAddress& addr) const { - // The display name for an outbound connection needs to be the name that was specified - // for the address rather than a resolved IP address as we don't know which of - // the IP addresses is actually the one that will be connected to. - peername = addr.asString(false); - - // However the string we compare with the local port must be numeric or it might not - // match when it should as getLocalAddress() will always be numeric - std::string connectname = addr.asString(); + connectname = addr.asString(); createSocket(addr); @@ -157,24 +170,7 @@ void Socket::connect(const SocketAddress& addr) const // TODO the correct thing to do here is loop on failure until you've used all the returned addresses if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) && (errno != EINPROGRESS)) { - throw Exception(QPID_MSG(strError(errno) << ": " << peername)); - } - // When connecting to a port on the same host which no longer has - // a process associated with it, the OS occasionally chooses the - // remote port (which is unoccupied) as the port to bind the local - // end of the socket, resulting in a "circular" connection. - // - // This seems like something the OS should prevent but I have - // confirmed that sporadic hangs in - // cluster_tests.LongTests.test_failover on RHEL5 are caused by - // such a circular connection. - // - // Raise an error if we see such a connection, since we know there is - // no listener on the peer address. - // - if (getLocalAddress() == connectname) { - close(); - throw Exception(QPID_MSG("Connection refused: " << peername)); + throw Exception(QPID_MSG(strError(errno) << ": " << connectname)); } } @@ -187,9 +183,9 @@ Socket::close() const socket = -1; } -int Socket::listen(const std::string& host, const std::string& port, int backlog) const +int Socket::listen(uint16_t port, int backlog) const { - SocketAddress sa(host, port); + SocketAddress sa("", boost::lexical_cast<std::string>(port)); return listen(sa, backlog); } @@ -199,24 +195,26 @@ int Socket::listen(const SocketAddress& sa, int backlog) const const int& socket = impl->fd; int yes=1; - QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); + QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0) throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno))); if (::listen(socket, backlog) < 0) throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno))); - return getLocalPort(socket); + struct sockaddr_in name; + socklen_t namelen = sizeof(name); + if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) + throw QPID_POSIX_ERROR(errno); + + return ntohs(name.sin_port); } Socket* Socket::accept() const { int afd = ::accept(impl->fd, 0, 0); - if ( afd >= 0) { - Socket* s = new Socket(new IOHandlePrivate(afd)); - s->localname = localname; - return s; - } + if ( afd >= 0) + return new Socket(new IOHandlePrivate(afd)); else if (errno == EAGAIN) return 0; else throw QPID_POSIX_ERROR(errno); @@ -232,20 +230,37 @@ int Socket::write(const void *buf, size_t count) const return ::write(impl->fd, buf, count); } +std::string Socket::getSockname() const +{ + return getName(impl->fd, true); +} + +std::string Socket::getPeername() const +{ + return getName(impl->fd, false); +} + std::string Socket::getPeerAddress() const { - if (peername.empty()) { - peername = getName(impl->fd, false); + if (connectname.empty()) { + connectname = getName(impl->fd, false, true); } - return peername; + return connectname; } std::string Socket::getLocalAddress() const { - if (localname.empty()) { - localname = getName(impl->fd, true); - } - return localname; + return getName(impl->fd, true, true); +} + +uint16_t Socket::getLocalPort() const +{ + return std::atoi(getService(impl->fd, true).c_str()); +} + +uint16_t Socket::getRemotePort() const +{ + return std::atoi(getService(impl->fd, true).c_str()); } int Socket::getError() const diff --git a/cpp/src/qpid/sys/posix/SocketAddress.cpp b/cpp/src/qpid/sys/posix/SocketAddress.cpp index 077942ef2f..8f5f29d793 100644 --- a/cpp/src/qpid/sys/posix/SocketAddress.cpp +++ b/cpp/src/qpid/sys/posix/SocketAddress.cpp @@ -21,13 +21,11 @@ #include "qpid/sys/SocketAddress.h" -#include "qpid/Exception.h" -#include "qpid/Msg.h" +#include "qpid/sys/posix/check.h" #include <sys/socket.h> -#include <netinet/in.h> -#include <netdb.h> #include <string.h> +#include <netdb.h> namespace qpid { namespace sys { @@ -48,9 +46,15 @@ SocketAddress::SocketAddress(const SocketAddress& sa) : SocketAddress& SocketAddress::operator=(const SocketAddress& sa) { - SocketAddress temp(sa); + if (&sa != this) { + host = sa.host; + port = sa.port; - std::swap(temp, *this); + if (addrInfo) { + ::freeaddrinfo(addrInfo); + addrInfo = 0; + } + } return *this; } @@ -61,61 +65,9 @@ SocketAddress::~SocketAddress() } } -std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen) -{ - char servName[NI_MAXSERV]; - char dispName[NI_MAXHOST]; - if (int rc=::getnameinfo(addr, addrlen, - dispName, sizeof(dispName), - servName, sizeof(servName), - NI_NUMERICHOST | NI_NUMERICSERV) != 0) - throw qpid::Exception(QPID_MSG(gai_strerror(rc))); - std::string s; - switch (addr->sa_family) { - case AF_INET: s += dispName; break; - case AF_INET6: s += "["; s += dispName; s+= "]"; break; - default: throw Exception(QPID_MSG("Unexpected socket type")); - } - s += ":"; - s += servName; - return s; -} - -uint16_t SocketAddress::getPort(::sockaddr const * const addr) +std::string SocketAddress::asString() const { - switch (addr->sa_family) { - case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port); - case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port); - default:throw Exception(QPID_MSG("Unexpected socket type")); - } -} - -std::string SocketAddress::asString(bool numeric) const -{ - if (!numeric) - return host + ":" + port; - // Canonicalise into numeric id - const ::addrinfo& ai = getAddrInfo(*this); - - return asString(ai.ai_addr, ai.ai_addrlen); -} - -bool SocketAddress::nextAddress() { - bool r = currentAddrInfo->ai_next != 0; - if (r) - currentAddrInfo = currentAddrInfo->ai_next; - return r; -} - -void SocketAddress::setAddrInfoPort(uint16_t port) { - if (!currentAddrInfo) return; - - ::addrinfo& ai = *currentAddrInfo; - switch (ai.ai_family) { - case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return; - case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return; - default: throw Exception(QPID_MSG("Unexpected socket type")); - } + return host + ":" + port; } const ::addrinfo& getAddrInfo(const SocketAddress& sa) @@ -123,8 +75,7 @@ const ::addrinfo& getAddrInfo(const SocketAddress& sa) if (!sa.addrInfo) { ::addrinfo hints; ::memset(&hints, 0, sizeof(hints)); - hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for - hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6 + hints.ai_family = AF_INET; // Change this to support IPv6 hints.ai_socktype = SOCK_STREAM; const char* node = 0; @@ -137,11 +88,10 @@ const ::addrinfo& getAddrInfo(const SocketAddress& sa) int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo); if (n != 0) - throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n))); - sa.currentAddrInfo = sa.addrInfo; + throw Exception(QPID_MSG("Cannot resolve " << sa.host << ": " << ::gai_strerror(n))); } - return *sa.currentAddrInfo; + return *sa.addrInfo; } }} diff --git a/cpp/src/qpid/sys/posix/Thread.cpp b/cpp/src/qpid/sys/posix/Thread.cpp index a1d6396763..b466733260 100644 --- a/cpp/src/qpid/sys/posix/Thread.cpp +++ b/cpp/src/qpid/sys/posix/Thread.cpp @@ -37,8 +37,7 @@ void* runRunnable(void* p) } } -class ThreadPrivate { -public: +struct ThreadPrivate { pthread_t thread; ThreadPrivate(Runnable* runnable) { diff --git a/cpp/src/qpid/sys/posix/Time.cpp b/cpp/src/qpid/sys/posix/Time.cpp index 9661f0c5e8..b3858279b4 100644 --- a/cpp/src/qpid/sys/posix/Time.cpp +++ b/cpp/src/qpid/sys/posix/Time.cpp @@ -27,7 +27,6 @@ #include <stdio.h> #include <sys/time.h> #include <unistd.h> -#include <iomanip> namespace { int64_t max_abstime() { return std::numeric_limits<int64_t>::max(); } @@ -104,12 +103,6 @@ void outputFormattedNow(std::ostream& o) { o << " "; } -void outputHiresNow(std::ostream& o) { - ::timespec time; - ::clock_gettime(CLOCK_REALTIME, &time); - o << time.tv_sec << "." << std::setw(9) << std::setfill('0') << time.tv_nsec << "s "; -} - void sleep(int secs) { ::sleep(secs); } diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 78bcdec68e..c80c94cba6 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -140,8 +140,8 @@ namespace Rdma { // Prepost recv buffers before we go any further qp->allocateRecvBuffers(recvBufferCount, bufferSize+FrameHeaderSize); - // Create xmit buffers, reserve space for frame header. - qp->createSendBuffers(xmitBufferCount, bufferSize, FrameHeaderSize); + // Create xmit buffers + qp->createSendBuffers(xmitBufferCount, bufferSize+FrameHeaderSize); } AsynchIO::~AsynchIO() { @@ -210,14 +210,12 @@ namespace Rdma { } break; case 1: - if (!buff) - buff = getSendBuffer(); + Buffer* ob = buff ? buff : getSendBuffer(); // Add FrameHeader after frame data FrameHeader header(credit); - assert(buff->dataCount() <= buff->byteCount()); // ensure app data doesn't impinge on reserved space. - ::memcpy(buff->bytes()+buff->dataCount(), &header, FrameHeaderSize); - buff->dataCount(buff->dataCount()+FrameHeaderSize); - qp->postSend(buff); + ::memcpy(ob->bytes()+ob->dataCount(), &header, FrameHeaderSize); + ob->dataCount(ob->dataCount()+FrameHeaderSize); + qp->postSend(ob); break; } } diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp index efe454c5be..6d38c42502 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -50,9 +50,8 @@ namespace Rdma { return count; } - Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount, - const int32_t reserve) : - bufferSize(byteCount + reserve), reserved(reserve) + Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount) : + bufferSize(byteCount) { sge.addr = (uintptr_t) bytes; sge.length = 0; @@ -164,21 +163,21 @@ namespace Rdma { } // Create buffers to use for writing - void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize, int reserved) + void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize) { assert(!smr); // Round up buffersize to cacheline (64 bytes) - int dataLength = (bufferSize+reserved+63) & (~63); + bufferSize = (bufferSize+63) & (~63); // Allocate memory block for all receive buffers - char* mem = new char [sendBufferCount * dataLength]; - smr = regMr(pd.get(), mem, sendBufferCount * dataLength, ::IBV_ACCESS_LOCAL_WRITE); + char* mem = new char [sendBufferCount * bufferSize]; + smr = regMr(pd.get(), mem, sendBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE); sendBuffers.reserve(sendBufferCount); freeBuffers.reserve(sendBufferCount); for (int i = 0; i<sendBufferCount; ++i) { // Allocate xmit buffer - sendBuffers.push_back(Buffer(smr->lkey, &mem[i*dataLength], bufferSize, reserved)); + sendBuffers.push_back(Buffer(smr->lkey, &mem[i*bufferSize], bufferSize)); freeBuffers.push_back(i); } } diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h index 8e3429027b..28bddd2165 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.h +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -57,9 +57,8 @@ namespace Rdma { void dataCount(int32_t); private: - Buffer(uint32_t lkey, char* bytes, const int32_t byteCount, const int32_t reserve=0); + Buffer(uint32_t lkey, char* bytes, const int32_t byteCount); int32_t bufferSize; - int32_t reserved; // for framing header ::ibv_sge sge; }; @@ -67,9 +66,8 @@ namespace Rdma { return (char*) sge.addr; } - /** return the number of bytes available for application data */ inline int32_t Buffer::byteCount() const { - return bufferSize - reserved; + return bufferSize; } inline int32_t Buffer::dataCount() const { @@ -77,8 +75,6 @@ namespace Rdma { } inline void Buffer::dataCount(int32_t s) { - // catch any attempt to overflow a buffer - assert(s <= bufferSize + reserved); sge.length = s; } @@ -140,7 +136,7 @@ namespace Rdma { typedef boost::intrusive_ptr<QueuePair> intrusive_ptr; // Create a buffers to use for writing - void createSendBuffers(int sendBufferCount, int dataSize, int headerSize); + void createSendBuffers(int sendBufferCount, int bufferSize); // Get a send buffer Buffer* getSendBuffer(); diff --git a/cpp/src/qpid/sys/ssl/SslHandler.h b/cpp/src/qpid/sys/ssl/SslHandler.h index 400fa317fd..a340109966 100644 --- a/cpp/src/qpid/sys/ssl/SslHandler.h +++ b/cpp/src/qpid/sys/ssl/SslHandler.h @@ -35,7 +35,7 @@ namespace sys { namespace ssl { class SslIO; -struct SslIOBufferBase; +class SslIOBufferBase; class SslSocket; class SslHandler : public OutputControl { diff --git a/cpp/src/qpid/sys/ssl/SslIo.cpp b/cpp/src/qpid/sys/ssl/SslIo.cpp index 734ebb483a..a58a137473 100644 --- a/cpp/src/qpid/sys/ssl/SslIo.cpp +++ b/cpp/src/qpid/sys/ssl/SslIo.cpp @@ -117,7 +117,7 @@ void SslAcceptor::readable(DispatchHandle& h) { SslConnector::SslConnector(const SslSocket& s, Poller::shared_ptr poller, std::string hostname, - std::string port, + uint16_t port, ConnectedCallback connCb, FailedCallback failCb) : DispatchHandle(s, diff --git a/cpp/src/qpid/sys/ssl/SslIo.h b/cpp/src/qpid/sys/ssl/SslIo.h index 8785852c24..53ac69d8d6 100644 --- a/cpp/src/qpid/sys/ssl/SslIo.h +++ b/cpp/src/qpid/sys/ssl/SslIo.h @@ -73,7 +73,7 @@ public: SslConnector(const SslSocket& socket, Poller::shared_ptr poller, std::string hostname, - std::string port, + uint16_t port, ConnectedCallback connCb, FailedCallback failCb = 0); diff --git a/cpp/src/qpid/sys/ssl/SslSocket.cpp b/cpp/src/qpid/sys/ssl/SslSocket.cpp index f7483a220c..01e2658877 100644 --- a/cpp/src/qpid/sys/ssl/SslSocket.cpp +++ b/cpp/src/qpid/sys/ssl/SslSocket.cpp @@ -158,7 +158,7 @@ void SslSocket::setNonblocking() const PR_SetSocketOption(socket, &option); } -void SslSocket::connect(const std::string& host, const std::string& port) const +void SslSocket::connect(const std::string& host, uint16_t port) const { std::stringstream namestream; namestream << host << ":" << port; @@ -180,7 +180,7 @@ void SslSocket::connect(const std::string& host, const std::string& port) const PRHostEnt hostEntry; PR_CHECK(PR_GetHostByName(host.data(), hostBuffer, PR_NETDB_BUF_SIZE, &hostEntry)); PRNetAddr address; - int value = PR_EnumerateHostEnt(0, &hostEntry, boost::lexical_cast<PRUint16>(port), &address); + int value = PR_EnumerateHostEnt(0, &hostEntry, port, &address); if (value < 0) { throw Exception(QPID_MSG("Error getting address for host: " << ErrorString())); } else if (value == 0) { diff --git a/cpp/src/qpid/sys/ssl/SslSocket.h b/cpp/src/qpid/sys/ssl/SslSocket.h index 993859495b..25712c98d5 100644 --- a/cpp/src/qpid/sys/ssl/SslSocket.h +++ b/cpp/src/qpid/sys/ssl/SslSocket.h @@ -53,7 +53,7 @@ public: * NSSInit().*/ void setCertName(const std::string& certName); - void connect(const std::string& host, const std::string& port) const; + void connect(const std::string& host, uint16_t port) const; void close() const; diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp index 30378d4c5f..38d8842521 100644 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -30,7 +30,6 @@ #include "qpid/log/Statement.h" #include "qpid/sys/windows/check.h" -#include "qpid/sys/windows/mingw32_compat.h" #include <boost/thread/once.hpp> @@ -47,13 +46,16 @@ namespace { /* * The function pointers for AcceptEx and ConnectEx need to be looked up - * at run time. + * at run time. Make sure this is done only once. */ -const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) { - SOCKET h = toSocketHandle(s); +boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT; +LPFN_ACCEPTEX fnAcceptEx = 0; +typedef void (*lookUpFunc)(const qpid::sys::Socket &); + +void lookUpAcceptEx() { + SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); GUID guidAcceptEx = WSAID_ACCEPTEX; DWORD dwBytes = 0; - LPFN_ACCEPTEX fnAcceptEx; WSAIoctl(h, SIO_GET_EXTENSION_FUNCTION_POINTER, &guidAcceptEx, @@ -63,9 +65,9 @@ const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) { &dwBytes, NULL, NULL); + closesocket(h); if (fnAcceptEx == 0) throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx")); - return fnAcceptEx; } } @@ -92,15 +94,18 @@ private: AsynchAcceptor::Callback acceptedCallback; const Socket& socket; - const LPFN_ACCEPTEX fnAcceptEx; }; AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), - socket(s), - fnAcceptEx(lookUpAcceptEx(s)) { + socket(s) { s.setNonblocking(); +#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */ + boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx); +#else + boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce); +#endif } AsynchAcceptor::~AsynchAcceptor() @@ -109,8 +114,7 @@ AsynchAcceptor::~AsynchAcceptor() } void AsynchAcceptor::start(Poller::shared_ptr poller) { - PollerHandle ph = PollerHandle(socket); - poller->monitorHandle(ph, Poller::INPUT); + poller->monitorHandle(PollerHandle(socket), Poller::INPUT); restart (); } @@ -118,26 +122,25 @@ void AsynchAcceptor::restart(void) { DWORD bytesReceived = 0; // Not used, needed for AcceptEx API AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback, this, - socket); + toSocketHandle(socket)); BOOL status; - status = fnAcceptEx(toSocketHandle(socket), - toSocketHandle(*result->newSocket), - result->addressBuffer, - 0, - AsynchAcceptResult::SOCKADDRMAXLEN, - AsynchAcceptResult::SOCKADDRMAXLEN, - &bytesReceived, - result->overlapped()); + status = ::fnAcceptEx(toSocketHandle(socket), + toSocketHandle(*result->newSocket), + result->addressBuffer, + 0, + AsynchAcceptResult::SOCKADDRMAXLEN, + AsynchAcceptResult::SOCKADDRMAXLEN, + &bytesReceived, + result->overlapped()); QPID_WINDOWS_CHECK_ASYNC_START(status); } AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb, AsynchAcceptor *acceptor, - const Socket& listener) - : callback(cb), acceptor(acceptor), - listener(toSocketHandle(listener)), - newSocket(listener.createSameTypeSocket()) { + SOCKET listener) + : callback(cb), acceptor(acceptor), listener(listener) { + newSocket.reset (new Socket()); } void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { @@ -151,7 +154,7 @@ void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { delete this; } -void AsynchAcceptResult::failure(int /*status*/) { +void AsynchAcceptResult::failure(int status) { //if (status != WSA_OPERATION_ABORTED) // Can there be anything else? ; delete this; @@ -170,20 +173,20 @@ private: FailedCallback failCallback; const Socket& socket; const std::string hostname; - const std::string port; + const uint16_t port; public: AsynchConnector(const Socket& socket, - const std::string& hostname, - const std::string& port, + std::string hostname, + uint16_t port, ConnectedCallback connCb, FailedCallback failCb = 0); void start(Poller::shared_ptr poller); }; AsynchConnector::AsynchConnector(const Socket& sock, - const std::string& hname, - const std::string& p, + std::string hname, + uint16_t p, ConnectedCallback connCb, FailedCallback failCb) : connCallback(connCb), failCallback(failCb), socket(sock), @@ -213,8 +216,8 @@ AsynchAcceptor* AsynchAcceptor::create(const Socket& s, } AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s, - const std::string& hostname, - const std::string& port, + std::string hostname, + uint16_t port, ConnectedCallback connCb, FailedCallback failCb) { @@ -407,9 +410,8 @@ void AsynchIO::queueForDeletion() { } void AsynchIO::start(Poller::shared_ptr poller0) { - PollerHandle ph = PollerHandle(socket); poller = poller0; - poller->monitorHandle(ph, Poller::INPUT); + poller->monitorHandle(PollerHandle(socket), Poller::INPUT); if (writeQueue.size() > 0) // Already have data queued for write notifyPendingWrite(); startReading(); @@ -582,6 +584,7 @@ void AsynchIO::notifyIdle(void) { void AsynchIO::startWrite(AsynchIO::BufferBase* buff) { writeInProgress = true; InterlockedIncrement(&opsInProgress); + int writeCount = buff->byteCount-buff->dataCount; AsynchWriteResult *result = new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1), buff, diff --git a/cpp/src/qpid/sys/windows/AsynchIoResult.h b/cpp/src/qpid/sys/windows/AsynchIoResult.h index 27e4c22138..66c89efc11 100755 --- a/cpp/src/qpid/sys/windows/AsynchIoResult.h +++ b/cpp/src/qpid/sys/windows/AsynchIoResult.h @@ -83,22 +83,22 @@ class AsynchAcceptResult : public AsynchResult { public: AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb, AsynchAcceptor *acceptor, - const qpid::sys::Socket& listener); + SOCKET listener); virtual void success (size_t bytesTransferred); virtual void failure (int error); private: virtual void complete(void) {} // No-op for this class. + std::auto_ptr<qpid::sys::Socket> newSocket; qpid::sys::AsynchAcceptor::Callback callback; AsynchAcceptor *acceptor; SOCKET listener; - std::auto_ptr<qpid::sys::Socket> newSocket; // AcceptEx needs a place to write the local and remote addresses // when accepting the connection. Place those here; get enough for // IPv6 addresses, even if the socket is IPv4. - enum { SOCKADDRMAXLEN = sizeof(sockaddr_in6) + 16, + enum { SOCKADDRMAXLEN = sizeof sockaddr_in6 + 16, SOCKADDRBUFLEN = 2 * SOCKADDRMAXLEN }; char addressBuffer[SOCKADDRBUFLEN]; }; diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp index 1805dd2cd8..d326ab02ac 100755 --- a/cpp/src/qpid/sys/windows/IocpPoller.cpp +++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp @@ -152,9 +152,9 @@ void Poller::monitorHandle(PollerHandle& handle, Direction dir) { } // All no-ops... -void Poller::unmonitorHandle(PollerHandle& /*handle*/, Direction /*dir*/) {} -void Poller::registerHandle(PollerHandle& /*handle*/) {} -void Poller::unregisterHandle(PollerHandle& /*handle*/) {} +void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {} +void Poller::registerHandle(PollerHandle& handle) {} +void Poller::unregisterHandle(PollerHandle& handle) {} Poller::Event Poller::wait(Duration timeout) { DWORD timeoutMs = 0; diff --git a/cpp/src/qpid/sys/windows/SCM.cpp b/cpp/src/qpid/sys/windows/SCM.cpp deleted file mode 100644 index 4d2c74d4b9..0000000000 --- a/cpp/src/qpid/sys/windows/SCM.cpp +++ /dev/null @@ -1,332 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/log/Statement.h" -#include "qpid/sys/windows/check.h" -#include "SCM.h" - -#pragma comment(lib, "advapi32.lib") - -namespace { - -// Container that will close a SC_HANDLE upon destruction. -class AutoServiceHandle { -public: - AutoServiceHandle(SC_HANDLE h_ = NULL) : h(h_) {} - ~AutoServiceHandle() { if (h != NULL) ::CloseServiceHandle(h); } - void release() { h = NULL; } - void reset(SC_HANDLE newHandle) - { - if (h != NULL) - ::CloseServiceHandle(h); - h = newHandle; - } - operator SC_HANDLE() const { return h; } - -private: - SC_HANDLE h; -}; - -} - -namespace qpid { -namespace windows { - -SCM::SCM() : scmHandle(NULL) -{ -} - -SCM::~SCM() -{ - if (NULL != scmHandle) - ::CloseServiceHandle(scmHandle); -} - -/** - * Install this executable as a service - */ -void SCM::install(const string& serviceName, - const string& serviceDesc, - const string& args, - DWORD startType, - const string& account, - const string& password, - const string& depends) -{ - // Handle dependent service name list; Windows wants a set of nul-separated - // names ending with a double nul. - string depends2 = depends; - if (!depends2.empty()) { - // CDL to null delimiter w/ trailing double null - size_t p = 0; - while ((p = depends2.find_first_of( ',', p)) != string::npos) - depends2.replace(p, 1, 1, '\0'); - depends2.push_back('\0'); - depends2.push_back('\0'); - } - -#if 0 - // I'm nervous about adding a user/password check here. Is this a - // potential attack vector, letting users check passwords without - // control? -Steve Huston, Feb 24, 2011 - - // Validate account, password - HANDLE hToken = NULL; - bool logStatus = false; - if (!account.empty() && !password.empty() && - !(logStatus = ::LogonUserA(account.c_str(), - "", - password.c_str(), - LOGON32_LOGON_NETWORK, - LOGON32_PROVIDER_DEFAULT, - &hToken ) != 0)) - std::cout << "warning: supplied account & password failed with LogonUser." << std::endl; - if (logStatus) - ::CloseHandle(hToken); -#endif - - // Get fully qualified .exe name - char myPath[MAX_PATH]; - DWORD myPathLength = ::GetModuleFileName(NULL, myPath, MAX_PATH); - QPID_WINDOWS_CHECK_NOT(myPathLength, 0); - string imagePath(myPath, myPathLength); - if (!args.empty()) - imagePath += " " + args; - - // Ensure there's a handle to the SCM database. - openSvcManager(); - - // Create the service - SC_HANDLE svcHandle; - svcHandle = ::CreateService(scmHandle, // SCM database - serviceName.c_str(), // name of service - serviceDesc.c_str(), // name to display - SERVICE_ALL_ACCESS, // desired access - SERVICE_WIN32_OWN_PROCESS, // service type - startType, // start type - SERVICE_ERROR_NORMAL, // error cntrl type - imagePath.c_str(), // path to service's binary w/ optional arguments - NULL, // no load ordering group - NULL, // no tag identifier - depends2.empty() ? NULL : depends2.c_str(), - account.empty() ? NULL : account.c_str(), // account name, or NULL for LocalSystem - password.empty() ? NULL : password.c_str()); // password, or NULL for none - QPID_WINDOWS_CHECK_NULL(svcHandle); - ::CloseServiceHandle(svcHandle); - QPID_LOG(info, "Service installed successfully"); -} - -/** - * - */ -void SCM::uninstall(const string& serviceName) -{ - // Ensure there's a handle to the SCM database. - openSvcManager(); - AutoServiceHandle svc(::OpenService(scmHandle, - serviceName.c_str(), - DELETE)); - QPID_WINDOWS_CHECK_NULL((SC_HANDLE)svc); - QPID_WINDOWS_CHECK_NOT(::DeleteService(svc), 0); - QPID_LOG(info, "Service deleted successfully."); -} - -/** - * Attempt to start the service. - */ -void SCM::start(const string& serviceName) -{ - // Ensure we have a handle to the SCM database. - openSvcManager(); - - // Get a handle to the service. - AutoServiceHandle svc(::OpenService(scmHandle, - serviceName.c_str(), - SERVICE_ALL_ACCESS)); - QPID_WINDOWS_CHECK_NULL(svc); - - // Check the status in case the service is not stopped. - DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING); - if (state == SERVICE_STOP_PENDING) - throw qpid::Exception("Timed out waiting for running service to stop."); - - // Attempt to start the service. - QPID_WINDOWS_CHECK_NOT(::StartService(svc, 0, NULL), 0); - - QPID_LOG(info, "Service start pending..."); - - // Check the status until the service is no longer start pending. - state = waitForStateChangeFrom(svc, SERVICE_START_PENDING); - // Determine whether the service is running. - if (state == SERVICE_RUNNING) { - QPID_LOG(info, "Service started successfully"); - } - else { - throw qpid::Exception(QPID_MSG("Service not yet running; state now " << state)); - } -} - -/** - * - */ -void SCM::stop(const string& serviceName) -{ - // Ensure a handle to the SCM database. - openSvcManager(); - - // Get a handle to the service. - AutoServiceHandle svc(::OpenService(scmHandle, - serviceName.c_str(), - SERVICE_STOP | SERVICE_QUERY_STATUS | - SERVICE_ENUMERATE_DEPENDENTS)); - QPID_WINDOWS_CHECK_NULL(svc); - - // Make sure the service is not already stopped; if it's stop-pending, - // wait for it to finalize. - DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING); - if (state == SERVICE_STOPPED) { - QPID_LOG(info, "Service is already stopped"); - return; - } - - // If the service is running, dependencies must be stopped first. - std::auto_ptr<ENUM_SERVICE_STATUS> deps; - DWORD numDeps = getDependentServices(svc, deps); - for (DWORD i = 0; i < numDeps; i++) - stop(deps.get()[i].lpServiceName); - - // Dependents stopped; send a stop code to the service. - SERVICE_STATUS_PROCESS ssp; - if (!::ControlService(svc, SERVICE_CONTROL_STOP, (LPSERVICE_STATUS)&ssp)) - throw qpid::Exception(QPID_MSG("Stopping " << serviceName << ": " << - qpid::sys::strError(::GetLastError()))); - - // Wait for the service to stop. - state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING); - if (state == SERVICE_STOPPED) - QPID_LOG(info, QPID_MSG("Service " << serviceName << - " stopped successfully.")); -} - -/** - * - */ -void SCM::openSvcManager() -{ - if (NULL != scmHandle) - return; - - scmHandle = ::OpenSCManager(NULL, // local computer - NULL, // ServicesActive database - SC_MANAGER_ALL_ACCESS); // Rights - QPID_WINDOWS_CHECK_NULL(scmHandle); -} - -DWORD SCM::waitForStateChangeFrom(SC_HANDLE svc, DWORD originalState) -{ - SERVICE_STATUS_PROCESS ssStatus; - DWORD bytesNeeded; - DWORD waitTime; - if (!::QueryServiceStatusEx(svc, // handle to service - SC_STATUS_PROCESS_INFO, // information level - (LPBYTE)&ssStatus, // address of structure - sizeof(ssStatus), // size of structure - &bytesNeeded)) // size needed if buffer is too small - throw QPID_WINDOWS_ERROR(::GetLastError()); - - // Save the tick count and initial checkpoint. - DWORD startTickCount = ::GetTickCount(); - DWORD oldCheckPoint = ssStatus.dwCheckPoint; - - // Wait for the service to change out of the noted state. - while (ssStatus.dwCurrentState == originalState) { - // Do not wait longer than the wait hint. A good interval is - // one-tenth of the wait hint but not less than 1 second - // and not more than 10 seconds. - waitTime = ssStatus.dwWaitHint / 10; - if (waitTime < 1000) - waitTime = 1000; - else if (waitTime > 10000) - waitTime = 10000; - - ::Sleep(waitTime); - - // Check the status until the service is no longer stop pending. - if (!::QueryServiceStatusEx(svc, - SC_STATUS_PROCESS_INFO, - (LPBYTE) &ssStatus, - sizeof(ssStatus), - &bytesNeeded)) - throw QPID_WINDOWS_ERROR(::GetLastError()); - - if (ssStatus.dwCheckPoint > oldCheckPoint) { - // Continue to wait and check. - startTickCount = ::GetTickCount(); - oldCheckPoint = ssStatus.dwCheckPoint; - } else { - if ((::GetTickCount() - startTickCount) > ssStatus.dwWaitHint) - break; - } - } - return ssStatus.dwCurrentState; -} - -/** - * Get the services that depend on @arg svc. All dependent service info - * is returned in an array of ENUM_SERVICE_STATUS structures via @arg deps. - * - * @retval The number of dependent services. - */ -DWORD SCM::getDependentServices(SC_HANDLE svc, - std::auto_ptr<ENUM_SERVICE_STATUS>& deps) -{ - DWORD bytesNeeded; - DWORD numEntries; - - // Pass a zero-length buffer to get the required buffer size. - if (::EnumDependentServices(svc, - SERVICE_ACTIVE, - 0, - 0, - &bytesNeeded, - &numEntries)) { - // If the Enum call succeeds, then there are no dependent - // services, so do nothing. - return 0; - } - - if (::GetLastError() != ERROR_MORE_DATA) - throw QPID_WINDOWS_ERROR((::GetLastError())); - - // Allocate a buffer for the dependencies. - deps.reset((LPENUM_SERVICE_STATUS)(new char[bytesNeeded])); - // Enumerate the dependencies. - if (!::EnumDependentServices(svc, - SERVICE_ACTIVE, - deps.get(), - bytesNeeded, - &bytesNeeded, - &numEntries)) - throw QPID_WINDOWS_ERROR((::GetLastError())); - return numEntries; -} - -} } // namespace qpid::windows diff --git a/cpp/src/qpid/sys/windows/SCM.h b/cpp/src/qpid/sys/windows/SCM.h deleted file mode 100644 index 8e94ef83c7..0000000000 --- a/cpp/src/qpid/sys/windows/SCM.h +++ /dev/null @@ -1,111 +0,0 @@ -#ifndef WINDOWS_SCM_H -#define WINDOWS_SCM_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <memory> -#include <string> -using std::string; - -#ifdef UNICODE -#undef UNICODE -#endif - -#ifndef WIN32_LEAN_AND_MEAN -#define WIN32_LEAN_AND_MEAN -#endif - -#include <windows.h> - -#include "qpid/CommonImportExport.h" - -namespace qpid { -namespace windows { - -/** - * @class SCM - * - * Access the Windows Service Control Manager. - */ -class SCM -{ -public: - QPID_COMMON_EXTERN SCM(); - QPID_COMMON_EXTERN ~SCM(); - - /** - * Install this executable as a service - * - * @param serviceName The name of the service - * @param serviceDesc Description of the service's purpose - * @param args The argument list to pass into the service - * @param startType The start type: SERVICE_DEMAND_START, - * SERVICE_AUTO_START, SERVICE_DISABLED - * @param account If not empty, the account name to install this - * service under - * @param password If not empty, the account password to install this - * service with - * @param depends If not empty, a comma delimited list of services - * that must start before this one - */ - QPID_COMMON_EXTERN void install(const string& serviceName, - const string& serviceDesc, - const string& args, - DWORD startType = SERVICE_DEMAND_START, - const string& account = "NT AUTHORITY\\LocalSystem", - const string& password = "", - const string& depends = ""); - - /** - * Uninstall this executable as a service - * - * @param serviceName the name of the service - */ - QPID_COMMON_EXTERN void uninstall(const string& serviceName); - - /** - * Start the specified service - * - * @param serviceName the name of the service - */ - QPID_COMMON_EXTERN void start(const string& serviceName); - - /** - * Stop the specified service - * - * @param serviceName the name of the service - */ - QPID_COMMON_EXTERN void stop(const string &serviceName); - -private: - SC_HANDLE scmHandle; - - void openSvcManager(); - DWORD waitForStateChangeFrom(SC_HANDLE svc, DWORD originalState); - DWORD getDependentServices(SC_HANDLE svc, - std::auto_ptr<ENUM_SERVICE_STATUS>& deps); - -}; - -}} // namespace qpid::windows - -#endif /* #ifndef WINDOWS_SCM_H */ diff --git a/cpp/src/qpid/sys/windows/Shlib.cpp b/cpp/src/qpid/sys/windows/Shlib.cpp index ba18747eb4..38027de93f 100644 --- a/cpp/src/qpid/sys/windows/Shlib.cpp +++ b/cpp/src/qpid/sys/windows/Shlib.cpp @@ -44,8 +44,7 @@ void Shlib::unload() { } void* Shlib::getSymbol(const char* name) { - // Double cast avoids warning about casting function pointer to object - void *sym = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(GetProcAddress(static_cast<HMODULE>(handle), name))); + void* sym = GetProcAddress(static_cast<HMODULE>(handle), name); if (sym == NULL) throw QPID_WINDOWS_ERROR(GetLastError()); return sym; diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/Socket.cpp index 1fa4768329..11fb8b4133 100644..100755 --- a/cpp/src/qpid/sys/windows/Socket.cpp +++ b/cpp/src/qpid/sys/windows/Socket.cpp @@ -20,18 +20,19 @@ */ #include "qpid/sys/Socket.h" - #include "qpid/sys/SocketAddress.h" -#include "qpid/sys/windows/check.h" #include "qpid/sys/windows/IoHandlePrivate.h" +#include "qpid/sys/windows/check.h" +#include "qpid/sys/Time.h" -// Ensure we get all of winsock2.h -#ifndef _WIN32_WINNT -#define _WIN32_WINNT 0x0501 -#endif +#include <cstdlib> +#include <string.h> #include <winsock2.h> +#include <boost/format.hpp> +#include <boost/lexical_cast.hpp> + // Need to initialize WinSock. Ideally, this would be a singleton or embedded // in some one-time initialization function. I tried boost singleton and could // not get it to compile (and others located in google had the same problem). @@ -83,30 +84,53 @@ namespace sys { namespace { -std::string getName(SOCKET fd, bool local) +std::string getName(SOCKET fd, bool local, bool includeService = false) { - ::sockaddr_storage name_s; // big enough for any socket address - ::sockaddr* name = (::sockaddr*)&name_s; - ::socklen_t namelen = sizeof(name_s); - + sockaddr_in name; // big enough for any socket address + socklen_t namelen = sizeof(name); if (local) { - QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen)); + QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen)); } else { - QPID_WINSOCK_CHECK(::getpeername(fd, name, &namelen)); + QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen)); } - return SocketAddress::asString(name, namelen); + char servName[NI_MAXSERV]; + char dispName[NI_MAXHOST]; + if (includeService) { + if (int rc = ::getnameinfo((sockaddr*)&name, namelen, + dispName, sizeof(dispName), + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); + return std::string(dispName) + ":" + std::string(servName); + } else { + if (int rc = ::getnameinfo((sockaddr*)&name, namelen, + dispName, sizeof(dispName), + 0, 0, + NI_NUMERICHOST) != 0) + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); + return dispName; + } } -uint16_t getLocalPort(int fd) +std::string getService(SOCKET fd, bool local) { - ::sockaddr_storage name_s; // big enough for any socket address - ::sockaddr* name = (::sockaddr*)&name_s; - ::socklen_t namelen = sizeof(name_s); - - QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen)); + sockaddr_in name; // big enough for any socket address + socklen_t namelen = sizeof(name); + + if (local) { + QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen)); + } else { + QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen)); + } - return SocketAddress::getPort(name); + char servName[NI_MAXSERV]; + if (int rc = ::getnameinfo((sockaddr*)&name, namelen, + 0, 0, + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); + return servName; } } // namespace @@ -114,7 +138,13 @@ Socket::Socket() : IOHandle(new IOHandlePrivate), nonblocking(false), nodelay(false) -{} +{ + SOCKET& socket = impl->fd; + if (socket != INVALID_SOCKET) Socket::close(); + SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0); + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + socket = s; +} Socket::Socket(IOHandlePrivate* h) : IOHandle(h), @@ -122,7 +152,8 @@ Socket::Socket(IOHandlePrivate* h) : nodelay(false) {} -void Socket::createSocket(const SocketAddress& sa) const +void +Socket::createSocket(const SocketAddress& sa) const { SOCKET& socket = impl->fd; if (socket != INVALID_SOCKET) Socket::close(); @@ -137,24 +168,24 @@ void Socket::createSocket(const SocketAddress& sa) const if (nonblocking) setNonblocking(); if (nodelay) setTcpNoDelay(); } catch (std::exception&) { - ::closesocket(s); + closesocket(s); socket = INVALID_SOCKET; throw; } } -Socket* Socket::createSameTypeSocket() const { - SOCKET& socket = impl->fd; - // Socket currently has no actual socket attached - if (socket == INVALID_SOCKET) - return new Socket; - - ::sockaddr_storage sa; - ::socklen_t salen = sizeof(sa); - QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); - SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM - if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); - return new Socket(new IOHandlePrivate(s)); +void Socket::setTimeout(const Duration& interval) const +{ + const SOCKET& socket = impl->fd; + int64_t nanosecs = interval; + nanosecs /= (1000 * 1000); // nsecs -> usec -> msec + int msec = 0; + if (nanosecs > std::numeric_limits<int>::max()) + msec = std::numeric_limits<int>::max(); + else + msec = static_cast<int>(nanosecs); + setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&msec, sizeof(msec)); + setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&msec, sizeof(msec)); } void Socket::setNonblocking() const { @@ -162,25 +193,30 @@ void Socket::setNonblocking() const { QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock)); } -void Socket::connect(const std::string& host, const std::string& port) const +void Socket::connect(const std::string& host, uint16_t port) const { - SocketAddress sa(host, port); + SocketAddress sa(host, boost::lexical_cast<std::string>(port)); connect(sa); } void Socket::connect(const SocketAddress& addr) const { - peername = addr.asString(false); - - createSocket(addr); - const SOCKET& socket = impl->fd; - int err; + const addrinfo *addrs = &(getAddrInfo(addr)); + int error = 0; WSASetLastError(0); - if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) && - ((err = ::WSAGetLastError()) != WSAEWOULDBLOCK)) - throw qpid::Exception(QPID_MSG(strError(err) << ": " << peername)); + while (addrs != 0) { + if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) || + (WSAGetLastError() == WSAEWOULDBLOCK)) + break; + // Error... save this error code and see if there are other address + // to try before throwing the exception. + error = WSAGetLastError(); + addrs = addrs->ai_next; + } + if (error) + throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname)); } void @@ -211,26 +247,24 @@ int Socket::read(void *buf, size_t count) const return received; } -int Socket::listen(const std::string& host, const std::string& port, int backlog) const -{ - SocketAddress sa(host, port); - return listen(sa, backlog); -} - -int Socket::listen(const SocketAddress& addr, int backlog) const +int Socket::listen(uint16_t port, int backlog) const { - createSocket(addr); - const SOCKET& socket = impl->fd; BOOL yes=1; QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes))); - - if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR) - throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError()))); + struct sockaddr_in name; + memset(&name, 0, sizeof(name)); + name.sin_family = AF_INET; + name.sin_port = htons(port); + name.sin_addr.s_addr = 0; + if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR) + throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError()))); if (::listen(socket, backlog) == SOCKET_ERROR) - throw Exception(QPID_MSG("Can't listen on " <<addr.asString() << ": " << strError(WSAGetLastError()))); - - return getLocalPort(socket); + throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError()))); + + socklen_t namelen = sizeof(name); + QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen)); + return ntohs(name.sin_port); } Socket* Socket::accept() const @@ -243,20 +277,36 @@ Socket* Socket::accept() const else throw QPID_WINDOWS_ERROR(WSAGetLastError()); } +std::string Socket::getSockname() const +{ + return getName(impl->fd, true); +} + +std::string Socket::getPeername() const +{ + return getName(impl->fd, false); +} + std::string Socket::getPeerAddress() const { - if (peername.empty()) { - peername = getName(impl->fd, false); - } - return peername; + if (!connectname.empty()) + return std::string (connectname); + return getName(impl->fd, false, true); } std::string Socket::getLocalAddress() const { - if (localname.empty()) { - localname = getName(impl->fd, true); - } - return localname; + return getName(impl->fd, true, true); +} + +uint16_t Socket::getLocalPort() const +{ + return atoi(getService(impl->fd, true).c_str()); +} + +uint16_t Socket::getRemotePort() const +{ + return atoi(getService(impl->fd, true).c_str()); } int Socket::getError() const diff --git a/cpp/src/qpid/sys/windows/SocketAddress.cpp b/cpp/src/qpid/sys/windows/SocketAddress.cpp index 77bbf85810..501cff1297 100644 --- a/cpp/src/qpid/sys/windows/SocketAddress.cpp +++ b/cpp/src/qpid/sys/windows/SocketAddress.cpp @@ -21,13 +21,7 @@ #include "qpid/sys/SocketAddress.h" -#include "qpid/Exception.h" -#include "qpid/Msg.h" - -// Ensure we get all of winsock2.h -#ifndef _WIN32_WINNT -#define _WIN32_WINNT 0x0501 -#endif +#include "qpid/sys/windows/check.h" #include <winsock2.h> #include <ws2tcpip.h> @@ -41,111 +35,37 @@ SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) port(port0), addrInfo(0) { -} - -SocketAddress::SocketAddress(const SocketAddress& sa) : - host(sa.host), - port(sa.port), - addrInfo(0) -{ -} - -SocketAddress& SocketAddress::operator=(const SocketAddress& sa) -{ - SocketAddress temp(sa); - - std::swap(temp, *this); - return *this; -} - -SocketAddress::~SocketAddress() -{ - if (addrInfo) { - ::freeaddrinfo(addrInfo); + ::addrinfo hints; + ::memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well + hints.ai_socktype = SOCK_STREAM; + + const char* node = 0; + if (host.empty()) { + hints.ai_flags |= AI_PASSIVE; + } else { + node = host.c_str(); } -} + const char* service = port.empty() ? "0" : port.c_str(); -std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen) -{ - char servName[NI_MAXSERV]; - char dispName[NI_MAXHOST]; - if (int rc=::getnameinfo(addr, addrlen, - dispName, sizeof(dispName), - servName, sizeof(servName), - NI_NUMERICHOST | NI_NUMERICSERV) != 0) - throw qpid::Exception(QPID_MSG(gai_strerror(rc))); - std::string s; - switch (addr->sa_family) { - case AF_INET: s += dispName; break; - case AF_INET6: s += "["; s += dispName; s+= "]"; break; - default: throw Exception(QPID_MSG("Unexpected socket type")); - } - s += ":"; - s += servName; - return s; + int n = ::getaddrinfo(node, service, &hints, &addrInfo); + if (n != 0) + throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); } -uint16_t SocketAddress::getPort(::sockaddr const * const addr) +SocketAddress::~SocketAddress() { - switch (addr->sa_family) { - case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port); - case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port); - default:throw Exception(QPID_MSG("Unexpected socket type")); - } + ::freeaddrinfo(addrInfo); } -std::string SocketAddress::asString(bool numeric) const +std::string SocketAddress::asString() const { - if (!numeric) - return host + ":" + port; - // Canonicalise into numeric id - const ::addrinfo& ai = getAddrInfo(*this); - - return asString(ai.ai_addr, ai.ai_addrlen); -} - -bool SocketAddress::nextAddress() { - bool r = currentAddrInfo->ai_next != 0; - if (r) - currentAddrInfo = currentAddrInfo->ai_next; - return r; -} - -void SocketAddress::setAddrInfoPort(uint16_t port) { - if (!currentAddrInfo) return; - - ::addrinfo& ai = *currentAddrInfo; - switch (ai.ai_family) { - case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return; - case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return; - default: throw Exception(QPID_MSG("Unexpected socket type")); - } + return host + ":" + port; } const ::addrinfo& getAddrInfo(const SocketAddress& sa) { - if (!sa.addrInfo) { - ::addrinfo hints; - ::memset(&hints, 0, sizeof(hints)); - hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for - hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6 - hints.ai_socktype = SOCK_STREAM; - - const char* node = 0; - if (sa.host.empty()) { - hints.ai_flags |= AI_PASSIVE; - } else { - node = sa.host.c_str(); - } - const char* service = sa.port.empty() ? "0" : sa.port.c_str(); - - int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo); - if (n != 0) - throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n))); - sa.currentAddrInfo = sa.addrInfo; - } - - return *sa.currentAddrInfo; + return *sa.addrInfo; } }} diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.h b/cpp/src/qpid/sys/windows/SslAsynchIO.h index edec081ced..3cdf2c8f08 100644 --- a/cpp/src/qpid/sys/windows/SslAsynchIO.h +++ b/cpp/src/qpid/sys/windows/SslAsynchIO.h @@ -39,6 +39,9 @@ namespace qpid { namespace sys { namespace windows { +class Socket; +class Poller; + /* * SSL/Schannel shim between the frame-handling and AsynchIO layers. * SslAsynchIO creates a regular AsynchIO object to handle I/O and this class diff --git a/cpp/src/qpid/sys/windows/StrError.cpp b/cpp/src/qpid/sys/windows/StrError.cpp index 546d399d16..9c1bfcd79c 100755 --- a/cpp/src/qpid/sys/windows/StrError.cpp +++ b/cpp/src/qpid/sys/windows/StrError.cpp @@ -30,7 +30,6 @@ namespace sys { std::string strError(int err) { const size_t bufsize = 512; char buf[bufsize]; - buf[0] = 0; if (0 == FormatMessage (FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM, 0, @@ -40,11 +39,7 @@ std::string strError(int err) { bufsize, 0)) { -#ifdef _MSC_VER - strerror_s(buf, bufsize, err); -#else - return std::string(strerror(err)); -#endif + strerror_s (buf, bufsize, err); } return std::string(buf); } diff --git a/cpp/src/qpid/sys/windows/Thread.cpp b/cpp/src/qpid/sys/windows/Thread.cpp index 23b0033be4..583a9613a3 100755 --- a/cpp/src/qpid/sys/windows/Thread.cpp +++ b/cpp/src/qpid/sys/windows/Thread.cpp @@ -19,11 +19,6 @@ * */ -// Ensure definition of OpenThread in mingw -#ifndef _WIN32_WINNT -#define _WIN32_WINNT 0x0501 -#endif - #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/windows/check.h" @@ -31,204 +26,50 @@ #include <process.h> #include <windows.h> -/* - * This implementation distinguishes between two types of thread: Qpid - * threads (based on qpid::sys::Runnable) and the rest. It provides a - * join() that will not deadlock against the Windows loader lock for - * Qpid threads. - * - * System thread identifiers are unique per Windows thread; thread - * handles are not. Thread identifiers can be recycled, but keeping a - * handle open against the thread prevents recycling as long as - * shared_ptr references to a ThreadPrivate structure remain. - * - * There is a 1-1 relationship between Qpid threads and their - * ThreadPrivate structure. Non-Qpid threads do not need to find the - * qpidThreadDone handle, so there may be a 1-many relationship for - * them. - * - * TLS storage is used for a lockless solution for static library - * builds. The special case of LoadLibrary/FreeLibrary requires - * additional synchronization variables and resource cleanup in - * DllMain. _DLL marks the dynamic case. - */ +namespace { +unsigned __stdcall runRunnable(void* p) +{ + static_cast<qpid::sys::Runnable*>(p)->run(); + _endthreadex(0); + return 0; +} +} namespace qpid { namespace sys { class ThreadPrivate { -public: friend class Thread; - friend unsigned __stdcall runThreadPrivate(void*); - typedef boost::shared_ptr<ThreadPrivate> shared_ptr; - ~ThreadPrivate(); -private: - unsigned threadId; HANDLE threadHandle; - HANDLE initCompleted; - HANDLE qpidThreadDone; - Runnable* runnable; - shared_ptr keepAlive; - - ThreadPrivate() : threadId(GetCurrentThreadId()), initCompleted(NULL), - qpidThreadDone(NULL), runnable(NULL) { - threadHandle = OpenThread (SYNCHRONIZE, FALSE, threadId); - QPID_WINDOWS_CHECK_CRT_NZ(threadHandle); - } - - ThreadPrivate(Runnable* r) : threadHandle(NULL), initCompleted(NULL), - qpidThreadDone(NULL), runnable(r) {} - - void start(shared_ptr& p); - static shared_ptr createThread(Runnable* r); -}; - -}} // namespace qpid::sys - - -namespace { -using namespace qpid::sys; - -#ifdef _DLL -class ScopedCriticalSection -{ - public: - ScopedCriticalSection(CRITICAL_SECTION& cs) : criticalSection(cs) { EnterCriticalSection(&criticalSection); } - ~ScopedCriticalSection() { LeaveCriticalSection(&criticalSection); } - private: - CRITICAL_SECTION& criticalSection; -}; - -CRITICAL_SECTION threadLock; -long runningThreads = 0; -HANDLE threadsDone; -bool terminating = false; -#endif - - -DWORD volatile tlsIndex = TLS_OUT_OF_INDEXES; - -DWORD getTlsIndex() { - if (tlsIndex != TLS_OUT_OF_INDEXES) - return tlsIndex; // already set - - DWORD trialIndex = TlsAlloc(); - QPID_WINDOWS_CHECK_NOT(trialIndex, TLS_OUT_OF_INDEXES); // No OS resource + unsigned threadId; - // only one thread gets to set the value - DWORD actualIndex = (DWORD) InterlockedCompareExchange((LONG volatile *) &tlsIndex, (LONG) trialIndex, (LONG) TLS_OUT_OF_INDEXES); - if (actualIndex == TLS_OUT_OF_INDEXES) - return trialIndex; // we won the race - else { - TlsFree(trialIndex); - return actualIndex; + ThreadPrivate(Runnable* runnable) { + uintptr_t h = _beginthreadex(0, + 0, + runRunnable, + runnable, + 0, + &threadId); + QPID_WINDOWS_CHECK_CRT_NZ(h); + threadHandle = reinterpret_cast<HANDLE>(h); } -} - -} // namespace - -namespace qpid { -namespace sys { - -unsigned __stdcall runThreadPrivate(void* p) -{ - ThreadPrivate* threadPrivate = static_cast<ThreadPrivate*>(p); - TlsSetValue(getTlsIndex(), threadPrivate); - - WaitForSingleObject (threadPrivate->initCompleted, INFINITE); - CloseHandle (threadPrivate->initCompleted); - threadPrivate->initCompleted = NULL; - - try { - threadPrivate->runnable->run(); - } catch (...) { - // not our concern - } - - SetEvent (threadPrivate->qpidThreadDone); // allow join() - threadPrivate->keepAlive.reset(); // may run ThreadPrivate destructor - -#ifdef _DLL - { - ScopedCriticalSection l(threadLock); - if (--runningThreads == 0) - SetEvent(threadsDone); - } -#endif - return 0; -} - - -ThreadPrivate::shared_ptr ThreadPrivate::createThread(Runnable* runnable) { - ThreadPrivate::shared_ptr tp(new ThreadPrivate(runnable)); - tp->start(tp); - return tp; -} - -void ThreadPrivate::start(ThreadPrivate::shared_ptr& tp) { - getTlsIndex(); // fail here if OS problem, not in new thread - - initCompleted = CreateEvent (NULL, TRUE, FALSE, NULL); - QPID_WINDOWS_CHECK_CRT_NZ(initCompleted); - qpidThreadDone = CreateEvent (NULL, TRUE, FALSE, NULL); - QPID_WINDOWS_CHECK_CRT_NZ(qpidThreadDone); - -#ifdef _DLL - { - ScopedCriticalSection l(threadLock); - if (terminating) - throw qpid::Exception(QPID_MSG("creating thread after exit/FreeLibrary")); - runningThreads++; - } -#endif - - uintptr_t h = _beginthreadex(0, - 0, - runThreadPrivate, - (void *)this, - 0, - &threadId); - -#ifdef _DLL - if (h == NULL) { - ScopedCriticalSection l(threadLock); - if (--runningThreads == 0) - SetEvent(threadsDone); - } -#endif - - QPID_WINDOWS_CHECK_CRT_NZ(h); - - // Success - keepAlive = tp; - threadHandle = reinterpret_cast<HANDLE>(h); - SetEvent (initCompleted); -} - -ThreadPrivate::~ThreadPrivate() { - if (threadHandle) - CloseHandle (threadHandle); - if (initCompleted) - CloseHandle (initCompleted); - if (qpidThreadDone) - CloseHandle (qpidThreadDone); -} - + + ThreadPrivate() + : threadHandle(GetCurrentThread()), threadId(GetCurrentThreadId()) {} +}; Thread::Thread() {} -Thread::Thread(Runnable* runnable) : impl(ThreadPrivate::createThread(runnable)) {} +Thread::Thread(Runnable* runnable) : impl(new ThreadPrivate(runnable)) {} -Thread::Thread(Runnable& runnable) : impl(ThreadPrivate::createThread(&runnable)) {} +Thread::Thread(Runnable& runnable) : impl(new ThreadPrivate(&runnable)) {} Thread::operator bool() { return impl; } bool Thread::operator==(const Thread& t) const { - if (!impl || !t.impl) - return false; return impl->threadId == t.impl->threadId; } @@ -238,17 +79,10 @@ bool Thread::operator!=(const Thread& t) const { void Thread::join() { if (impl) { - DWORD status; - if (impl->runnable) { - HANDLE handles[2] = {impl->qpidThreadDone, impl->threadHandle}; - // wait for either. threadHandle not signalled if loader - // lock held (FreeLibrary). qpidThreadDone not signalled - // if thread terminated by exit(). - status = WaitForMultipleObjects (2, handles, false, INFINITE); - } - else - status = WaitForSingleObject (impl->threadHandle, INFINITE); + DWORD status = WaitForSingleObject (impl->threadHandle, INFINITE); QPID_WINDOWS_CHECK_NOT(status, WAIT_FAILED); + CloseHandle (impl->threadHandle); + impl->threadHandle = 0; } } @@ -258,70 +92,9 @@ unsigned long Thread::logId() { /* static */ Thread Thread::current() { - ThreadPrivate* tlsValue = (ThreadPrivate *) TlsGetValue(getTlsIndex()); Thread t; - if (tlsValue != NULL) { - // called from within Runnable->run(), so keepAlive has positive use count - t.impl = tlsValue->keepAlive; - } - else - t.impl.reset(new ThreadPrivate()); + t.impl.reset(new ThreadPrivate()); return t; } -}} // namespace qpid::sys - - -#ifdef _DLL - -// DllMain: called possibly many times in a process lifetime if dll -// loaded and freed repeatedly . Be mindful of Windows loader lock -// and other DllMain restrictions. - -BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) { - switch (reason) { - case DLL_PROCESS_ATTACH: - InitializeCriticalSection(&threadLock); - threadsDone = CreateEvent(NULL, TRUE, FALSE, NULL); - break; - - case DLL_PROCESS_DETACH: - terminating = true; - if (reserved != NULL) { - // process exit(): threads are stopped arbitrarily and - // possibly in an inconsistent state. Not even threadLock - // can be trusted. All static destructors have been - // called at this point and any resources this unit knows - // about will be released as part of process tear down by - // the OS. Accordingly, do nothing. - return TRUE; - } - else { - // FreeLibrary(): threads are still running and we are - // encouraged to clean up to avoid leaks. Mostly we just - // want any straggler threads to finish and notify - // threadsDone as the last thing they do. - while (1) { - { - ScopedCriticalSection l(threadLock); - if (runningThreads == 0) - break; - ResetEvent(threadsDone); - } - WaitForSingleObject(threadsDone, INFINITE); - } - if (tlsIndex != TLS_OUT_OF_INDEXES) - TlsFree(getTlsIndex()); - CloseHandle(threadsDone); - DeleteCriticalSection(&threadLock); - } - break; - - case DLL_THREAD_ATTACH: - case DLL_THREAD_DETACH: - break; - } - return TRUE; -} - -#endif +}} /* qpid::sys */ diff --git a/cpp/src/qpid/sys/windows/Time.cpp b/cpp/src/qpid/sys/windows/Time.cpp index 25c50819cd..16d09fcdc0 100644 --- a/cpp/src/qpid/sys/windows/Time.cpp +++ b/cpp/src/qpid/sys/windows/Time.cpp @@ -27,17 +27,6 @@ using namespace boost::posix_time; -namespace { - -// High-res timing support. This will display times since program start, -// more or less. Keep track of the start value and the conversion factor to -// seconds. -bool timeInitialized = false; -LARGE_INTEGER start; -double freq = 1.0; - -} - namespace qpid { namespace sys { @@ -102,35 +91,10 @@ void outputFormattedNow(std::ostream& o) { char time_string[100]; ::time( &rawtime ); -#ifdef _MSC_VER ::localtime_s(&timeinfo, &rawtime); -#else - timeinfo = *(::localtime(&rawtime)); -#endif ::strftime(time_string, 100, "%Y-%m-%d %H:%M:%S", &timeinfo); o << time_string << " "; } - -void outputHiresNow(std::ostream& o) { - if (!timeInitialized) { - start.QuadPart = 0; - LARGE_INTEGER iFreq; - iFreq.QuadPart = 1; - QueryPerformanceCounter(&start); - QueryPerformanceFrequency(&iFreq); - freq = static_cast<double>(iFreq.QuadPart); - timeInitialized = true; - } - LARGE_INTEGER iNow; - iNow.QuadPart = 0; - QueryPerformanceCounter(&iNow); - iNow.QuadPart -= start.QuadPart; - if (iNow.QuadPart < 0) - iNow.QuadPart = 0; - double now = static_cast<double>(iNow.QuadPart); - now /= freq; // now is seconds after this - o << std::fixed << std::setprecision(8) << std::setw(16) << std::setfill('0') << now << "s "; -} }} diff --git a/cpp/src/qpid/sys/windows/mingw32_compat.h b/cpp/src/qpid/sys/windows/mingw32_compat.h deleted file mode 100644 index 51f613cc25..0000000000 --- a/cpp/src/qpid/sys/windows/mingw32_compat.h +++ /dev/null @@ -1,39 +0,0 @@ -#ifndef _sys_windows_mingw32_compat -#define _sys_windows_mingw32_compat -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifdef WIN32 -#ifndef _MSC_VER - -// -// The following definitions for extension function GUIDs and signatures are taken from -// MswSock.h in the Windows32 SDK. These rightfully belong in the mingw32 version of -// mswsock.h, but are not included presently. -// - -#define WSAID_ACCEPTEX {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}} -typedef BOOL (PASCAL *LPFN_ACCEPTEX)(SOCKET,SOCKET,PVOID,DWORD,DWORD,DWORD,LPDWORD,LPOVERLAPPED); - -#endif -#endif - -#endif diff --git a/cpp/src/qpid/sys/windows/uuid.cpp b/cpp/src/qpid/sys/windows/uuid.cpp index 3316ecbc00..b5360622dc 100644 --- a/cpp/src/qpid/sys/windows/uuid.cpp +++ b/cpp/src/qpid/sys/windows/uuid.cpp @@ -19,7 +19,7 @@ * */ -#include <rpc.h> +#include <Rpc.h> #ifdef uuid_t /* Done in rpcdce.h */ # undef uuid_t #endif @@ -52,11 +52,7 @@ int uuid_parse (const char *in, uuid_t uu) { void uuid_unparse (const uuid_t uu, char *out) { unsigned char *formatted; if (UuidToString((UUID*)uu, &formatted) == RPC_S_OK) { -#ifdef _MSC_VER strncpy_s (out, 36+1, (char*)formatted, _TRUNCATE); -#else - strncpy (out, (char*)formatted, 36+1); -#endif RpcStringFree(&formatted); } } |
