summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Lib/asyncio/windows_events.py53
-rw-r--r--Lib/asyncio/windows_utils.py2
-rw-r--r--Lib/test/test_asyncio/test_windows_events.py13
3 files changed, 46 insertions, 22 deletions
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index 6c7e058042..c9ba7850ee 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -257,7 +257,7 @@ class PipeServer(object):
def _server_pipe_handle(self, first):
# Return a wrapper for a new pipe handle.
- if self._address is None:
+ if self.closed():
return None
flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
if first:
@@ -273,6 +273,9 @@ class PipeServer(object):
self._free_instances.add(pipe)
return pipe
+ def closed(self):
+ return (self._address is None)
+
def close(self):
if self._accept_pipe_future is not None:
self._accept_pipe_future.cancel()
@@ -325,12 +328,21 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
if f:
pipe = f.result()
server._free_instances.discard(pipe)
+
+ if server.closed():
+ # A client connected before the server was closed:
+ # drop the client (close the pipe) and exit
+ pipe.close()
+ return
+
protocol = protocol_factory()
self._make_duplex_pipe_transport(
pipe, protocol, extra={'addr': address})
+
pipe = server._get_unconnected_pipe()
if pipe is None:
return
+
f = self._proactor.accept_pipe(pipe)
except OSError as exc:
if pipe and pipe.fileno() != -1:
@@ -506,28 +518,25 @@ class IocpProactor:
return self._register(ov, pipe, finish_accept_pipe)
- def _connect_pipe(self, fut, address, delay):
- # Unfortunately there is no way to do an overlapped connect to a pipe.
- # Call CreateFile() in a loop until it doesn't fail with
- # ERROR_PIPE_BUSY
- try:
- handle = _overlapped.ConnectPipe(address)
- except OSError as exc:
- if exc.winerror == _overlapped.ERROR_PIPE_BUSY:
- # Polling: retry later
- delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
- self._loop.call_later(delay,
- self._connect_pipe, fut, address, delay)
- else:
- fut.set_exception(exc)
- else:
- pipe = windows_utils.PipeHandle(handle)
- fut.set_result(pipe)
-
+ @coroutine
def connect_pipe(self, address):
- fut = futures.Future(loop=self._loop)
- self._connect_pipe(fut, address, CONNECT_PIPE_INIT_DELAY)
- return fut
+ delay = CONNECT_PIPE_INIT_DELAY
+ while True:
+ # Unfortunately there is no way to do an overlapped connect to a pipe.
+ # Call CreateFile() in a loop until it doesn't fail with
+ # ERROR_PIPE_BUSY
+ try:
+ handle = _overlapped.ConnectPipe(address)
+ break
+ except OSError as exc:
+ if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
+ raise
+
+ # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
+ delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
+ yield from tasks.sleep(delay, loop=self._loop)
+
+ return windows_utils.PipeHandle(handle)
def wait_for_handle(self, handle, timeout=None):
"""Wait for a handle.
diff --git a/Lib/asyncio/windows_utils.py b/Lib/asyncio/windows_utils.py
index e6642960db..5f8327eba6 100644
--- a/Lib/asyncio/windows_utils.py
+++ b/Lib/asyncio/windows_utils.py
@@ -147,6 +147,8 @@ class PipeHandle:
return self._handle
def fileno(self):
+ if self._handle is None:
+ raise ValueError("I/O operatioon on closed pipe")
return self._handle
def close(self, *, CloseHandle=_winapi.CloseHandle):
diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py
index f9b3dd1567..73d8fcdb18 100644
--- a/Lib/test/test_asyncio/test_windows_events.py
+++ b/Lib/test/test_asyncio/test_windows_events.py
@@ -1,6 +1,7 @@
import os
import sys
import unittest
+from unittest import mock
if sys.platform != 'win32':
raise unittest.SkipTest('Windows only')
@@ -91,6 +92,18 @@ class ProactorTests(test_utils.TestCase):
return 'done'
+ def test_connect_pipe_cancel(self):
+ exc = OSError()
+ exc.winerror = _overlapped.ERROR_PIPE_BUSY
+ with mock.patch.object(_overlapped, 'ConnectPipe', side_effect=exc) as connect:
+ coro = self.loop._proactor.connect_pipe('pipe_address')
+ task = self.loop.create_task(coro)
+
+ # check that it's possible to cancel connect_pipe()
+ task.cancel()
+ with self.assertRaises(asyncio.CancelledError):
+ self.loop.run_until_complete(task)
+
def test_wait_for_handle(self):
event = _overlapped.CreateEvent(None, True, False, None)
self.addCleanup(_winapi.CloseHandle, event)