summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/windows
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-02-28 16:14:30 +0000
committerKim van der Riet <kpvdr@apache.org>2013-02-28 16:14:30 +0000
commit9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch)
tree2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/sys/windows
parent172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff)
downloadqpid-python-asyncstore.tar.gz
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/windows')
-rw-r--r--cpp/src/qpid/sys/windows/AsynchIO.cpp79
-rw-r--r--cpp/src/qpid/sys/windows/FileSysDir.cpp35
-rwxr-xr-xcpp/src/qpid/sys/windows/IOHandle.cpp13
-rwxr-xr-xcpp/src/qpid/sys/windows/IoHandlePrivate.h17
-rwxr-xr-xcpp/src/qpid/sys/windows/IocpPoller.cpp4
-rw-r--r--cpp/src/qpid/sys/windows/PollableCondition.cpp11
-rw-r--r--cpp/src/qpid/sys/windows/QpidDllMain.h72
-rw-r--r--cpp/src/qpid/sys/windows/SslAsynchIO.cpp20
-rw-r--r--cpp/src/qpid/sys/windows/SslAsynchIO.h5
-rwxr-xr-xcpp/src/qpid/sys/windows/SystemInfo.cpp65
-rwxr-xr-xcpp/src/qpid/sys/windows/Thread.cpp23
-rw-r--r--cpp/src/qpid/sys/windows/WinSocket.cpp (renamed from cpp/src/qpid/sys/windows/Socket.cpp)124
-rw-r--r--cpp/src/qpid/sys/windows/WinSocket.h118
13 files changed, 407 insertions, 179 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 355acbe0e6..b36ee9f941 100644
--- a/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -24,6 +24,8 @@
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Socket.h"
+#include "qpid/sys/windows/WinSocket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Time.h"
@@ -50,8 +52,8 @@ namespace {
* The function pointers for AcceptEx and ConnectEx need to be looked up
* at run time.
*/
-const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) {
- SOCKET h = toSocketHandle(s);
+const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::IOHandle& io) {
+ SOCKET h = io.fd;
GUID guidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes = 0;
LPFN_ACCEPTEX fnAcceptEx;
@@ -93,12 +95,14 @@ private:
AsynchAcceptor::Callback acceptedCallback;
const Socket& socket;
+ const SOCKET wSocket;
const LPFN_ACCEPTEX fnAcceptEx;
};
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
: acceptedCallback(callback),
socket(s),
+ wSocket(IOHandle(s).fd),
fnAcceptEx(lookUpAcceptEx(s)) {
s.setNonblocking();
@@ -121,8 +125,8 @@ void AsynchAcceptor::restart(void) {
this,
socket);
BOOL status;
- status = fnAcceptEx(toSocketHandle(socket),
- toSocketHandle(*result->newSocket),
+ status = fnAcceptEx(wSocket,
+ IOHandle(*result->newSocket).fd,
result->addressBuffer,
0,
AsynchAcceptResult::SOCKADDRMAXLEN,
@@ -133,16 +137,30 @@ void AsynchAcceptor::restart(void) {
}
+Socket* createSameTypeSocket(const Socket& sock) {
+ SOCKET socket = IOHandle(sock).fd;
+ // Socket currently has no actual socket attached
+ if (socket == INVALID_SOCKET)
+ return new WinSocket;
+
+ ::sockaddr_storage sa;
+ ::socklen_t salen = sizeof(sa);
+ QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
+ SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+ if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ return new WinSocket(s);
+}
+
AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
AsynchAcceptor *acceptor,
- const Socket& listener)
+ const Socket& lsocket)
: callback(cb), acceptor(acceptor),
- listener(toSocketHandle(listener)),
- newSocket(listener.createSameTypeSocket()) {
+ listener(IOHandle(lsocket).fd),
+ newSocket(createSameTypeSocket(lsocket)) {
}
void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
- ::setsockopt (toSocketHandle(*newSocket),
+ ::setsockopt (IOHandle(*newSocket).fd,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&listener,
@@ -180,6 +198,7 @@ public:
ConnectedCallback connCb,
FailedCallback failCb = 0);
void start(Poller::shared_ptr poller);
+ void requestCallback(RequestCallback rCb);
};
AsynchConnector::AsynchConnector(const Socket& sock,
@@ -195,7 +214,7 @@ AsynchConnector::AsynchConnector(const Socket& sock,
void AsynchConnector::start(Poller::shared_ptr)
{
try {
- socket.connect(hostname, port);
+ socket.connect(SocketAddress(hostname, port));
socket.setNonblocking();
connCallback(socket);
} catch(std::exception& e) {
@@ -205,6 +224,13 @@ void AsynchConnector::start(Poller::shared_ptr)
}
}
+// This can never be called in the current windows code as connect
+// is blocking and requestCallback only makes sense if connect is
+// non-blocking with the results returned via a poller callback.
+void AsynchConnector::requestCallback(RequestCallback rCb)
+{
+}
+
} // namespace windows
AsynchAcceptor* AsynchAcceptor::create(const Socket& s,
@@ -260,8 +286,6 @@ public:
virtual void notifyPendingWrite();
virtual void queueWriteClose();
virtual bool writeQueueEmpty();
- virtual void startReading();
- virtual void stopReading();
virtual void requestCallback(RequestCallback);
/**
@@ -272,6 +296,8 @@ public:
*/
virtual BufferBase* getQueuedBuffer();
+ virtual SecuritySettings getSecuritySettings(void);
+
private:
ReadCallback readCallback;
EofCallback eofCallback;
@@ -319,6 +345,12 @@ private:
void close(void);
/**
+ * startReading initiates reading, readComplete() is
+ * called when the read completes.
+ */
+ void startReading();
+
+ /**
* readComplete is called when a read request is complete.
*
* @param result Results of the operation.
@@ -362,7 +394,7 @@ class CallbackHandle : public IOHandle {
public:
CallbackHandle(AsynchIoResult::Completer completeCb,
AsynchIO::RequestCallback reqCb = 0) :
- IOHandle(new IOHandlePrivate (INVALID_SOCKET, completeCb, reqCb))
+ IOHandle(INVALID_SOCKET, completeCb, reqCb)
{}
};
@@ -515,7 +547,7 @@ void AsynchIO::startReading() {
DWORD bytesReceived = 0, flags = 0;
InterlockedIncrement(&opsInProgress);
readInProgress = true;
- int status = WSARecv(toSocketHandle(socket),
+ int status = WSARecv(IOHandle(socket).fd,
const_cast<LPWSABUF>(result->getWSABUF()), 1,
&bytesReceived,
&flags,
@@ -537,15 +569,6 @@ void AsynchIO::startReading() {
return;
}
-// stopReading was added to prevent a race condition with read-credit on Linux.
-// It may or may not be required on windows.
-//
-// AsynchIOHandler::readbuff() calls stopReading() inside the same
-// critical section that protects startReading() in
-// AsynchIOHandler::giveReadCredit().
-//
-void AsynchIO::stopReading() {}
-
// Queue the specified callback for invocation from an I/O thread.
void AsynchIO::requestCallback(RequestCallback callback) {
// This method is generally called from a processing thread; transfer
@@ -613,7 +636,7 @@ void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
buff,
buff->dataCount);
DWORD bytesSent = 0;
- int status = WSASend(toSocketHandle(socket),
+ int status = WSASend(IOHandle(socket).fd,
const_cast<LPWSABUF>(result->getWSABUF()), 1,
&bytesSent,
0,
@@ -639,6 +662,13 @@ void AsynchIO::close(void) {
notifyClosed();
}
+SecuritySettings AsynchIO::getSecuritySettings() {
+ SecuritySettings settings;
+ settings.ssf = socket.getKeyLen();
+ settings.authid = socket.getClientAuthId();
+ return settings;
+}
+
void AsynchIO::readComplete(AsynchReadResult *result) {
int status = result->getStatus();
size_t bytes = result->getTransferred();
@@ -683,7 +713,8 @@ void AsynchIO::writeComplete(AsynchWriteResult *result) {
else {
// An error... if it's a connection close, ignore it - it will be
// noticed and handled on a read completion any moment now.
- // What to do with real error??? Save the Buffer?
+ // What to do with real error??? Save the Buffer? TBD.
+ queueReadBuffer(buff); // All done; back to the pool
}
}
diff --git a/cpp/src/qpid/sys/windows/FileSysDir.cpp b/cpp/src/qpid/sys/windows/FileSysDir.cpp
index 88f1637d48..e090747715 100644
--- a/cpp/src/qpid/sys/windows/FileSysDir.cpp
+++ b/cpp/src/qpid/sys/windows/FileSysDir.cpp
@@ -24,6 +24,9 @@
#include <sys/stat.h>
#include <direct.h>
#include <errno.h>
+#include <windows.h>
+#include <strsafe.h>
+
namespace qpid {
namespace sys {
@@ -50,4 +53,36 @@ void FileSysDir::mkdir(void)
throw Exception ("Can't create directory: " + dirPath);
}
+void FileSysDir::forEachFile(Callback cb) const {
+
+ WIN32_FIND_DATAA findFileData;
+ char szDir[MAX_PATH];
+ size_t dirPathLength;
+ HANDLE hFind = INVALID_HANDLE_VALUE;
+
+ // create dirPath+"\*" in szDir
+ StringCchLength (dirPath.c_str(), MAX_PATH, &dirPathLength);
+
+ if (dirPathLength > (MAX_PATH - 3)) {
+ throw Exception ("Directory path is too long: " + dirPath);
+ }
+
+ StringCchCopy(szDir, MAX_PATH, dirPath.c_str());
+ StringCchCat(szDir, MAX_PATH, TEXT("\\*"));
+
+ // Special work for first file
+ hFind = FindFirstFileA(szDir, &findFileData);
+ if (INVALID_HANDLE_VALUE == hFind) {
+ return;
+ }
+
+ // process everything that isn't a directory
+ do {
+ if (!(findFileData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY)) {
+ std::string fileName(findFileData.cFileName);
+ cb(fileName);
+ }
+ } while (FindNextFile(hFind, &findFileData) != 0);
+}
+
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/windows/IOHandle.cpp b/cpp/src/qpid/sys/windows/IOHandle.cpp
index 250737cb99..19a1c44875 100755
--- a/cpp/src/qpid/sys/windows/IOHandle.cpp
+++ b/cpp/src/qpid/sys/windows/IOHandle.cpp
@@ -19,24 +19,11 @@
*
*/
-#include "qpid/sys/IOHandle.h"
#include "qpid/sys/windows/IoHandlePrivate.h"
#include <windows.h>
namespace qpid {
namespace sys {
-SOCKET toFd(const IOHandlePrivate* h)
-{
- return h->fd;
-}
-
-IOHandle::IOHandle(IOHandlePrivate* h) :
- impl(h)
-{}
-
-IOHandle::~IOHandle() {
- delete impl;
-}
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/cpp/src/qpid/sys/windows/IoHandlePrivate.h
index 5943db5cc7..4529ad93ec 100755
--- a/cpp/src/qpid/sys/windows/IoHandlePrivate.h
+++ b/cpp/src/qpid/sys/windows/IoHandlePrivate.h
@@ -38,15 +38,14 @@ namespace sys {
// completer from an I/O thread. If the callback mechanism is used, there
// can be a RequestCallback set - this carries the callback object through
// from AsynchIO::requestCallback() through to the I/O completion processing.
-class IOHandlePrivate {
- friend QPID_COMMON_EXTERN SOCKET toSocketHandle(const Socket& s);
- static IOHandlePrivate* getImpl(const IOHandle& h);
-
+class IOHandle {
public:
- IOHandlePrivate(SOCKET f = INVALID_SOCKET,
- windows::AsynchIoResult::Completer cb = 0,
- AsynchIO::RequestCallback reqCallback = 0) :
- fd(f), event(cb), cbRequest(reqCallback)
+ IOHandle(SOCKET f = INVALID_SOCKET,
+ windows::AsynchIoResult::Completer cb = 0,
+ AsynchIO::RequestCallback reqCallback = 0) :
+ fd(f),
+ event(cb),
+ cbRequest(reqCallback)
{}
SOCKET fd;
@@ -54,8 +53,6 @@ public:
AsynchIO::RequestCallback cbRequest;
};
-QPID_COMMON_EXTERN SOCKET toSocketHandle(const Socket& s);
-
}}
#endif /* _sys_windows_IoHandlePrivate_h */
diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp
index c81cef87b0..ecb33c5517 100755
--- a/cpp/src/qpid/sys/windows/IocpPoller.cpp
+++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp
@@ -22,7 +22,7 @@
#include "qpid/sys/Poller.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Dispatcher.h"
-
+#include "qpid/sys/IOHandle.h"
#include "qpid/sys/windows/AsynchIoResult.h"
#include "qpid/sys/windows/IoHandlePrivate.h"
#include "qpid/sys/windows/check.h"
@@ -55,7 +55,7 @@ class PollerHandlePrivate {
};
PollerHandle::PollerHandle(const IOHandle& h) :
- impl(new PollerHandlePrivate(toSocketHandle(static_cast<const Socket&>(h)), h.impl->event, h.impl->cbRequest))
+ impl(new PollerHandlePrivate(h.fd, h.event, h.cbRequest))
{}
PollerHandle::~PollerHandle() {
diff --git a/cpp/src/qpid/sys/windows/PollableCondition.cpp b/cpp/src/qpid/sys/windows/PollableCondition.cpp
index bb637be0a6..3e2a5fb36c 100644
--- a/cpp/src/qpid/sys/windows/PollableCondition.cpp
+++ b/cpp/src/qpid/sys/windows/PollableCondition.cpp
@@ -52,14 +52,14 @@ private:
PollableCondition& parent;
boost::shared_ptr<sys::Poller> poller;
LONG isSet;
+ LONG isDispatching;
};
PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb,
sys::PollableCondition& parent,
const boost::shared_ptr<sys::Poller>& poller)
- : IOHandle(new sys::IOHandlePrivate(INVALID_SOCKET,
- boost::bind(&PollableConditionPrivate::dispatch, this, _1))),
- cb(cb), parent(parent), poller(poller), isSet(0)
+ : IOHandle(INVALID_SOCKET, boost::bind(&PollableConditionPrivate::dispatch, this, _1)),
+ cb(cb), parent(parent), poller(poller), isSet(0), isDispatching(0)
{
}
@@ -78,7 +78,12 @@ void PollableConditionPrivate::poke()
void PollableConditionPrivate::dispatch(windows::AsynchIoResult *result)
{
delete result; // Poller::monitorHandle() allocates this
+ // If isDispatching is already set, just return. Else, enter.
+ if (::InterlockedCompareExchange(&isDispatching, 1, 0) == 1)
+ return;
cb(parent);
+ LONG oops = ::InterlockedDecrement(&isDispatching); // Result must be 0
+ assert(!oops);
if (isSet)
poke();
}
diff --git a/cpp/src/qpid/sys/windows/QpidDllMain.h b/cpp/src/qpid/sys/windows/QpidDllMain.h
new file mode 100644
index 0000000000..74eaf0256a
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/QpidDllMain.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * Include this file once in each DLL that relies on SystemInfo.h:
+ * threadSafeShutdown(). Note that Thread.cpp has a more elaborate
+ * DllMain, that also provides this functionality separately.
+ *
+ * Teardown is in the reverse order of the DLL dependencies used
+ * during the load phase. The calls to DllMain and the static
+ * destructors are from the same thread, so no locking is necessary
+ * and there is no downside to an invocation of DllMain by multiple
+ * Qpid DLLs.
+ */
+
+#ifdef _DLL
+
+#include <qpid/ImportExport.h>
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+QPID_IMPORT bool processExiting;
+QPID_IMPORT bool libraryUnloading;
+
+}}} // namespace qpid::sys::SystemInfo
+
+
+BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) {
+ switch (reason) {
+ case DLL_PROCESS_ATTACH:
+ case DLL_THREAD_ATTACH:
+ case DLL_THREAD_DETACH:
+ break;
+
+ case DLL_PROCESS_DETACH:
+ // Remember how the process is terminating this DLL.
+ if (reserved != NULL) {
+ qpid::sys::windows::processExiting = true;
+ // Danger: all threading suspect, including indirect use of malloc or locks.
+ // Think twice before adding more functionality here.
+ return TRUE;
+ }
+ else {
+ qpid::sys::windows::libraryUnloading = true;
+ }
+ break;
+ }
+ return TRUE;
+}
+
+
+#endif
diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
index d263f00ab3..e48c799b29 100644
--- a/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
+++ b/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
@@ -209,18 +209,6 @@ bool SslAsynchIO::writeQueueEmpty() {
return aio->writeQueueEmpty();
}
-/*
- * Initiate a read operation. AsynchIO::readComplete() will be
- * called when the read is complete and data is available.
- */
-void SslAsynchIO::startReading() {
- aio->startReading();
-}
-
-void SslAsynchIO::stopReading() {
- aio->stopReading();
-}
-
// Queue the specified callback for invocation from an I/O thread.
void SslAsynchIO::requestCallback(RequestCallback callback) {
aio->requestCallback(callback);
@@ -241,11 +229,15 @@ AsynchIO::BufferBase* SslAsynchIO::getQueuedBuffer() {
return sslBuff;
}
-unsigned int SslAsynchIO::getSslKeySize() {
+SecuritySettings SslAsynchIO::getSecuritySettings() {
SecPkgContext_KeyInfo info;
memset(&info, 0, sizeof(info));
::QueryContextAttributes(&ctxtHandle, SECPKG_ATTR_KEY_INFO, &info);
- return info.KeySize;
+
+ SecuritySettings settings;
+ settings.ssf = info.KeySize;
+ settings.authid = std::string();
+ return settings;
}
void SslAsynchIO::negotiationDone() {
diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.h b/cpp/src/qpid/sys/windows/SslAsynchIO.h
index e9d9e8d629..2f6842b135 100644
--- a/cpp/src/qpid/sys/windows/SslAsynchIO.h
+++ b/cpp/src/qpid/sys/windows/SslAsynchIO.h
@@ -77,12 +77,9 @@ public:
virtual void notifyPendingWrite();
virtual void queueWriteClose();
virtual bool writeQueueEmpty();
- virtual void startReading();
- virtual void stopReading();
virtual void requestCallback(RequestCallback);
virtual BufferBase* getQueuedBuffer();
-
- QPID_COMMON_EXTERN unsigned int getSslKeySize();
+ virtual SecuritySettings getSecuritySettings(void);
protected:
CredHandle credHandle;
diff --git a/cpp/src/qpid/sys/windows/SystemInfo.cpp b/cpp/src/qpid/sys/windows/SystemInfo.cpp
index cef78dcc60..fb58d53b81 100755
--- a/cpp/src/qpid/sys/windows/SystemInfo.cpp
+++ b/cpp/src/qpid/sys/windows/SystemInfo.cpp
@@ -25,7 +25,8 @@
#include "qpid/sys/SystemInfo.h"
#include "qpid/sys/IntegerTypes.h"
-#include "qpid/Exception.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
#include <assert.h>
#include <winsock2.h>
@@ -66,39 +67,10 @@ bool SystemInfo::getLocalHostname (Address &address) {
static const std::string LOCALHOST("127.0.0.1");
static const std::string TCP("tcp");
-void SystemInfo::getLocalIpAddresses (uint16_t port,
- std::vector<Address> &addrList) {
- enum { MAX_URL_INTERFACES = 100 };
-
- SOCKET s = socket (PF_INET, SOCK_STREAM, 0);
- if (s != INVALID_SOCKET) {
- INTERFACE_INFO interfaces[MAX_URL_INTERFACES];
- DWORD filledBytes = 0;
- WSAIoctl (s,
- SIO_GET_INTERFACE_LIST,
- 0,
- 0,
- interfaces,
- sizeof (interfaces),
- &filledBytes,
- 0,
- 0);
- unsigned int interfaceCount = filledBytes / sizeof (INTERFACE_INFO);
- for (unsigned int i = 0; i < interfaceCount; ++i) {
- if (interfaces[i].iiFlags & IFF_UP) {
- std::string addr(inet_ntoa(interfaces[i].iiAddress.AddressIn.sin_addr));
- if (addr != LOCALHOST)
- addrList.push_back(Address(TCP, addr, port));
- }
- }
- closesocket (s);
- }
-}
-
-bool SystemInfo::isLocalHost(const std::string& candidateHost) {
- // FIXME aconway 2012-05-03: not implemented.
- assert(0);
- throw Exception("Not implemented: isLocalHost");
+// Null function which always fails to find an network interface name
+bool SystemInfo::getInterfaceAddresses(const std::string&, std::vector<std::string>&)
+{
+ return false;
}
void SystemInfo::getSystemId (std::string &osName,
@@ -208,4 +180,29 @@ std::string SystemInfo::getProcessName()
return name;
}
+
+#ifdef _DLL
+namespace windows {
+// set from one or more Qpid DLLs: i.e. in DllMain with DLL_PROCESS_DETACH
+QPID_EXPORT bool processExiting = false;
+QPID_EXPORT bool libraryUnloading = false;
+}
+#endif
+
+bool SystemInfo::threadSafeShutdown()
+{
+#ifdef _DLL
+ if (!windows::processExiting && !windows::libraryUnloading) {
+ // called before exit() or FreeLibrary(), or by a DLL without
+ // a participating DllMain.
+ QPID_LOG(warning, "invalid query for shutdown state");
+ throw qpid::Exception(QPID_MSG("Unable to determine shutdown state."));
+ }
+ return !windows::processExiting;
+#else
+ // Not a DLL: shutdown can only be by exit() or return from main().
+ return false;
+#endif
+}
+
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/windows/Thread.cpp b/cpp/src/qpid/sys/windows/Thread.cpp
index 23b0033be4..b342c9da1d 100755
--- a/cpp/src/qpid/sys/windows/Thread.cpp
+++ b/cpp/src/qpid/sys/windows/Thread.cpp
@@ -27,6 +27,7 @@
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/windows/check.h"
+#include "qpid/sys/SystemInfo.h"
#include <process.h>
#include <windows.h>
@@ -274,8 +275,17 @@ Thread Thread::current() {
#ifdef _DLL
+namespace qpid {
+namespace sys {
+namespace windows {
+
+extern bool processExiting;
+extern bool libraryUnloading;
+
+}}} // namespace qpid::sys::SystemInfo
+
// DllMain: called possibly many times in a process lifetime if dll
-// loaded and freed repeatedly . Be mindful of Windows loader lock
+// loaded and freed repeatedly. Be mindful of Windows loader lock
// and other DllMain restrictions.
BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) {
@@ -290,10 +300,12 @@ BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) {
if (reserved != NULL) {
// process exit(): threads are stopped arbitrarily and
// possibly in an inconsistent state. Not even threadLock
- // can be trusted. All static destructors have been
- // called at this point and any resources this unit knows
- // about will be released as part of process tear down by
- // the OS. Accordingly, do nothing.
+ // can be trusted. All static destructors for this unit
+ // are pending and face the same unsafe environment.
+ // Any resources this unit knows about will be released as
+ // part of process tear down by the OS. Accordingly, skip
+ // any clean up tasks.
+ qpid::sys::windows::processExiting = true;
return TRUE;
}
else {
@@ -301,6 +313,7 @@ BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) {
// encouraged to clean up to avoid leaks. Mostly we just
// want any straggler threads to finish and notify
// threadsDone as the last thing they do.
+ qpid::sys::windows::libraryUnloading = true;
while (1) {
{
ScopedCriticalSection l(threadLock);
diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/WinSocket.cpp
index a4374260cc..b2d2d79c63 100644
--- a/cpp/src/qpid/sys/windows/Socket.cpp
+++ b/cpp/src/qpid/sys/windows/WinSocket.cpp
@@ -19,18 +19,12 @@
*
*/
-#include "qpid/sys/Socket.h"
+#include "qpid/sys/windows/WinSocket.h"
#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/windows/check.h"
#include "qpid/sys/windows/IoHandlePrivate.h"
-
-// Ensure we get all of winsock2.h
-#ifndef _WIN32_WINNT
-#define _WIN32_WINNT 0x0501
-#endif
-
-#include <winsock2.h>
+#include "qpid/sys/SystemInfo.h"
namespace qpid {
namespace sys {
@@ -67,7 +61,8 @@ public:
}
~WinSockSetup() {
- WSACleanup();
+ if (SystemInfo::threadSafeShutdown())
+ WSACleanup();
}
public:
@@ -106,22 +101,32 @@ uint16_t getLocalPort(int fd)
}
} // namespace
-Socket::Socket() :
- IOHandle(new IOHandlePrivate),
+WinSocket::WinSocket() :
+ handle(new IOHandle),
nonblocking(false),
nodelay(false)
{}
-Socket::Socket(IOHandlePrivate* h) :
- IOHandle(h),
+Socket* createSocket()
+{
+ return new WinSocket;
+}
+
+WinSocket::WinSocket(SOCKET fd) :
+ handle(new IOHandle(fd)),
nonblocking(false),
nodelay(false)
{}
-void Socket::createSocket(const SocketAddress& sa) const
+WinSocket::operator const IOHandle&() const
{
- SOCKET& socket = impl->fd;
- if (socket != INVALID_SOCKET) Socket::close();
+ return *handle;
+}
+
+void WinSocket::createSocket(const SocketAddress& sa) const
+{
+ SOCKET& socket = handle->fd;
+ if (socket != INVALID_SOCKET) WinSocket::close();
SOCKET s = ::socket (getAddrInfo(sa).ai_family,
getAddrInfo(sa).ai_socktype,
@@ -139,39 +144,19 @@ void Socket::createSocket(const SocketAddress& sa) const
}
}
-Socket* Socket::createSameTypeSocket() const {
- SOCKET& socket = impl->fd;
- // Socket currently has no actual socket attached
- if (socket == INVALID_SOCKET)
- return new Socket;
-
- ::sockaddr_storage sa;
- ::socklen_t salen = sizeof(sa);
- QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
- SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
- if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
- return new Socket(new IOHandlePrivate(s));
-}
-
-void Socket::setNonblocking() const {
+void WinSocket::setNonblocking() const {
u_long nonblock = 1;
- QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock));
-}
-
-void Socket::connect(const std::string& host, const std::string& port) const
-{
- SocketAddress sa(host, port);
- connect(sa);
+ QPID_WINSOCK_CHECK(ioctlsocket(handle->fd, FIONBIO, &nonblock));
}
void
-Socket::connect(const SocketAddress& addr) const
+WinSocket::connect(const SocketAddress& addr) const
{
peername = addr.asString(false);
createSocket(addr);
- const SOCKET& socket = impl->fd;
+ const SOCKET& socket = handle->fd;
int err;
WSASetLastError(0);
if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) &&
@@ -180,44 +165,43 @@ Socket::connect(const SocketAddress& addr) const
}
void
-Socket::close() const
+WinSocket::finishConnect(const SocketAddress&) const
{
- SOCKET& socket = impl->fd;
+}
+
+void
+WinSocket::close() const
+{
+ SOCKET& socket = handle->fd;
if (socket == INVALID_SOCKET) return;
QPID_WINSOCK_CHECK(closesocket(socket));
socket = INVALID_SOCKET;
}
-int Socket::write(const void *buf, size_t count) const
+int WinSocket::write(const void *buf, size_t count) const
{
- const SOCKET& socket = impl->fd;
+ const SOCKET& socket = handle->fd;
int sent = ::send(socket, (const char *)buf, count, 0);
if (sent == SOCKET_ERROR)
return -1;
return sent;
}
-int Socket::read(void *buf, size_t count) const
+int WinSocket::read(void *buf, size_t count) const
{
- const SOCKET& socket = impl->fd;
+ const SOCKET& socket = handle->fd;
int received = ::recv(socket, (char *)buf, count, 0);
if (received == SOCKET_ERROR)
return -1;
return received;
}
-int Socket::listen(const std::string& host, const std::string& port, int backlog) const
-{
- SocketAddress sa(host, port);
- return listen(sa, backlog);
-}
-
-int Socket::listen(const SocketAddress& addr, int backlog) const
+int WinSocket::listen(const SocketAddress& addr, int backlog) const
{
createSocket(addr);
- const SOCKET& socket = impl->fd;
+ const SOCKET& socket = handle->fd;
BOOL yes=1;
QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes)));
@@ -229,48 +213,48 @@ int Socket::listen(const SocketAddress& addr, int backlog) const
return getLocalPort(socket);
}
-Socket* Socket::accept() const
+Socket* WinSocket::accept() const
{
- SOCKET afd = ::accept(impl->fd, 0, 0);
+ SOCKET afd = ::accept(handle->fd, 0, 0);
if (afd != INVALID_SOCKET)
- return new Socket(new IOHandlePrivate(afd));
+ return new WinSocket(afd);
else if (WSAGetLastError() == EAGAIN)
return 0;
else throw QPID_WINDOWS_ERROR(WSAGetLastError());
}
-std::string Socket::getPeerAddress() const
+std::string WinSocket::getPeerAddress() const
{
if (peername.empty()) {
- peername = getName(impl->fd, false);
+ peername = getName(handle->fd, false);
}
return peername;
}
-std::string Socket::getLocalAddress() const
+std::string WinSocket::getLocalAddress() const
{
if (localname.empty()) {
- localname = getName(impl->fd, true);
+ localname = getName(handle->fd, true);
}
return localname;
}
-int Socket::getError() const
+int WinSocket::getError() const
{
int result;
socklen_t rSize = sizeof (result);
- QPID_WINSOCK_CHECK(::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize));
+ QPID_WINSOCK_CHECK(::getsockopt(handle->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize));
return result;
}
-void Socket::setTcpNoDelay() const
+void WinSocket::setTcpNoDelay() const
{
- SOCKET& socket = impl->fd;
+ SOCKET& socket = handle->fd;
nodelay = true;
if (socket != INVALID_SOCKET) {
int flag = 1;
- int result = setsockopt(impl->fd,
+ int result = setsockopt(handle->fd,
IPPROTO_TCP,
TCP_NODELAY,
(char *)&flag,
@@ -279,14 +263,14 @@ void Socket::setTcpNoDelay() const
}
}
-inline IOHandlePrivate* IOHandlePrivate::getImpl(const qpid::sys::IOHandle &h)
+int WinSocket::getKeyLen() const
{
- return h.impl;
+ return 0;
}
-SOCKET toSocketHandle(const Socket& s)
+std::string WinSocket::getClientAuthId() const
{
- return IOHandlePrivate::getImpl(s)->fd;
+ return std::string();
}
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/windows/WinSocket.h b/cpp/src/qpid/sys/windows/WinSocket.h
new file mode 100644
index 0000000000..bee6a58e7a
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/WinSocket.h
@@ -0,0 +1,118 @@
+#ifndef QPID_SYS_WINDOWS_BSDSOCKET_H
+#define QPID_SYS_WINDOWS_BSDSOCKET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/CommonImportExport.h"
+#include <string>
+
+#include <boost/scoped_ptr.hpp>
+
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+
+#include <winsock2.h>
+
+namespace qpid {
+namespace sys {
+
+namespace windows {
+Socket* createSameTypeSocket(const Socket&);
+}
+
+class Duration;
+class IOHandle;
+class SocketAddress;
+
+class QPID_COMMON_CLASS_EXTERN WinSocket : public Socket
+{
+public:
+ /** Create a socket wrapper for descriptor. */
+ QPID_COMMON_EXTERN WinSocket();
+
+ QPID_COMMON_EXTERN operator const IOHandle&() const;
+
+ /** Set socket non blocking */
+ QPID_COMMON_EXTERN virtual void setNonblocking() const;
+
+ QPID_COMMON_EXTERN virtual void setTcpNoDelay() const;
+
+ QPID_COMMON_EXTERN virtual void connect(const SocketAddress&) const;
+ QPID_COMMON_EXTERN virtual void finishConnect(const SocketAddress&) const;
+
+ QPID_COMMON_EXTERN virtual void close() const;
+
+ /** Bind to a port and start listening.
+ *@return The bound port number
+ */
+ QPID_COMMON_EXTERN virtual int listen(const SocketAddress&, int backlog = 10) const;
+
+ /**
+ * Returns an address (host and port) for the remote end of the
+ * socket
+ */
+ QPID_COMMON_EXTERN std::string getPeerAddress() const;
+ /**
+ * Returns an address (host and port) for the local end of the
+ * socket
+ */
+ QPID_COMMON_EXTERN std::string getLocalAddress() const;
+
+ /**
+ * Returns the error code stored in the socket. This may be used
+ * to determine the result of a non-blocking connect.
+ */
+ QPID_COMMON_EXTERN int getError() const;
+
+ /** Accept a connection from a socket that is already listening
+ * and has an incoming connection
+ */
+ QPID_COMMON_EXTERN virtual Socket* accept() const;
+
+ // TODO The following are raw operations, maybe they need better wrapping?
+ QPID_COMMON_EXTERN virtual int read(void *buf, size_t count) const;
+ QPID_COMMON_EXTERN virtual int write(const void *buf, size_t count) const;
+
+ QPID_COMMON_EXTERN int getKeyLen() const;
+ QPID_COMMON_EXTERN std::string getClientAuthId() const;
+
+protected:
+ /** Create socket */
+ void createSocket(const SocketAddress&) const;
+
+ mutable boost::scoped_ptr<IOHandle> handle;
+ mutable std::string localname;
+ mutable std::string peername;
+ mutable bool nonblocking;
+ mutable bool nodelay;
+
+ /** Construct socket with existing handle */
+ friend Socket* qpid::sys::windows::createSameTypeSocket(const Socket&);
+ WinSocket(SOCKET fd);
+};
+
+}}
+#endif /*!QPID_SYS_WINDOWS_BSDSOCKET_H*/