From 1d2363b6dccc3139ca4cbda3f1127d40adff1d3e Mon Sep 17 00:00:00 2001 From: Clifford Allan Jansen Date: Mon, 19 Mar 2012 23:24:23 +0000 Subject: QPID-3759 hang on heartbeat connection close git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1302718 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) (limited to 'qpid/cpp') diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index 30378d4c5f..fb8df5ddf8 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -295,6 +295,8 @@ private: volatile bool queuedDelete; // Socket close requested, but there are operations in progress. volatile bool queuedClose; + // Most recent asynch read request + volatile AsynchReadResult* pendingRead; private: // Dispatch events that have completed. @@ -374,6 +376,7 @@ AsynchIO::AsynchIO(const Socket& s, writeInProgress(false), queuedDelete(false), queuedClose(false), + pendingRead(0), working(false) { } @@ -504,6 +507,7 @@ void AsynchIO::startReading() { } } // On status 0 or WSA_IO_PENDING, completion will handle the rest. + pendingRead = result; } else { notifyBuffersEmpty(); @@ -617,16 +621,17 @@ void AsynchIO::readComplete(AsynchReadResult *result) { int status = result->getStatus(); size_t bytes = result->getTransferred(); if (status == 0 && bytes > 0) { - bool restartRead = true; // May not if receiver doesn't want more if (readCallback) readCallback(*this, result->getBuff()); - if (restartRead) - startReading(); + startReading(); } else { // No data read, so put the buffer back. It may be partially filled, // so "unread" it back to the front of the queue. unread(result->getBuff()); + if (queuedClose && status == ERROR_OPERATION_ABORTED) { + return; // Expected reap from CancelIoEx + } notifyEof(); if (status != 0) { @@ -697,8 +702,11 @@ void AsynchIO::completion(AsynchIoResult *result) { { ScopedUnlock ul(completionLock); AsynchReadResult *r = dynamic_cast(result); - if (r != 0) + if (r != 0) { readComplete(r); + // Set pendingRead to 0 if it's still pointing to (newly completed) r + InterlockedCompareExchangePointer((void * volatile *)&pendingRead, 0, r); + } else { AsynchWriteResult *w = dynamic_cast(result); @@ -732,6 +740,15 @@ void AsynchIO::completion(AsynchIoResult *result) { else if (queuedDelete) delete this; } + else { + if (queuedClose && pendingRead) { + // Force outstanding read to completion. Layer above will + // call back. + CancelIoEx((HANDLE)toSocketHandle(socket), + ((AsynchReadResult *)pendingRead)->overlapped()); + pendingRead = 0; + } + } } } // namespace windows -- cgit v1.2.1