summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/windows
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2008-10-21 18:29:44 +0000
committerStephen D. Huston <shuston@apache.org>2008-10-21 18:29:44 +0000
commita456228771847475234d60f58307fff1ba5ecc43 (patch)
tree41ee422be98bedec96969b4785ffb34c15e128c1 /cpp/src/qpid/sys/windows
parentf3c32c4937b95a0fafbbb36aa3e7d43471d77be3 (diff)
downloadqpid-python-a456228771847475234d60f58307fff1ba5ecc43.tar.gz
Refactor sys::AsynchIO class to allow reimplementing on other platforms without affecting upper level usage. Resolves QPID-1377 and supplies Windows AsynchIO.cpp
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@706709 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/windows')
-rw-r--r--cpp/src/qpid/sys/windows/AsynchIO.cpp729
-rwxr-xr-xcpp/src/qpid/sys/windows/AsynchIoResult.h187
-rwxr-xr-xcpp/src/qpid/sys/windows/IOHandle.cpp42
-rwxr-xr-xcpp/src/qpid/sys/windows/IoHandlePrivate.h52
-rwxr-xr-xcpp/src/qpid/sys/windows/IocpDispatcher.cpp54
-rwxr-xr-xcpp/src/qpid/sys/windows/IocpPoller.cpp176
-rwxr-xr-xcpp/src/qpid/sys/windows/Socket.cpp329
7 files changed, 1569 insertions, 0 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp
new file mode 100644
index 0000000000..bde1213131
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -0,0 +1,729 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsynchIoResult.h"
+#include "IoHandlePrivate.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Time.h"
+#include "qpid/log/Statement.h"
+
+#include "check.h"
+
+#include <boost/thread/once.hpp>
+
+#include <winsock2.h>
+#include <mswsock.h>
+#include <windows.h>
+
+#include <boost/bind.hpp>
+
+namespace {
+
+ typedef qpid::sys::ScopedLock<qpid::sys::Mutex> QLock;
+
+/*
+ * We keep per thread state to avoid locking overhead. The assumption is that
+ * on average all the connections are serviced by all the threads so the state
+ * recorded in each thread is about the same. If this turns out not to be the
+ * case we could rebalance the info occasionally.
+ */
+QPID_TSS int threadReadTotal = 0;
+QPID_TSS int threadMaxRead = 0;
+QPID_TSS int threadReadCount = 0;
+QPID_TSS int threadWriteTotal = 0;
+QPID_TSS int threadWriteCount = 0;
+QPID_TSS int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
+
+/*
+ * The function pointers for AcceptEx and ConnectEx need to be looked up
+ * at run time. Make sure this is done only once.
+ */
+boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT;
+LPFN_ACCEPTEX fnAcceptEx = 0;
+typedef void (*lookUpFunc)(const qpid::sys::Socket &);
+
+void lookUpAcceptEx() {
+ SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ GUID guidAcceptEx = WSAID_ACCEPTEX;
+ DWORD dwBytes = 0;
+ WSAIoctl(h,
+ SIO_GET_EXTENSION_FUNCTION_POINTER,
+ &guidAcceptEx,
+ sizeof(guidAcceptEx),
+ &fnAcceptEx,
+ sizeof(fnAcceptEx),
+ &dwBytes,
+ NULL,
+ NULL);
+ closesocket(h);
+ if (fnAcceptEx == 0)
+ throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx"));
+}
+
+}
+
+namespace qpid {
+namespace sys {
+
+/*
+ * Asynch Acceptor
+ *
+ * This implementation uses knowledge that the DispatchHandle handle member
+ * is derived from PollerHandle, which has a reference to the Socket.
+ * No dispatching features of DispatchHandle are used - we just use the
+ * conduit to the Socket.
+ *
+ * AsynchAcceptor uses an AsynchAcceptResult object to track completion
+ * and status of each accept operation outstanding.
+ */
+
+class AsynchAcceptorPrivate {
+
+ friend class AsynchAcceptResult;
+
+public:
+ AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback);
+ ~AsynchAcceptorPrivate();
+ void start(Poller::shared_ptr poller);
+
+private:
+ void restart(void);
+
+ AsynchAcceptor::Callback acceptedCallback;
+ const Socket& socket;
+};
+
+AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
+ impl(new AsynchAcceptorPrivate(s, callback))
+{}
+
+AsynchAcceptor::~AsynchAcceptor()
+{ delete impl; }
+
+void AsynchAcceptor::start(Poller::shared_ptr poller) {
+ impl->start(poller);
+}
+
+AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s,
+ AsynchAcceptor::Callback callback)
+ : acceptedCallback(callback),
+ socket(s) {
+
+ s.setNonblocking();
+#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */
+ boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx);
+#else
+ boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce);
+#endif
+}
+
+AsynchAcceptorPrivate::~AsynchAcceptorPrivate(void) {
+ socket.close();
+}
+
+void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) {
+ poller->addFd(PollerHandle(socket), Poller::INPUT);
+ restart ();
+}
+
+void AsynchAcceptor::restart(void) {
+ DWORD bytesReceived = 0; // Not used, needed for AcceptEx API
+ AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
+ this,
+ toFd(socket.impl));
+ BOOL status;
+ status = ::fnAcceptEx(toFd(socket.impl),
+ toFd(result->newSocket->impl),
+ result->addressBuffer,
+ 0,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ &bytesReceived,
+ result->overlapped());
+ QPID_WINDOWS_CHECK_ASYNC_START(status);
+}
+
+
+AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
+ AsynchAcceptor *acceptor,
+ SOCKET listener)
+ : callback(cb), acceptor(acceptor), listener(listener) {
+ newSocket.reset (new Socket());
+}
+
+void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
+ ::setsockopt (toFd(newSocket->impl),
+ SOL_SOCKET,
+ SO_UPDATE_ACCEPT_CONTEXT,
+ (char*)&listener,
+ sizeof (listener));
+ callback(*(newSocket.release()));
+ acceptor->restart ();
+ delete this;
+}
+
+void AsynchAcceptResult::failure(int status) {
+ if (status != WSA_OPERATION_ABORTED)
+ ;
+ delete this;
+}
+
+namespace windows {
+
+/*
+ * AsynchConnector does synchronous connects for now... to do asynch the
+ * IocpPoller will need some extension to register an event handle as a
+ * CONNECT-type "direction", the connect completion/result will need an
+ * event handle to associate with the connecting handle. But there's no
+ * time for that right now...
+ */
+class AsynchConnector : public qpid::sys::AsynchConnector {
+private:
+ ConnectedCallback connCallback;
+ FailedCallback failCallback;
+ const Socket& socket;
+
+public:
+ AsynchConnector(const Socket& socket,
+ Poller::shared_ptr poller,
+ std::string hostname,
+ uint16_t port,
+ ConnectedCallback connCb,
+ FailedCallback failCb = 0);
+};
+
+AsynchConnector::AsynchConnector(const Socket& sock,
+ Poller::shared_ptr poller,
+ std::string hostname,
+ uint16_t port,
+ ConnectedCallback connCb,
+ FailedCallback failCb)
+ : connCallback(connCb), failCallback(failCb), socket(sock) {
+ socket.setNonblocking();
+ try {
+ socket.connect(hostname, port);
+ connCallback(socket);
+ } catch(std::exception& e) {
+ if (failCallback)
+ failCallback(-1, std::string(e.what()));
+ socket.close();
+ delete &socket;
+ }
+}
+
+} // namespace windows
+
+AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
+ Poller::shared_ptr poller,
+ std::string hostname,
+ uint16_t port,
+ ConnectedCallback connCb,
+ FailedCallback failCb)
+{
+ return new qpid::sys::windows::AsynchConnector(s,
+ poller,
+ hostname,
+ port,
+ connCb,
+ failCb);
+}
+
+
+/*
+ * Asynch reader/writer
+ */
+
+namespace windows {
+
+class AsynchIO : public qpid::sys::AsynchIO {
+public:
+ AsynchIO(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0);
+ ~AsynchIO();
+
+ // Methods inherited from qpid::sys::AsynchIO
+
+ /**
+ * Notify the object is should delete itself as soon as possible.
+ */
+ virtual void queueForDeletion();
+
+ /// Take any actions needed to prepare for working with the poller.
+ virtual void start(Poller::shared_ptr poller);
+ virtual void queueReadBuffer(BufferBase* buff);
+ virtual void unread(BufferBase* buff);
+ virtual void queueWrite(BufferBase* buff);
+ virtual void notifyPendingWrite();
+ virtual void queueWriteClose();
+ virtual bool writeQueueEmpty();
+
+ /**
+ * getQueuedBuffer returns a buffer from the buffer queue, if one is
+ * available.
+ *
+ * @retval Pointer to BufferBase buffer; 0 if none is available.
+ */
+ virtual BufferBase* getQueuedBuffer();
+
+private:
+ ReadCallback readCallback;
+ EofCallback eofCallback;
+ DisconnectCallback disCallback;
+ ClosedCallback closedCallback;
+ BuffersEmptyCallback emptyCallback;
+ IdleCallback idleCallback;
+ const Socket& socket;
+ Poller::shared_ptr poller;
+
+ std::deque<BufferBase*> bufferQueue;
+ std::deque<BufferBase*> writeQueue;
+ /* The MSVC-supplied deque is not thread-safe; keep locks to serialize
+ * access to the buffer queue and write queue.
+ */
+ Mutex bufferQueueLock;
+
+ // Number of outstanding I/O operations.
+ volatile LONG opsInProgress;
+ // Is there a write in progress?
+ volatile bool writeInProgress;
+ // Deletion requested, but there are callbacks in progress.
+ volatile bool queuedDelete;
+ // Socket close requested, but there are operations in progress.
+ volatile bool queuedClose;
+
+private:
+ void close(void);
+
+ /**
+ * Initiate a read operation. AsynchIO::dispatchReadComplete() will be
+ * called when the read is complete and data is available.
+ */
+ virtual void startRead(void);
+
+ /**
+ * Initiate a write of the specified buffer. There's no callback for
+ * write completion to the AsynchIO object.
+ */
+ virtual void startWrite(AsynchIO::BufferBase* buff);
+
+ virtual bool writesNotComplete();
+
+ /**
+ * readComplete is called when a read request is complete.
+ *
+ * @param result Results of the operation.
+ */
+ void readComplete(AsynchReadResult *result);
+
+ /**
+ * writeComplete is called when a write request is complete.
+ *
+ * @param result Results of the operation.
+ */
+ void writeComplete(AsynchWriteResult *result);
+
+ /**
+ * Queue of completions to run. This queue enforces the requirement
+ * from upper layers that only one thread at a time is allowed to act
+ * on any given connection. Once a thread is busy processing a completion
+ * on this object, other threads that dispatch completions queue the
+ * completions here for the in-progress thread to handle when done.
+ * Thus, any threads can dispatch a completion from the IocpPoller, but
+ * this class ensures that actual processing at the connection level is
+ * only on one thread at a time.
+ */
+ std::queue<AsynchIoResult *> completionQueue;
+ volatile bool working;
+ Mutex completionLock;
+
+ /**
+ * Called when there's a completion to process.
+ */
+ void completion(AsynchIoResult *result);
+};
+
+AsynchIO::AsynchIO(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb,
+ BuffersEmptyCallback eCb,
+ IdleCallback iCb) :
+
+ readCallback(rCb),
+ eofCallback(eofCb),
+ disCallback(disCb),
+ closedCallback(cCb),
+ emptyCallback(eCb),
+ idleCallback(iCb),
+ socket(s),
+ opsInProgress(0),
+ writeInProgress(false),
+ queuedDelete(false),
+ queuedClose(false),
+ working(false) {
+}
+
+struct deleter
+{
+ template <typename T>
+ void operator()(T *ptr){ delete ptr;}
+};
+
+AsynchIO::~AsynchIO() {
+ std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
+ std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
+}
+
+void AsynchIO::queueForDeletion() {
+ queuedDelete = true;
+ if (opsInProgress > 0) {
+ QPID_LOG(info, "Delete AsynchIO queued; ops in progress");
+ // AsynchIOHandler calls this then deletes itself; don't do any more
+ // callbacks.
+ readCallback = 0;
+ eofCallback = 0;
+ disCallback = 0;
+ closedCallback = 0;
+ emptyCallback = 0;
+ idleCallback = 0;
+ }
+ else {
+ delete this;
+ }
+}
+
+void AsynchIO::start(Poller::shared_ptr poller0) {
+ poller = poller0;
+ poller->addFd(PollerHandle(socket), Poller::INPUT);
+ if (writeQueue.size() > 0) // Already have data queued for write
+ notifyPendingWrite();
+ startRead();
+}
+
+void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
+ assert(buff);
+ buff->dataStart = 0;
+ buff->dataCount = 0;
+ QLock l(bufferQueueLock);
+ bufferQueue.push_back(buff);
+}
+
+void AsynchIO::unread(AsynchIO::BufferBase* buff) {
+ assert(buff);
+ if (buff->dataStart != 0) {
+ memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
+ buff->dataStart = 0;
+ }
+ QLock l(bufferQueueLock);
+ bufferQueue.push_front(buff);
+}
+
+void AsynchIO::queueWrite(AsynchIO::BufferBase* buff) {
+ assert(buff);
+ QLock l(bufferQueueLock);
+ writeQueue.push_back(buff);
+ if (!writeInProgress)
+ notifyPendingWrite();
+}
+
+void AsynchIO::notifyPendingWrite() {
+ // This method is generally called from a processing thread; transfer
+ // work on this to an I/O thread. Much of the upper layer code assumes
+ // that all I/O-related things happen in an I/O thread.
+ if (poller == 0) // Not really going yet...
+ return;
+
+ InterlockedIncrement(&opsInProgress);
+ IOHandlePrivate *hp =
+ new IOHandlePrivate (INVALID_SOCKET,
+ boost::bind(&AsynchIO::completion, this, _1));
+ IOHandle h(hp);
+ PollerHandle ph(h);
+ poller->addFd(ph, Poller::OUTPUT);
+}
+
+void AsynchIO::queueWriteClose() {
+ queuedClose = true;
+ if (!writeInProgress)
+ notifyPendingWrite();
+}
+
+bool AsynchIO::writeQueueEmpty() {
+ QLock l(bufferQueueLock);
+ return writeQueue.size() == 0;
+}
+
+/**
+ * Return a queued buffer if there are enough to spare.
+ */
+AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
+ QLock l(bufferQueueLock);
+ // Always keep at least one buffer (it might have data that was
+ // "unread" in it).
+ if (bufferQueue.size() <= 1)
+ return 0;
+ BufferBase* buff = bufferQueue.back();
+ assert(buff);
+ bufferQueue.pop_back();
+ return buff;
+}
+
+void AsynchIO::dispatchReadComplete(AsynchIO::BufferBase *buffer) {
+ if (readCallback)
+ readCallback(*this, buffer);
+}
+
+void AsynchIO::notifyEof(void) {
+ if (eofCallback)
+ eofCallback(*this);
+}
+
+void AsynchIO::notifyDisconnect(void) {
+ if (disCallback)
+ disCallback(*this);
+}
+
+void AsynchIO::notifyClosed(void) {
+ if (closedCallback)
+ closedCallback(*this, socket);
+}
+
+void AsynchIO::notifyBuffersEmpty(void) {
+ if (emptyCallback)
+ emptyCallback(*this);
+}
+
+void AsynchIO::notifyIdle(void) {
+ if (idleCallback)
+ idleCallback(*this);
+}
+
+/*
+ * Asynch reader/writer using overlapped I/O
+ */
+
+void AsynchIO::startRead(void) {
+ if (queuedDelete)
+ return;
+
+ // (Try to) get a buffer; look on the front since there may be an
+ // "unread" one there with data remaining from last time.
+ AsynchIO::BufferBase *buff = 0;
+ {
+ QLock l(bufferQueueLock);
+
+ if (!bufferQueue.empty()) {
+ buff = bufferQueue.front();
+ assert(buff);
+ bufferQueue.pop_front();
+ }
+ }
+ if (buff != 0) {
+ int readCount = buff->byteCount - buff->dataCount;
+ AsynchReadResult *result =
+ new AsynchReadResult(boost::bind(&AsynchIO::completion, this, _1),
+ buff,
+ readCount);
+ DWORD bytesReceived = 0, flags = 0;
+ InterlockedIncrement(&opsInProgress);
+ int status = WSARecv(toFd(socket.impl),
+ const_cast<LPWSABUF>(result->getWSABUF()), 1,
+ &bytesReceived,
+ &flags,
+ result->overlapped(),
+ 0);
+ if (status != 0) {
+ int error = WSAGetLastError();
+ if (error != WSA_IO_PENDING) {
+ result->failure(error);
+ result = 0; // result is invalid here
+ return;
+ }
+ }
+ // On status 0 or WSA_IO_PENDING, completion will handle the rest.
+ }
+ else {
+ notifyBuffersEmpty();
+ }
+ return;
+}
+
+void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
+ writeInProgress = true;
+ InterlockedIncrement(&opsInProgress);
+ int writeCount = buff->byteCount-buff->dataCount;
+ AsynchWriteResult *result =
+ new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1),
+ buff,
+ buff->dataCount);
+ DWORD bytesSent = 0;
+ int status = WSASend(toFd(socket.impl),
+ const_cast<LPWSABUF>(result->getWSABUF()), 1,
+ &bytesSent,
+ 0,
+ result->overlapped(),
+ 0);
+ if (status != 0) {
+ int error = WSAGetLastError();
+ if (error != WSA_IO_PENDING) {
+ result->failure(error); // Also decrements in-progress count
+ result = 0; // result is invalid here
+ return;
+ }
+ }
+ // On status 0 or WSA_IO_PENDING, completion will handle the rest.
+ return;
+}
+
+bool AsynchIO::writesNotComplete() {
+ return writeInProgress;
+}
+
+/*
+ * Close the socket and callback to say we've done it
+ */
+void AsynchIO::close(void) {
+ socket.close();
+ notifyClosed();
+}
+
+void AsynchIO::readComplete(AsynchReadResult *result) {
+ ++threadReadCount;
+ int status = result->getStatus();
+ size_t bytes = result->getTransferred();
+ if (status == 0 && bytes > 0) {
+ threadReadTotal += bytes;
+ dispatchReadComplete(result->getBuff());
+ startRead();
+ }
+ else {
+ // No data read, so put the buffer back. It may be partially filled,
+ // so "unread" it back to the front of the queue.
+ unread(result->getBuff());
+ if (status == 0)
+ notifyEof();
+ else
+ notifyDisconnect();
+ }
+}
+
+/*
+ * NOTE - this completion is called for completed writes and also when
+ * a write is desired. The difference is in the buff - if a write is desired
+ * the buff is 0.
+ */
+void AsynchIO::writeComplete(AsynchWriteResult *result) {
+ int status = result->getStatus();
+ size_t bytes = result->getTransferred();
+ AsynchIO::BufferBase *buff = result->getBuff();
+ if (buff != 0) {
+ ++threadWriteCount;
+ writeInProgress = false;
+ if (status == 0 && bytes > 0) {
+ threadWriteTotal += bytes;
+ if (bytes < result->getRequested()) // Still more to go; resubmit
+ startWrite(buff);
+ else
+ queueReadBuffer(buff); // All done; back to the pool
+ }
+ else {
+ // An error... if it's a connection close, ignore it - it will be
+ // noticed and handled on a read completion any moment now.
+ // What to do with real error??? Save the Buffer?
+ }
+ }
+
+ // If there are no writes outstanding, the priority is to write any
+ // remaining buffers first (either queued or via idle), then close the
+ // socket if that's queued.
+ // opsInProgress handled in completion()
+ if (!writeInProgress) {
+ bool writing = false;
+ {
+ QLock l(bufferQueueLock);
+ if (writeQueue.size() > 0) {
+ buff = writeQueue.front();
+ assert(buff);
+ writeQueue.pop_front();
+ startWrite(buff);
+ writing = true;
+ }
+ }
+ if (!writing) {
+ if (queuedClose)
+ close();
+ else
+ notifyIdle();
+ }
+ }
+ return;
+}
+
+void AsynchIO::completion(AsynchIoResult *result) {
+ {
+ ScopedLock<Mutex> l(completionLock);
+ if (working) {
+ completionQueue.push(result);
+ return;
+ }
+
+ // First thread in with something to do; note we're working then keep
+ // handling completions.
+ working = true;
+ while (result != 0) {
+ // New scope to unlock temporarily.
+ {
+ ScopedUnlock<Mutex> ul(completionLock);
+ AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result);
+ if (r != 0)
+ readComplete(r);
+ else {
+ AsynchWriteResult *w =
+ dynamic_cast<AsynchWriteResult*>(result);
+ writeComplete(w);
+ }
+ delete result;
+ result = 0;
+ InterlockedDecrement(&opsInProgress);
+ }
+ // Lock is held again.
+ if (completionQueue.empty())
+ continue;
+ result = completionQueue.front();
+ completionQueue.pop();
+ }
+ working = false;
+ }
+ // Lock released; ok to delete if all is done.
+ if (opsInProgress == 0 && queuedDelete)
+ delete this;
+}
+
+}} // namespace qpid::windows
diff --git a/cpp/src/qpid/sys/windows/AsynchIoResult.h b/cpp/src/qpid/sys/windows/AsynchIoResult.h
new file mode 100755
index 0000000000..9efdaebda8
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/AsynchIoResult.h
@@ -0,0 +1,187 @@
+#ifndef _windows_asynchIoResult_h
+#define _windows_asynchIoResult_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsynchIO.h"
+#include "qpid/sys/Socket.h"
+#include <memory.h>
+#include <winsock2.h>
+#include <ws2tcpip.h>
+
+namespace qpid {
+namespace sys {
+
+/*
+ * AsynchIoResult defines the class that receives the result of an
+ * asynchronous I/O operation, either send/recv or accept/connect.
+ *
+ * Operation factories should set one of these up before beginning the
+ * operation. Poller knows how to dispatch completion to this class.
+ * This class must be subclassed for needed operations; this class provides
+ * an interface only and cannot be instantiated.
+ *
+ * This class is tied to Windows; it inherits from OVERLAPPED so that the
+ * IocpPoller can cast OVERLAPPED pointers back to AsynchIoResult and call
+ * the completion handler.
+ */
+class AsynchResult : private OVERLAPPED {
+public:
+ LPOVERLAPPED overlapped(void) { return this; }
+ static AsynchResult* from_overlapped(LPOVERLAPPED ol) {
+ return static_cast<AsynchResult*>(ol);
+ }
+ virtual void success (size_t bytesTransferred) {
+ bytes = bytesTransferred;
+ status = 0;
+ complete();
+ }
+ virtual void failure (int error) {
+ bytes = 0;
+ status = error;
+ complete();
+ }
+ size_t getTransferred(void) const { return bytes; }
+ int getStatus(void) const { return status; }
+
+protected:
+ AsynchResult() : bytes(0), status(0)
+ { memset(overlapped(), 0, sizeof(OVERLAPPED)); }
+ ~AsynchResult() {}
+ virtual void complete(void) = 0;
+
+ size_t bytes;
+ int status;
+};
+
+class AsynchAcceptorPrivate;
+class AsynchAcceptResult : public AsynchResult {
+
+ friend class AsynchAcceptorPrivate;
+
+public:
+ AsynchAcceptResult(AsynchAcceptor::Callback cb,
+ AsynchAcceptorPrivate *acceptor,
+ SOCKET listener);
+ virtual void success (size_t bytesTransferred);
+ virtual void failure (int error);
+
+private:
+ virtual void complete(void) {} // No-op for this class.
+
+ std::auto_ptr<qpid::sys::Socket> newSocket;
+ AsynchAcceptor::Callback callback;
+ AsynchAcceptorPrivate *acceptor;
+ SOCKET listener;
+
+ // AcceptEx needs a place to write the local and remote addresses
+ // when accepting the connection. Place those here; get enough for
+ // IPv6 addresses, even if the socket is IPv4.
+ enum { SOCKADDRMAXLEN = sizeof sockaddr_in6 + 16,
+ SOCKADDRBUFLEN = 2 * SOCKADDRMAXLEN };
+ char addressBuffer[SOCKADDRBUFLEN];
+};
+
+class AsynchIO;
+
+class AsynchIoResult : public AsynchResult {
+public:
+ typedef boost::function1<void, AsynchIoResult *> Completer;
+
+ virtual ~AsynchIoResult() {}
+ AsynchIO::BufferBase *getBuff(void) const { return iobuff; }
+ size_t getRequested(void) const { return requested; }
+ const WSABUF *getWSABUF(void) const { return &wsabuf; }
+
+protected:
+ void setBuff (AsynchIO::BufferBase *buffer) { iobuff = buffer; }
+
+protected:
+ AsynchIoResult(Completer cb,
+ AsynchIO::BufferBase *buff, size_t length)
+ : completionCallback(cb), iobuff(buff), requested(length) {}
+
+ virtual void complete(void) = 0;
+ WSABUF wsabuf;
+ Completer completionCallback;
+
+private:
+ AsynchIO::BufferBase *iobuff;
+ size_t requested; // Number of bytes in original I/O request
+};
+
+class AsynchReadResult : public AsynchIoResult {
+
+ // complete() updates buffer then does completion callback.
+ virtual void complete(void) {
+ getBuff()->dataCount += bytes;
+ completionCallback(this);
+ }
+
+public:
+ AsynchReadResult(AsynchIoResult::Completer cb,
+ AsynchIO::BufferBase *buff,
+ size_t length)
+ : AsynchIoResult(cb, buff, length) {
+ wsabuf.buf = buff->bytes + buff->dataCount;
+ wsabuf.len = length;
+ }
+};
+
+class AsynchWriteResult : public AsynchIoResult {
+
+ // complete() updates buffer then does completion callback.
+ virtual void complete(void) {
+ AsynchIO::BufferBase *b = getBuff();
+ b->dataStart += bytes;
+ b->dataCount -= bytes;
+ completionCallback(this);
+ }
+
+public:
+ AsynchWriteResult(AsynchIoResult::Completer cb,
+ AsynchIO::BufferBase *buff,
+ size_t length)
+ : AsynchIoResult(cb, buff, length) {
+ wsabuf.buf = buff ? buff->bytes : 0;
+ wsabuf.len = length;
+ }
+};
+
+class AsynchWriteWanted : public AsynchWriteResult {
+
+ // complete() just does completion callback; no buffers used.
+ virtual void complete(void) {
+ completionCallback(this);
+ }
+
+public:
+ AsynchWriteWanted(AsynchIoResult::Completer cb)
+ : AsynchWriteResult(cb, 0, 0) {
+ wsabuf.buf = 0;
+ wsabuf.len = 0;
+ }
+};
+
+}}
+
+#endif /*!_windows_asynchIoResult_h*/
diff --git a/cpp/src/qpid/sys/windows/IOHandle.cpp b/cpp/src/qpid/sys/windows/IOHandle.cpp
new file mode 100755
index 0000000000..ba544c8c90
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/IOHandle.cpp
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IOHandle.h"
+#include "IoHandlePrivate.h"
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+
+SOCKET toFd(const IOHandlePrivate* h)
+{
+ return h->fd;
+}
+
+IOHandle::IOHandle(IOHandlePrivate* h) :
+ impl(h)
+{}
+
+IOHandle::~IOHandle() {
+ delete impl;
+}
+
+}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/cpp/src/qpid/sys/windows/IoHandlePrivate.h
new file mode 100755
index 0000000000..18e75047ed
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/IoHandlePrivate.h
@@ -0,0 +1,52 @@
+#ifndef _sys_windows_IoHandlePrivate_h
+#define _sys_windows_IoHandlePrivate_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsynchIoResult.h"
+
+#include <winsock2.h>
+
+namespace qpid {
+namespace sys {
+
+// Private fd related implementation details
+// There should be either a valid socket handle or a completer callback.
+// Handle is used to associate with poller's iocp; completer is used to
+// inject a completion that will very quickly trigger a callback to the
+// completer from an I/O thread.
+class IOHandlePrivate {
+public:
+ IOHandlePrivate(SOCKET f = INVALID_SOCKET,
+ AsynchIoResult::Completer cb = 0) :
+ fd(f), event(cb)
+ {}
+
+ SOCKET fd;
+ AsynchIoResult::Completer event;
+};
+
+SOCKET toFd(const IOHandlePrivate* h);
+
+}}
+
+#endif /* _sys_windows_IoHandlePrivate_h */
diff --git a/cpp/src/qpid/sys/windows/IocpDispatcher.cpp b/cpp/src/qpid/sys/windows/IocpDispatcher.cpp
new file mode 100755
index 0000000000..1a0f6ce927
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/IocpDispatcher.cpp
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Dispatcher.h"
+
+#include <assert.h>
+
+namespace qpid {
+namespace sys {
+
+Dispatcher::Dispatcher(Poller::shared_ptr poller0) :
+ poller(poller0) {
+}
+
+Dispatcher::~Dispatcher() {
+}
+
+void Dispatcher::run() {
+ do {
+ Poller::Event event = poller->wait();
+
+ // Handle shutdown
+ switch (event.type) {
+ case Poller::SHUTDOWN:
+ return;
+ break;
+ case Poller::INVALID: // On any type of success or fail completion
+ break;
+ default:
+ // This should be impossible
+ assert(false);
+ }
+ } while (true);
+}
+
+}}
diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp
new file mode 100755
index 0000000000..44298ac8ea
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp
@@ -0,0 +1,176 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Mutex.h"
+
+#include "AsynchIoResult.h"
+#include "IoHandlePrivate.h"
+#include "check.h"
+
+#include <winsock2.h>
+#include <windows.h>
+
+#include <assert.h>
+#include <vector>
+#include <exception>
+
+namespace qpid {
+namespace sys {
+
+class PollerHandlePrivate {
+ friend class Poller;
+ friend class PollerHandle;
+
+ SOCKET fd;
+ AsynchIoResult::Completer cb;
+
+ PollerHandlePrivate(SOCKET f, AsynchIoResult::Completer cb0 = 0) :
+ fd(f), cb(cb0)
+ {
+ }
+
+};
+
+PollerHandle::PollerHandle(const IOHandle& h) :
+ impl(new PollerHandlePrivate(toFd(h.impl), h.impl->event))
+{}
+
+PollerHandle::~PollerHandle() {
+ delete impl;
+}
+
+/**
+ * Concrete implementation of Poller to use the Windows I/O Completion
+ * port (IOCP) facility.
+ */
+class PollerPrivate {
+ friend class Poller;
+
+ const HANDLE iocp;
+
+ // The number of threads running the event loop.
+ volatile LONG threadsRunning;
+
+ // Shutdown request is handled by setting isShutdown and injecting a
+ // well-formed completion event into the iocp.
+ bool isShutdown;
+
+ PollerPrivate() :
+ iocp(::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)),
+ threadsRunning(0),
+ isShutdown(false) {
+ QPID_WINDOWS_CHECK_NULL(iocp);
+ }
+
+ ~PollerPrivate() {
+ // It's probably okay to ignore any errors here as there can't be
+ // data loss
+ ::CloseHandle(iocp);
+ }
+};
+
+void Poller::addFd(PollerHandle& handle, Direction dir) {
+ HANDLE h = (HANDLE)(handle.impl->fd);
+ if (h != INVALID_HANDLE_VALUE) {
+ HANDLE iocpHandle = ::CreateIoCompletionPort (h, impl->iocp, 0, 0);
+ QPID_WINDOWS_CHECK_NULL(iocpHandle);
+ }
+ else {
+ AsynchWriteWanted *result = new AsynchWriteWanted(handle.impl->cb);
+ PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
+ }
+}
+
+void Poller::shutdown() {
+ // Allow sloppy code to shut us down more than once.
+ if (impl->isShutdown)
+ return;
+ ULONG_PTR key = 1; // Tell wait() it's a shutdown, not I/O
+ PostQueuedCompletionStatus(impl->iocp, 0, key, 0);
+}
+
+// All no-ops...
+void Poller::delFd(PollerHandle& handle) {}
+void Poller::modFd(PollerHandle& handle, Direction dir) {}
+void Poller::rearmFd(PollerHandle& handle) {}
+
+Poller::Event Poller::wait(Duration timeout) {
+ DWORD timeoutMs = 0;
+ DWORD numTransferred = 0;
+ ULONG_PTR completionKey = 0;
+ OVERLAPPED *overlapped = 0;
+ AsynchResult *result = 0;
+
+ // Wait for either an I/O operation to finish (thus signaling the
+ // IOCP handle) or a shutdown request to be made (thus signaling the
+ // shutdown event).
+ if (timeout == TIME_INFINITE)
+ timeoutMs = INFINITE;
+ else
+ timeoutMs = static_cast<DWORD>(timeout / TIME_MSEC);
+
+ InterlockedIncrement(&impl->threadsRunning);
+ bool goodOp = ::GetQueuedCompletionStatus (impl->iocp,
+ &numTransferred,
+ &completionKey,
+ &overlapped,
+ timeoutMs);
+ LONG remainingThreads = InterlockedDecrement(&impl->threadsRunning);
+ if (goodOp) {
+ // Dequeued a successful completion. If it's a posted packet from
+ // shutdown() the overlapped ptr is 0 and key is 1. Else downcast
+ // the OVERLAPPED pointer to an AsynchIoResult and call the
+ // completion handler.
+ if (overlapped == 0 && completionKey == 1) {
+ // If there are other threads still running this wait, re-post
+ // the completion.
+ if (remainingThreads > 0)
+ PostQueuedCompletionStatus(impl->iocp, 0, completionKey, 0);
+ return Event(0, SHUTDOWN);
+ }
+
+ result = AsynchResult::from_overlapped(overlapped);
+ result->success (static_cast<size_t>(numTransferred));
+ }
+ else {
+ if (overlapped != 0) {
+ // Dequeued a completion for a failed operation. Downcast back
+ // to the result object and inform it that the operation failed.
+ DWORD status = ::GetLastError();
+ result = AsynchResult::from_overlapped(overlapped);
+ result->failure (static_cast<int>(status));
+ }
+ }
+ return Event(0, INVALID); // TODO - this may need to be changed.
+
+}
+
+// Concrete constructors
+Poller::Poller() :
+ impl(new PollerPrivate())
+{}
+
+Poller::~Poller() {
+ delete impl;
+}
+
+}}
diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/Socket.cpp
new file mode 100755
index 0000000000..a9959bf43e
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/Socket.cpp
@@ -0,0 +1,329 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Socket.h"
+#include "IoHandlePrivate.h"
+#include "check.h"
+#include "qpid/sys/Time.h"
+
+#include <cstdlib>
+#include <string.h>
+#include <iostream>
+#include <memory.h>
+
+#include <winsock2.h>
+#include <ws2tcpip.h>
+
+#include <boost/format.hpp>
+
+// Need to initialize WinSock. Ideally, this would be a singleton or embedded
+// in some one-time initialization function. I tried boost singleton and could
+// not get it to compile (and others located in google had the same problem).
+// So, this simple static with an interlocked increment will do for known
+// use cases at this time. Since this will only shut down winsock at process
+// termination, there may be some problems with client programs that also
+// expect to load and unload winsock, but we'll see...
+// If someone does get an easy-to-use singleton sometime, converting to it
+// may be preferable.
+
+namespace {
+
+static LONG volatile initialized = 0;
+
+class WinSockSetup {
+ // : public boost::details::pool::singleton_default<WinSockSetup> {
+
+public:
+ WinSockSetup() {
+ LONG timesEntered = InterlockedIncrement(&initialized);
+ if (timesEntered > 1)
+ return;
+ err = 0;
+ WORD wVersionRequested;
+ WSADATA wsaData;
+
+ /* Request WinSock 2.2 */
+ wVersionRequested = MAKEWORD(2, 2);
+ err = WSAStartup(wVersionRequested, &wsaData);
+ }
+
+ ~WinSockSetup() {
+ WSACleanup();
+ }
+
+public:
+ int error(void) const { return err; }
+
+protected:
+ DWORD err;
+};
+
+static WinSockSetup setup;
+
+} /* namespace */
+
+namespace qpid {
+namespace sys {
+
+namespace {
+
+std::string getName(SOCKET fd, bool local, bool includeService = false)
+{
+ sockaddr_in name; // big enough for any socket address
+ socklen_t namelen = sizeof(name);
+ if (local) {
+ QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen));
+ } else {
+ QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen));
+ }
+
+ char servName[NI_MAXSERV];
+ char dispName[NI_MAXHOST];
+ if (includeService) {
+ if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
+ dispName, sizeof(dispName),
+ servName, sizeof(servName),
+ NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+ throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+ return std::string(dispName) + ":" + std::string(servName);
+ } else {
+ if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
+ dispName, sizeof(dispName),
+ 0, 0,
+ NI_NUMERICHOST) != 0)
+ throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+ return dispName;
+ }
+}
+
+std::string getService(SOCKET fd, bool local)
+{
+ sockaddr_in name; // big enough for any socket address
+ socklen_t namelen = sizeof(name);
+
+ if (local) {
+ QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen));
+ } else {
+ QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen));
+ }
+
+ char servName[NI_MAXSERV];
+ if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
+ 0, 0,
+ servName, sizeof(servName),
+ NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+ throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+ return servName;
+}
+} // namespace
+
+Socket::Socket() :
+ IOHandle(new IOHandlePrivate)
+{
+ createTcp();
+}
+
+Socket::Socket(IOHandlePrivate* h) :
+ IOHandle(h)
+{}
+
+void Socket::createTcp() const
+{
+ SOCKET& socket = impl->fd;
+ if (socket != INVALID_SOCKET) Socket::close();
+ SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
+ if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ socket = s;
+}
+
+void Socket::setTimeout(const Duration& interval) const
+{
+ const SOCKET& socket = impl->fd;
+ int64_t nanosecs = interval;
+ nanosecs /= (1000 * 1000); // nsecs -> usec -> msec
+ int msec = 0;
+ if (nanosecs > std::numeric_limits<int>::max())
+ msec = std::numeric_limits<int>::max();
+ else
+ msec = static_cast<int>(nanosecs);
+ setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&msec, sizeof(msec));
+ setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&msec, sizeof(msec));
+}
+
+void Socket::setNonblocking() const {
+ u_long nonblock = 1;
+ QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock));
+}
+
+void Socket::connect(const std::string& host, uint16_t port) const
+{
+ std::stringstream portstream;
+ portstream << port << std::ends;
+ std::string portstr = portstream.str();
+ std::stringstream namestream;
+ namestream << host << ":" << port;
+ connectname = namestream.str();
+
+ const SOCKET& socket = impl->fd;
+ // TODO: Be good to make this work for IPv6 as well as IPv4. Would require
+ // other changes, such as waiting to create the socket until after we
+ // have the address family. Maybe unbundle the translation of names here;
+ // use TcpAddress to resolve things and make this class take a TcpAddress
+ // and grab its address family to create the socket.
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_INET; // We always creating AF_INET-only sockets.
+ hints.ai_socktype = SOCK_STREAM; // We always do TCP
+ addrinfo *addrs;
+ int status = getaddrinfo(host.c_str(), portstr.c_str(), &hints, &addrs);
+ if (status != 0)
+ throw Exception(QPID_MSG("Cannot resolve " << host << ": " <<
+ gai_strerror(status)));
+ addrinfo *addr = addrs;
+ int error = 0;
+ WSASetLastError(0);
+ while (addr != 0) {
+ if ((::connect(socket, addr->ai_addr, addr->ai_addrlen) == 0) ||
+ (WSAGetLastError() == WSAEWOULDBLOCK))
+ break;
+ // Error... save this error code and see if there are other address
+ // to try before throwing the exception.
+ error = WSAGetLastError();
+ addr = addr->ai_next;
+ }
+ freeaddrinfo(addrs);
+ if (error)
+ throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname));
+}
+
+void
+Socket::close() const
+{
+ SOCKET& socket = impl->fd;
+ if (socket == INVALID_SOCKET) return;
+ QPID_WINSOCK_CHECK(closesocket(socket));
+ socket = INVALID_SOCKET;
+}
+
+
+int Socket::write(const void *buf, size_t count) const
+{
+ const SOCKET& socket = impl->fd;
+ int sent = ::send(socket, (const char *)buf, count, 0);
+ if (sent == SOCKET_ERROR)
+ return -1;
+ return sent;
+}
+
+int Socket::read(void *buf, size_t count) const
+{
+ const SOCKET& socket = impl->fd;
+ int received = ::recv(socket, (char *)buf, count, 0);
+ if (received == SOCKET_ERROR)
+ return -1;
+ return received;
+}
+
+int Socket::listen(uint16_t port, int backlog) const
+{
+ const SOCKET& socket = impl->fd;
+ BOOL yes=1;
+ QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes)));
+ struct sockaddr_in name;
+ memset(&name, 0, sizeof(name));
+ name.sin_family = AF_INET;
+ name.sin_port = htons(port);
+ name.sin_addr.s_addr = 0;
+ if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR)
+ throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError())));
+ if (::listen(socket, backlog) == SOCKET_ERROR)
+ throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError())));
+
+ socklen_t namelen = sizeof(name);
+ QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen));
+ return ntohs(name.sin_port);
+}
+
+Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const
+{
+ SOCKET afd = ::accept(impl->fd, addr, addrlen);
+ if (afd != INVALID_SOCKET)
+ return new Socket(new IOHandlePrivate(afd));
+ else if (WSAGetLastError() == EAGAIN)
+ return 0;
+ else throw QPID_WINDOWS_ERROR(WSAGetLastError());
+}
+
+std::string Socket::getSockname() const
+{
+ return getName(impl->fd, true);
+}
+
+std::string Socket::getPeername() const
+{
+ return getName(impl->fd, false);
+}
+
+std::string Socket::getPeerAddress() const
+{
+ if (!connectname.empty())
+ return std::string (connectname);
+ return getName(impl->fd, false, true);
+}
+
+std::string Socket::getLocalAddress() const
+{
+ return getName(impl->fd, true, true);
+}
+
+uint16_t Socket::getLocalPort() const
+{
+ return atoi(getService(impl->fd, true).c_str());
+}
+
+uint16_t Socket::getRemotePort() const
+{
+ return atoi(getService(impl->fd, true).c_str());
+}
+
+int Socket::getError() const
+{
+ int result;
+ socklen_t rSize = sizeof (result);
+
+ QPID_WINSOCK_CHECK(::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize));
+ return result;
+}
+
+void Socket::setTcpNoDelay(bool nodelay) const
+{
+ if (nodelay) {
+ int flag = 1;
+ int result = setsockopt(impl->fd,
+ IPPROTO_TCP,
+ TCP_NODELAY,
+ (char *)&flag,
+ sizeof(flag));
+ QPID_WINSOCK_CHECK(result);
+ }
+}
+
+}} // namespace qpid::sys