summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/windows/AsynchIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/windows/AsynchIO.cpp')
-rw-r--r--cpp/src/qpid/sys/windows/AsynchIO.cpp71
1 files changed, 34 insertions, 37 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 38d8842521..30378d4c5f 100644
--- a/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -30,6 +30,7 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/windows/check.h"
+#include "qpid/sys/windows/mingw32_compat.h"
#include <boost/thread/once.hpp>
@@ -46,16 +47,13 @@ namespace {
/*
* The function pointers for AcceptEx and ConnectEx need to be looked up
- * at run time. Make sure this is done only once.
+ * at run time.
*/
-boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT;
-LPFN_ACCEPTEX fnAcceptEx = 0;
-typedef void (*lookUpFunc)(const qpid::sys::Socket &);
-
-void lookUpAcceptEx() {
- SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) {
+ SOCKET h = toSocketHandle(s);
GUID guidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes = 0;
+ LPFN_ACCEPTEX fnAcceptEx;
WSAIoctl(h,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidAcceptEx,
@@ -65,9 +63,9 @@ void lookUpAcceptEx() {
&dwBytes,
NULL,
NULL);
- closesocket(h);
if (fnAcceptEx == 0)
throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx"));
+ return fnAcceptEx;
}
}
@@ -94,18 +92,15 @@ private:
AsynchAcceptor::Callback acceptedCallback;
const Socket& socket;
+ const LPFN_ACCEPTEX fnAcceptEx;
};
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
: acceptedCallback(callback),
- socket(s) {
+ socket(s),
+ fnAcceptEx(lookUpAcceptEx(s)) {
s.setNonblocking();
-#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */
- boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx);
-#else
- boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce);
-#endif
}
AsynchAcceptor::~AsynchAcceptor()
@@ -114,7 +109,8 @@ AsynchAcceptor::~AsynchAcceptor()
}
void AsynchAcceptor::start(Poller::shared_ptr poller) {
- poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
+ PollerHandle ph = PollerHandle(socket);
+ poller->monitorHandle(ph, Poller::INPUT);
restart ();
}
@@ -122,25 +118,26 @@ void AsynchAcceptor::restart(void) {
DWORD bytesReceived = 0; // Not used, needed for AcceptEx API
AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
this,
- toSocketHandle(socket));
+ socket);
BOOL status;
- status = ::fnAcceptEx(toSocketHandle(socket),
- toSocketHandle(*result->newSocket),
- result->addressBuffer,
- 0,
- AsynchAcceptResult::SOCKADDRMAXLEN,
- AsynchAcceptResult::SOCKADDRMAXLEN,
- &bytesReceived,
- result->overlapped());
+ status = fnAcceptEx(toSocketHandle(socket),
+ toSocketHandle(*result->newSocket),
+ result->addressBuffer,
+ 0,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ &bytesReceived,
+ result->overlapped());
QPID_WINDOWS_CHECK_ASYNC_START(status);
}
AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
AsynchAcceptor *acceptor,
- SOCKET listener)
- : callback(cb), acceptor(acceptor), listener(listener) {
- newSocket.reset (new Socket());
+ const Socket& listener)
+ : callback(cb), acceptor(acceptor),
+ listener(toSocketHandle(listener)),
+ newSocket(listener.createSameTypeSocket()) {
}
void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
@@ -154,7 +151,7 @@ void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
delete this;
}
-void AsynchAcceptResult::failure(int status) {
+void AsynchAcceptResult::failure(int /*status*/) {
//if (status != WSA_OPERATION_ABORTED)
// Can there be anything else? ;
delete this;
@@ -173,20 +170,20 @@ private:
FailedCallback failCallback;
const Socket& socket;
const std::string hostname;
- const uint16_t port;
+ const std::string port;
public:
AsynchConnector(const Socket& socket,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb = 0);
void start(Poller::shared_ptr poller);
};
AsynchConnector::AsynchConnector(const Socket& sock,
- std::string hname,
- uint16_t p,
+ const std::string& hname,
+ const std::string& p,
ConnectedCallback connCb,
FailedCallback failCb) :
connCallback(connCb), failCallback(failCb), socket(sock),
@@ -216,8 +213,8 @@ AsynchAcceptor* AsynchAcceptor::create(const Socket& s,
}
AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb)
{
@@ -410,8 +407,9 @@ void AsynchIO::queueForDeletion() {
}
void AsynchIO::start(Poller::shared_ptr poller0) {
+ PollerHandle ph = PollerHandle(socket);
poller = poller0;
- poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
+ poller->monitorHandle(ph, Poller::INPUT);
if (writeQueue.size() > 0) // Already have data queued for write
notifyPendingWrite();
startReading();
@@ -584,7 +582,6 @@ void AsynchIO::notifyIdle(void) {
void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
writeInProgress = true;
InterlockedIncrement(&opsInProgress);
- int writeCount = buff->byteCount-buff->dataCount;
AsynchWriteResult *result =
new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1),
buff,