diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
commit | f83677056891e436bf5ba99e79240df2a44528cd (patch) | |
tree | 625bfd644b948e89105630759cf6decb0435354d /cpp/src/qpid/sys/windows/AsynchIO.cpp | |
parent | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff) | |
download | qpid-python-QPID-2519.tar.gz |
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/windows/AsynchIO.cpp')
-rw-r--r-- | cpp/src/qpid/sys/windows/AsynchIO.cpp | 71 |
1 files changed, 34 insertions, 37 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp index 38d8842521..30378d4c5f 100644 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -30,6 +30,7 @@ #include "qpid/log/Statement.h" #include "qpid/sys/windows/check.h" +#include "qpid/sys/windows/mingw32_compat.h" #include <boost/thread/once.hpp> @@ -46,16 +47,13 @@ namespace { /* * The function pointers for AcceptEx and ConnectEx need to be looked up - * at run time. Make sure this is done only once. + * at run time. */ -boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT; -LPFN_ACCEPTEX fnAcceptEx = 0; -typedef void (*lookUpFunc)(const qpid::sys::Socket &); - -void lookUpAcceptEx() { - SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); +const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) { + SOCKET h = toSocketHandle(s); GUID guidAcceptEx = WSAID_ACCEPTEX; DWORD dwBytes = 0; + LPFN_ACCEPTEX fnAcceptEx; WSAIoctl(h, SIO_GET_EXTENSION_FUNCTION_POINTER, &guidAcceptEx, @@ -65,9 +63,9 @@ void lookUpAcceptEx() { &dwBytes, NULL, NULL); - closesocket(h); if (fnAcceptEx == 0) throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx")); + return fnAcceptEx; } } @@ -94,18 +92,15 @@ private: AsynchAcceptor::Callback acceptedCallback; const Socket& socket; + const LPFN_ACCEPTEX fnAcceptEx; }; AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), - socket(s) { + socket(s), + fnAcceptEx(lookUpAcceptEx(s)) { s.setNonblocking(); -#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */ - boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx); -#else - boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce); -#endif } AsynchAcceptor::~AsynchAcceptor() @@ -114,7 +109,8 @@ AsynchAcceptor::~AsynchAcceptor() } void AsynchAcceptor::start(Poller::shared_ptr poller) { - poller->monitorHandle(PollerHandle(socket), Poller::INPUT); + PollerHandle ph = PollerHandle(socket); + poller->monitorHandle(ph, Poller::INPUT); restart (); } @@ -122,25 +118,26 @@ void AsynchAcceptor::restart(void) { DWORD bytesReceived = 0; // Not used, needed for AcceptEx API AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback, this, - toSocketHandle(socket)); + socket); BOOL status; - status = ::fnAcceptEx(toSocketHandle(socket), - toSocketHandle(*result->newSocket), - result->addressBuffer, - 0, - AsynchAcceptResult::SOCKADDRMAXLEN, - AsynchAcceptResult::SOCKADDRMAXLEN, - &bytesReceived, - result->overlapped()); + status = fnAcceptEx(toSocketHandle(socket), + toSocketHandle(*result->newSocket), + result->addressBuffer, + 0, + AsynchAcceptResult::SOCKADDRMAXLEN, + AsynchAcceptResult::SOCKADDRMAXLEN, + &bytesReceived, + result->overlapped()); QPID_WINDOWS_CHECK_ASYNC_START(status); } AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb, AsynchAcceptor *acceptor, - SOCKET listener) - : callback(cb), acceptor(acceptor), listener(listener) { - newSocket.reset (new Socket()); + const Socket& listener) + : callback(cb), acceptor(acceptor), + listener(toSocketHandle(listener)), + newSocket(listener.createSameTypeSocket()) { } void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { @@ -154,7 +151,7 @@ void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { delete this; } -void AsynchAcceptResult::failure(int status) { +void AsynchAcceptResult::failure(int /*status*/) { //if (status != WSA_OPERATION_ABORTED) // Can there be anything else? ; delete this; @@ -173,20 +170,20 @@ private: FailedCallback failCallback; const Socket& socket; const std::string hostname; - const uint16_t port; + const std::string port; public: AsynchConnector(const Socket& socket, - std::string hostname, - uint16_t port, + const std::string& hostname, + const std::string& port, ConnectedCallback connCb, FailedCallback failCb = 0); void start(Poller::shared_ptr poller); }; AsynchConnector::AsynchConnector(const Socket& sock, - std::string hname, - uint16_t p, + const std::string& hname, + const std::string& p, ConnectedCallback connCb, FailedCallback failCb) : connCallback(connCb), failCallback(failCb), socket(sock), @@ -216,8 +213,8 @@ AsynchAcceptor* AsynchAcceptor::create(const Socket& s, } AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s, - std::string hostname, - uint16_t port, + const std::string& hostname, + const std::string& port, ConnectedCallback connCb, FailedCallback failCb) { @@ -410,8 +407,9 @@ void AsynchIO::queueForDeletion() { } void AsynchIO::start(Poller::shared_ptr poller0) { + PollerHandle ph = PollerHandle(socket); poller = poller0; - poller->monitorHandle(PollerHandle(socket), Poller::INPUT); + poller->monitorHandle(ph, Poller::INPUT); if (writeQueue.size() > 0) // Already have data queued for write notifyPendingWrite(); startReading(); @@ -584,7 +582,6 @@ void AsynchIO::notifyIdle(void) { void AsynchIO::startWrite(AsynchIO::BufferBase* buff) { writeInProgress = true; InterlockedIncrement(&opsInProgress); - int writeCount = buff->byteCount-buff->dataCount; AsynchWriteResult *result = new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1), buff, |