diff options
Diffstat (limited to 'cpp/src/qpid/sys/windows')
-rw-r--r-- | cpp/src/qpid/sys/windows/AsynchIO.cpp | 79 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/FileSysDir.cpp | 35 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/IOHandle.cpp | 13 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/IoHandlePrivate.h | 17 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/IocpPoller.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/PollableCondition.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/QpidDllMain.h | 72 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/SslAsynchIO.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/SslAsynchIO.h | 5 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/SystemInfo.cpp | 65 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/Thread.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/WinSocket.cpp (renamed from cpp/src/qpid/sys/windows/Socket.cpp) | 124 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/WinSocket.h | 118 |
13 files changed, 407 insertions, 179 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp index 355acbe0e6..b36ee9f941 100644 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -24,6 +24,8 @@ #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Socket.h" +#include "qpid/sys/windows/WinSocket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/Poller.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Time.h" @@ -50,8 +52,8 @@ namespace { * The function pointers for AcceptEx and ConnectEx need to be looked up * at run time. */ -const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) { - SOCKET h = toSocketHandle(s); +const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::IOHandle& io) { + SOCKET h = io.fd; GUID guidAcceptEx = WSAID_ACCEPTEX; DWORD dwBytes = 0; LPFN_ACCEPTEX fnAcceptEx; @@ -93,12 +95,14 @@ private: AsynchAcceptor::Callback acceptedCallback; const Socket& socket; + const SOCKET wSocket; const LPFN_ACCEPTEX fnAcceptEx; }; AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), socket(s), + wSocket(IOHandle(s).fd), fnAcceptEx(lookUpAcceptEx(s)) { s.setNonblocking(); @@ -121,8 +125,8 @@ void AsynchAcceptor::restart(void) { this, socket); BOOL status; - status = fnAcceptEx(toSocketHandle(socket), - toSocketHandle(*result->newSocket), + status = fnAcceptEx(wSocket, + IOHandle(*result->newSocket).fd, result->addressBuffer, 0, AsynchAcceptResult::SOCKADDRMAXLEN, @@ -133,16 +137,30 @@ void AsynchAcceptor::restart(void) { } +Socket* createSameTypeSocket(const Socket& sock) { + SOCKET socket = IOHandle(sock).fd; + // Socket currently has no actual socket attached + if (socket == INVALID_SOCKET) + return new WinSocket; + + ::sockaddr_storage sa; + ::socklen_t salen = sizeof(sa); + QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); + SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + return new WinSocket(s); +} + AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb, AsynchAcceptor *acceptor, - const Socket& listener) + const Socket& lsocket) : callback(cb), acceptor(acceptor), - listener(toSocketHandle(listener)), - newSocket(listener.createSameTypeSocket()) { + listener(IOHandle(lsocket).fd), + newSocket(createSameTypeSocket(lsocket)) { } void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { - ::setsockopt (toSocketHandle(*newSocket), + ::setsockopt (IOHandle(*newSocket).fd, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&listener, @@ -180,6 +198,7 @@ public: ConnectedCallback connCb, FailedCallback failCb = 0); void start(Poller::shared_ptr poller); + void requestCallback(RequestCallback rCb); }; AsynchConnector::AsynchConnector(const Socket& sock, @@ -195,7 +214,7 @@ AsynchConnector::AsynchConnector(const Socket& sock, void AsynchConnector::start(Poller::shared_ptr) { try { - socket.connect(hostname, port); + socket.connect(SocketAddress(hostname, port)); socket.setNonblocking(); connCallback(socket); } catch(std::exception& e) { @@ -205,6 +224,13 @@ void AsynchConnector::start(Poller::shared_ptr) } } +// This can never be called in the current windows code as connect +// is blocking and requestCallback only makes sense if connect is +// non-blocking with the results returned via a poller callback. +void AsynchConnector::requestCallback(RequestCallback rCb) +{ +} + } // namespace windows AsynchAcceptor* AsynchAcceptor::create(const Socket& s, @@ -260,8 +286,6 @@ public: virtual void notifyPendingWrite(); virtual void queueWriteClose(); virtual bool writeQueueEmpty(); - virtual void startReading(); - virtual void stopReading(); virtual void requestCallback(RequestCallback); /** @@ -272,6 +296,8 @@ public: */ virtual BufferBase* getQueuedBuffer(); + virtual SecuritySettings getSecuritySettings(void); + private: ReadCallback readCallback; EofCallback eofCallback; @@ -319,6 +345,12 @@ private: void close(void); /** + * startReading initiates reading, readComplete() is + * called when the read completes. + */ + void startReading(); + + /** * readComplete is called when a read request is complete. * * @param result Results of the operation. @@ -362,7 +394,7 @@ class CallbackHandle : public IOHandle { public: CallbackHandle(AsynchIoResult::Completer completeCb, AsynchIO::RequestCallback reqCb = 0) : - IOHandle(new IOHandlePrivate (INVALID_SOCKET, completeCb, reqCb)) + IOHandle(INVALID_SOCKET, completeCb, reqCb) {} }; @@ -515,7 +547,7 @@ void AsynchIO::startReading() { DWORD bytesReceived = 0, flags = 0; InterlockedIncrement(&opsInProgress); readInProgress = true; - int status = WSARecv(toSocketHandle(socket), + int status = WSARecv(IOHandle(socket).fd, const_cast<LPWSABUF>(result->getWSABUF()), 1, &bytesReceived, &flags, @@ -537,15 +569,6 @@ void AsynchIO::startReading() { return; } -// stopReading was added to prevent a race condition with read-credit on Linux. -// It may or may not be required on windows. -// -// AsynchIOHandler::readbuff() calls stopReading() inside the same -// critical section that protects startReading() in -// AsynchIOHandler::giveReadCredit(). -// -void AsynchIO::stopReading() {} - // Queue the specified callback for invocation from an I/O thread. void AsynchIO::requestCallback(RequestCallback callback) { // This method is generally called from a processing thread; transfer @@ -613,7 +636,7 @@ void AsynchIO::startWrite(AsynchIO::BufferBase* buff) { buff, buff->dataCount); DWORD bytesSent = 0; - int status = WSASend(toSocketHandle(socket), + int status = WSASend(IOHandle(socket).fd, const_cast<LPWSABUF>(result->getWSABUF()), 1, &bytesSent, 0, @@ -639,6 +662,13 @@ void AsynchIO::close(void) { notifyClosed(); } +SecuritySettings AsynchIO::getSecuritySettings() { + SecuritySettings settings; + settings.ssf = socket.getKeyLen(); + settings.authid = socket.getClientAuthId(); + return settings; +} + void AsynchIO::readComplete(AsynchReadResult *result) { int status = result->getStatus(); size_t bytes = result->getTransferred(); @@ -683,7 +713,8 @@ void AsynchIO::writeComplete(AsynchWriteResult *result) { else { // An error... if it's a connection close, ignore it - it will be // noticed and handled on a read completion any moment now. - // What to do with real error??? Save the Buffer? + // What to do with real error??? Save the Buffer? TBD. + queueReadBuffer(buff); // All done; back to the pool } } diff --git a/cpp/src/qpid/sys/windows/FileSysDir.cpp b/cpp/src/qpid/sys/windows/FileSysDir.cpp index 88f1637d48..e090747715 100644 --- a/cpp/src/qpid/sys/windows/FileSysDir.cpp +++ b/cpp/src/qpid/sys/windows/FileSysDir.cpp @@ -24,6 +24,9 @@ #include <sys/stat.h> #include <direct.h> #include <errno.h> +#include <windows.h> +#include <strsafe.h> + namespace qpid { namespace sys { @@ -50,4 +53,36 @@ void FileSysDir::mkdir(void) throw Exception ("Can't create directory: " + dirPath); } +void FileSysDir::forEachFile(Callback cb) const { + + WIN32_FIND_DATAA findFileData; + char szDir[MAX_PATH]; + size_t dirPathLength; + HANDLE hFind = INVALID_HANDLE_VALUE; + + // create dirPath+"\*" in szDir + StringCchLength (dirPath.c_str(), MAX_PATH, &dirPathLength); + + if (dirPathLength > (MAX_PATH - 3)) { + throw Exception ("Directory path is too long: " + dirPath); + } + + StringCchCopy(szDir, MAX_PATH, dirPath.c_str()); + StringCchCat(szDir, MAX_PATH, TEXT("\\*")); + + // Special work for first file + hFind = FindFirstFileA(szDir, &findFileData); + if (INVALID_HANDLE_VALUE == hFind) { + return; + } + + // process everything that isn't a directory + do { + if (!(findFileData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY)) { + std::string fileName(findFileData.cFileName); + cb(fileName); + } + } while (FindNextFile(hFind, &findFileData) != 0); +} + }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/windows/IOHandle.cpp b/cpp/src/qpid/sys/windows/IOHandle.cpp index 250737cb99..19a1c44875 100755 --- a/cpp/src/qpid/sys/windows/IOHandle.cpp +++ b/cpp/src/qpid/sys/windows/IOHandle.cpp @@ -19,24 +19,11 @@ * */ -#include "qpid/sys/IOHandle.h" #include "qpid/sys/windows/IoHandlePrivate.h" #include <windows.h> namespace qpid { namespace sys { -SOCKET toFd(const IOHandlePrivate* h) -{ - return h->fd; -} - -IOHandle::IOHandle(IOHandlePrivate* h) : - impl(h) -{} - -IOHandle::~IOHandle() { - delete impl; -} }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/cpp/src/qpid/sys/windows/IoHandlePrivate.h index 5943db5cc7..4529ad93ec 100755 --- a/cpp/src/qpid/sys/windows/IoHandlePrivate.h +++ b/cpp/src/qpid/sys/windows/IoHandlePrivate.h @@ -38,15 +38,14 @@ namespace sys { // completer from an I/O thread. If the callback mechanism is used, there // can be a RequestCallback set - this carries the callback object through // from AsynchIO::requestCallback() through to the I/O completion processing. -class IOHandlePrivate { - friend QPID_COMMON_EXTERN SOCKET toSocketHandle(const Socket& s); - static IOHandlePrivate* getImpl(const IOHandle& h); - +class IOHandle { public: - IOHandlePrivate(SOCKET f = INVALID_SOCKET, - windows::AsynchIoResult::Completer cb = 0, - AsynchIO::RequestCallback reqCallback = 0) : - fd(f), event(cb), cbRequest(reqCallback) + IOHandle(SOCKET f = INVALID_SOCKET, + windows::AsynchIoResult::Completer cb = 0, + AsynchIO::RequestCallback reqCallback = 0) : + fd(f), + event(cb), + cbRequest(reqCallback) {} SOCKET fd; @@ -54,8 +53,6 @@ public: AsynchIO::RequestCallback cbRequest; }; -QPID_COMMON_EXTERN SOCKET toSocketHandle(const Socket& s); - }} #endif /* _sys_windows_IoHandlePrivate_h */ diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp index c81cef87b0..ecb33c5517 100755 --- a/cpp/src/qpid/sys/windows/IocpPoller.cpp +++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp @@ -22,7 +22,7 @@ #include "qpid/sys/Poller.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Dispatcher.h" - +#include "qpid/sys/IOHandle.h" #include "qpid/sys/windows/AsynchIoResult.h" #include "qpid/sys/windows/IoHandlePrivate.h" #include "qpid/sys/windows/check.h" @@ -55,7 +55,7 @@ class PollerHandlePrivate { }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(toSocketHandle(static_cast<const Socket&>(h)), h.impl->event, h.impl->cbRequest)) + impl(new PollerHandlePrivate(h.fd, h.event, h.cbRequest)) {} PollerHandle::~PollerHandle() { diff --git a/cpp/src/qpid/sys/windows/PollableCondition.cpp b/cpp/src/qpid/sys/windows/PollableCondition.cpp index bb637be0a6..3e2a5fb36c 100644 --- a/cpp/src/qpid/sys/windows/PollableCondition.cpp +++ b/cpp/src/qpid/sys/windows/PollableCondition.cpp @@ -52,14 +52,14 @@ private: PollableCondition& parent; boost::shared_ptr<sys::Poller> poller; LONG isSet; + LONG isDispatching; }; PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb, sys::PollableCondition& parent, const boost::shared_ptr<sys::Poller>& poller) - : IOHandle(new sys::IOHandlePrivate(INVALID_SOCKET, - boost::bind(&PollableConditionPrivate::dispatch, this, _1))), - cb(cb), parent(parent), poller(poller), isSet(0) + : IOHandle(INVALID_SOCKET, boost::bind(&PollableConditionPrivate::dispatch, this, _1)), + cb(cb), parent(parent), poller(poller), isSet(0), isDispatching(0) { } @@ -78,7 +78,12 @@ void PollableConditionPrivate::poke() void PollableConditionPrivate::dispatch(windows::AsynchIoResult *result) { delete result; // Poller::monitorHandle() allocates this + // If isDispatching is already set, just return. Else, enter. + if (::InterlockedCompareExchange(&isDispatching, 1, 0) == 1) + return; cb(parent); + LONG oops = ::InterlockedDecrement(&isDispatching); // Result must be 0 + assert(!oops); if (isSet) poke(); } diff --git a/cpp/src/qpid/sys/windows/QpidDllMain.h b/cpp/src/qpid/sys/windows/QpidDllMain.h new file mode 100644 index 0000000000..74eaf0256a --- /dev/null +++ b/cpp/src/qpid/sys/windows/QpidDllMain.h @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * Include this file once in each DLL that relies on SystemInfo.h: + * threadSafeShutdown(). Note that Thread.cpp has a more elaborate + * DllMain, that also provides this functionality separately. + * + * Teardown is in the reverse order of the DLL dependencies used + * during the load phase. The calls to DllMain and the static + * destructors are from the same thread, so no locking is necessary + * and there is no downside to an invocation of DllMain by multiple + * Qpid DLLs. + */ + +#ifdef _DLL + +#include <qpid/ImportExport.h> +#include <windows.h> + +namespace qpid { +namespace sys { +namespace windows { + +QPID_IMPORT bool processExiting; +QPID_IMPORT bool libraryUnloading; + +}}} // namespace qpid::sys::SystemInfo + + +BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) { + switch (reason) { + case DLL_PROCESS_ATTACH: + case DLL_THREAD_ATTACH: + case DLL_THREAD_DETACH: + break; + + case DLL_PROCESS_DETACH: + // Remember how the process is terminating this DLL. + if (reserved != NULL) { + qpid::sys::windows::processExiting = true; + // Danger: all threading suspect, including indirect use of malloc or locks. + // Think twice before adding more functionality here. + return TRUE; + } + else { + qpid::sys::windows::libraryUnloading = true; + } + break; + } + return TRUE; +} + + +#endif diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/cpp/src/qpid/sys/windows/SslAsynchIO.cpp index d263f00ab3..e48c799b29 100644 --- a/cpp/src/qpid/sys/windows/SslAsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/SslAsynchIO.cpp @@ -209,18 +209,6 @@ bool SslAsynchIO::writeQueueEmpty() { return aio->writeQueueEmpty(); } -/* - * Initiate a read operation. AsynchIO::readComplete() will be - * called when the read is complete and data is available. - */ -void SslAsynchIO::startReading() { - aio->startReading(); -} - -void SslAsynchIO::stopReading() { - aio->stopReading(); -} - // Queue the specified callback for invocation from an I/O thread. void SslAsynchIO::requestCallback(RequestCallback callback) { aio->requestCallback(callback); @@ -241,11 +229,15 @@ AsynchIO::BufferBase* SslAsynchIO::getQueuedBuffer() { return sslBuff; } -unsigned int SslAsynchIO::getSslKeySize() { +SecuritySettings SslAsynchIO::getSecuritySettings() { SecPkgContext_KeyInfo info; memset(&info, 0, sizeof(info)); ::QueryContextAttributes(&ctxtHandle, SECPKG_ATTR_KEY_INFO, &info); - return info.KeySize; + + SecuritySettings settings; + settings.ssf = info.KeySize; + settings.authid = std::string(); + return settings; } void SslAsynchIO::negotiationDone() { diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.h b/cpp/src/qpid/sys/windows/SslAsynchIO.h index e9d9e8d629..2f6842b135 100644 --- a/cpp/src/qpid/sys/windows/SslAsynchIO.h +++ b/cpp/src/qpid/sys/windows/SslAsynchIO.h @@ -77,12 +77,9 @@ public: virtual void notifyPendingWrite(); virtual void queueWriteClose(); virtual bool writeQueueEmpty(); - virtual void startReading(); - virtual void stopReading(); virtual void requestCallback(RequestCallback); virtual BufferBase* getQueuedBuffer(); - - QPID_COMMON_EXTERN unsigned int getSslKeySize(); + virtual SecuritySettings getSecuritySettings(void); protected: CredHandle credHandle; diff --git a/cpp/src/qpid/sys/windows/SystemInfo.cpp b/cpp/src/qpid/sys/windows/SystemInfo.cpp index cef78dcc60..fb58d53b81 100755 --- a/cpp/src/qpid/sys/windows/SystemInfo.cpp +++ b/cpp/src/qpid/sys/windows/SystemInfo.cpp @@ -25,7 +25,8 @@ #include "qpid/sys/SystemInfo.h" #include "qpid/sys/IntegerTypes.h" -#include "qpid/Exception.h"
+#include "qpid/Exception.h" +#include "qpid/log/Statement.h" #include <assert.h> #include <winsock2.h> @@ -66,39 +67,10 @@ bool SystemInfo::getLocalHostname (Address &address) { static const std::string LOCALHOST("127.0.0.1"); static const std::string TCP("tcp"); -void SystemInfo::getLocalIpAddresses (uint16_t port, - std::vector<Address> &addrList) { - enum { MAX_URL_INTERFACES = 100 }; - - SOCKET s = socket (PF_INET, SOCK_STREAM, 0); - if (s != INVALID_SOCKET) { - INTERFACE_INFO interfaces[MAX_URL_INTERFACES]; - DWORD filledBytes = 0; - WSAIoctl (s, - SIO_GET_INTERFACE_LIST, - 0, - 0, - interfaces, - sizeof (interfaces), - &filledBytes, - 0, - 0); - unsigned int interfaceCount = filledBytes / sizeof (INTERFACE_INFO); - for (unsigned int i = 0; i < interfaceCount; ++i) { - if (interfaces[i].iiFlags & IFF_UP) { - std::string addr(inet_ntoa(interfaces[i].iiAddress.AddressIn.sin_addr)); - if (addr != LOCALHOST) - addrList.push_back(Address(TCP, addr, port)); - } - } - closesocket (s); - } -} - -bool SystemInfo::isLocalHost(const std::string& candidateHost) { - // FIXME aconway 2012-05-03: not implemented. - assert(0); - throw Exception("Not implemented: isLocalHost"); +// Null function which always fails to find an network interface name +bool SystemInfo::getInterfaceAddresses(const std::string&, std::vector<std::string>&) +{ + return false; } void SystemInfo::getSystemId (std::string &osName, @@ -208,4 +180,29 @@ std::string SystemInfo::getProcessName() return name; } + +#ifdef _DLL +namespace windows { +// set from one or more Qpid DLLs: i.e. in DllMain with DLL_PROCESS_DETACH +QPID_EXPORT bool processExiting = false; +QPID_EXPORT bool libraryUnloading = false; +} +#endif + +bool SystemInfo::threadSafeShutdown() +{ +#ifdef _DLL + if (!windows::processExiting && !windows::libraryUnloading) { + // called before exit() or FreeLibrary(), or by a DLL without + // a participating DllMain. + QPID_LOG(warning, "invalid query for shutdown state"); + throw qpid::Exception(QPID_MSG("Unable to determine shutdown state.")); + } + return !windows::processExiting; +#else + // Not a DLL: shutdown can only be by exit() or return from main(). + return false; +#endif +} + }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/windows/Thread.cpp b/cpp/src/qpid/sys/windows/Thread.cpp index 23b0033be4..b342c9da1d 100755 --- a/cpp/src/qpid/sys/windows/Thread.cpp +++ b/cpp/src/qpid/sys/windows/Thread.cpp @@ -27,6 +27,7 @@ #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/windows/check.h" +#include "qpid/sys/SystemInfo.h" #include <process.h> #include <windows.h> @@ -274,8 +275,17 @@ Thread Thread::current() { #ifdef _DLL +namespace qpid { +namespace sys { +namespace windows { + +extern bool processExiting; +extern bool libraryUnloading; + +}}} // namespace qpid::sys::SystemInfo + // DllMain: called possibly many times in a process lifetime if dll -// loaded and freed repeatedly . Be mindful of Windows loader lock +// loaded and freed repeatedly. Be mindful of Windows loader lock // and other DllMain restrictions. BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) { @@ -290,10 +300,12 @@ BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) { if (reserved != NULL) { // process exit(): threads are stopped arbitrarily and // possibly in an inconsistent state. Not even threadLock - // can be trusted. All static destructors have been - // called at this point and any resources this unit knows - // about will be released as part of process tear down by - // the OS. Accordingly, do nothing. + // can be trusted. All static destructors for this unit + // are pending and face the same unsafe environment. + // Any resources this unit knows about will be released as + // part of process tear down by the OS. Accordingly, skip + // any clean up tasks. + qpid::sys::windows::processExiting = true; return TRUE; } else { @@ -301,6 +313,7 @@ BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) { // encouraged to clean up to avoid leaks. Mostly we just // want any straggler threads to finish and notify // threadsDone as the last thing they do. + qpid::sys::windows::libraryUnloading = true; while (1) { { ScopedCriticalSection l(threadLock); diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/WinSocket.cpp index a4374260cc..b2d2d79c63 100644 --- a/cpp/src/qpid/sys/windows/Socket.cpp +++ b/cpp/src/qpid/sys/windows/WinSocket.cpp @@ -19,18 +19,12 @@ * */ -#include "qpid/sys/Socket.h" +#include "qpid/sys/windows/WinSocket.h" #include "qpid/sys/SocketAddress.h" #include "qpid/sys/windows/check.h" #include "qpid/sys/windows/IoHandlePrivate.h" - -// Ensure we get all of winsock2.h -#ifndef _WIN32_WINNT -#define _WIN32_WINNT 0x0501 -#endif - -#include <winsock2.h> +#include "qpid/sys/SystemInfo.h" namespace qpid { namespace sys { @@ -67,7 +61,8 @@ public: } ~WinSockSetup() { - WSACleanup(); + if (SystemInfo::threadSafeShutdown()) + WSACleanup(); } public: @@ -106,22 +101,32 @@ uint16_t getLocalPort(int fd) } } // namespace -Socket::Socket() : - IOHandle(new IOHandlePrivate), +WinSocket::WinSocket() : + handle(new IOHandle), nonblocking(false), nodelay(false) {} -Socket::Socket(IOHandlePrivate* h) : - IOHandle(h), +Socket* createSocket() +{ + return new WinSocket; +} + +WinSocket::WinSocket(SOCKET fd) : + handle(new IOHandle(fd)), nonblocking(false), nodelay(false) {} -void Socket::createSocket(const SocketAddress& sa) const +WinSocket::operator const IOHandle&() const { - SOCKET& socket = impl->fd; - if (socket != INVALID_SOCKET) Socket::close(); + return *handle; +} + +void WinSocket::createSocket(const SocketAddress& sa) const +{ + SOCKET& socket = handle->fd; + if (socket != INVALID_SOCKET) WinSocket::close(); SOCKET s = ::socket (getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, @@ -139,39 +144,19 @@ void Socket::createSocket(const SocketAddress& sa) const } } -Socket* Socket::createSameTypeSocket() const { - SOCKET& socket = impl->fd; - // Socket currently has no actual socket attached - if (socket == INVALID_SOCKET) - return new Socket; - - ::sockaddr_storage sa; - ::socklen_t salen = sizeof(sa); - QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); - SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM - if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); - return new Socket(new IOHandlePrivate(s)); -} - -void Socket::setNonblocking() const { +void WinSocket::setNonblocking() const { u_long nonblock = 1; - QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock)); -} - -void Socket::connect(const std::string& host, const std::string& port) const -{ - SocketAddress sa(host, port); - connect(sa); + QPID_WINSOCK_CHECK(ioctlsocket(handle->fd, FIONBIO, &nonblock)); } void -Socket::connect(const SocketAddress& addr) const +WinSocket::connect(const SocketAddress& addr) const { peername = addr.asString(false); createSocket(addr); - const SOCKET& socket = impl->fd; + const SOCKET& socket = handle->fd; int err; WSASetLastError(0); if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) && @@ -180,44 +165,43 @@ Socket::connect(const SocketAddress& addr) const } void -Socket::close() const +WinSocket::finishConnect(const SocketAddress&) const { - SOCKET& socket = impl->fd; +} + +void +WinSocket::close() const +{ + SOCKET& socket = handle->fd; if (socket == INVALID_SOCKET) return; QPID_WINSOCK_CHECK(closesocket(socket)); socket = INVALID_SOCKET; } -int Socket::write(const void *buf, size_t count) const +int WinSocket::write(const void *buf, size_t count) const { - const SOCKET& socket = impl->fd; + const SOCKET& socket = handle->fd; int sent = ::send(socket, (const char *)buf, count, 0); if (sent == SOCKET_ERROR) return -1; return sent; } -int Socket::read(void *buf, size_t count) const +int WinSocket::read(void *buf, size_t count) const { - const SOCKET& socket = impl->fd; + const SOCKET& socket = handle->fd; int received = ::recv(socket, (char *)buf, count, 0); if (received == SOCKET_ERROR) return -1; return received; } -int Socket::listen(const std::string& host, const std::string& port, int backlog) const -{ - SocketAddress sa(host, port); - return listen(sa, backlog); -} - -int Socket::listen(const SocketAddress& addr, int backlog) const +int WinSocket::listen(const SocketAddress& addr, int backlog) const { createSocket(addr); - const SOCKET& socket = impl->fd; + const SOCKET& socket = handle->fd; BOOL yes=1; QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes))); @@ -229,48 +213,48 @@ int Socket::listen(const SocketAddress& addr, int backlog) const return getLocalPort(socket); } -Socket* Socket::accept() const +Socket* WinSocket::accept() const { - SOCKET afd = ::accept(impl->fd, 0, 0); + SOCKET afd = ::accept(handle->fd, 0, 0); if (afd != INVALID_SOCKET) - return new Socket(new IOHandlePrivate(afd)); + return new WinSocket(afd); else if (WSAGetLastError() == EAGAIN) return 0; else throw QPID_WINDOWS_ERROR(WSAGetLastError()); } -std::string Socket::getPeerAddress() const +std::string WinSocket::getPeerAddress() const { if (peername.empty()) { - peername = getName(impl->fd, false); + peername = getName(handle->fd, false); } return peername; } -std::string Socket::getLocalAddress() const +std::string WinSocket::getLocalAddress() const { if (localname.empty()) { - localname = getName(impl->fd, true); + localname = getName(handle->fd, true); } return localname; } -int Socket::getError() const +int WinSocket::getError() const { int result; socklen_t rSize = sizeof (result); - QPID_WINSOCK_CHECK(::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize)); + QPID_WINSOCK_CHECK(::getsockopt(handle->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize)); return result; } -void Socket::setTcpNoDelay() const +void WinSocket::setTcpNoDelay() const { - SOCKET& socket = impl->fd; + SOCKET& socket = handle->fd; nodelay = true; if (socket != INVALID_SOCKET) { int flag = 1; - int result = setsockopt(impl->fd, + int result = setsockopt(handle->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, @@ -279,14 +263,14 @@ void Socket::setTcpNoDelay() const } } -inline IOHandlePrivate* IOHandlePrivate::getImpl(const qpid::sys::IOHandle &h) +int WinSocket::getKeyLen() const { - return h.impl; + return 0; } -SOCKET toSocketHandle(const Socket& s) +std::string WinSocket::getClientAuthId() const { - return IOHandlePrivate::getImpl(s)->fd; + return std::string(); } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/windows/WinSocket.h b/cpp/src/qpid/sys/windows/WinSocket.h new file mode 100644 index 0000000000..bee6a58e7a --- /dev/null +++ b/cpp/src/qpid/sys/windows/WinSocket.h @@ -0,0 +1,118 @@ +#ifndef QPID_SYS_WINDOWS_BSDSOCKET_H +#define QPID_SYS_WINDOWS_BSDSOCKET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Socket.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/CommonImportExport.h" +#include <string> + +#include <boost/scoped_ptr.hpp> + +// Ensure we get all of winsock2.h +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif + +#include <winsock2.h> + +namespace qpid { +namespace sys { + +namespace windows { +Socket* createSameTypeSocket(const Socket&); +} + +class Duration; +class IOHandle; +class SocketAddress; + +class QPID_COMMON_CLASS_EXTERN WinSocket : public Socket +{ +public: + /** Create a socket wrapper for descriptor. */ + QPID_COMMON_EXTERN WinSocket(); + + QPID_COMMON_EXTERN operator const IOHandle&() const; + + /** Set socket non blocking */ + QPID_COMMON_EXTERN virtual void setNonblocking() const; + + QPID_COMMON_EXTERN virtual void setTcpNoDelay() const; + + QPID_COMMON_EXTERN virtual void connect(const SocketAddress&) const; + QPID_COMMON_EXTERN virtual void finishConnect(const SocketAddress&) const; + + QPID_COMMON_EXTERN virtual void close() const; + + /** Bind to a port and start listening. + *@return The bound port number + */ + QPID_COMMON_EXTERN virtual int listen(const SocketAddress&, int backlog = 10) const; + + /** + * Returns an address (host and port) for the remote end of the + * socket + */ + QPID_COMMON_EXTERN std::string getPeerAddress() const; + /** + * Returns an address (host and port) for the local end of the + * socket + */ + QPID_COMMON_EXTERN std::string getLocalAddress() const; + + /** + * Returns the error code stored in the socket. This may be used + * to determine the result of a non-blocking connect. + */ + QPID_COMMON_EXTERN int getError() const; + + /** Accept a connection from a socket that is already listening + * and has an incoming connection + */ + QPID_COMMON_EXTERN virtual Socket* accept() const; + + // TODO The following are raw operations, maybe they need better wrapping? + QPID_COMMON_EXTERN virtual int read(void *buf, size_t count) const; + QPID_COMMON_EXTERN virtual int write(const void *buf, size_t count) const; + + QPID_COMMON_EXTERN int getKeyLen() const; + QPID_COMMON_EXTERN std::string getClientAuthId() const; + +protected: + /** Create socket */ + void createSocket(const SocketAddress&) const; + + mutable boost::scoped_ptr<IOHandle> handle; + mutable std::string localname; + mutable std::string peername; + mutable bool nonblocking; + mutable bool nodelay; + + /** Construct socket with existing handle */ + friend Socket* qpid::sys::windows::createSameTypeSocket(const Socket&); + WinSocket(SOCKET fd); +}; + +}} +#endif /*!QPID_SYS_WINDOWS_BSDSOCKET_H*/ |