diff options
-rw-r--r-- | Lib/asyncio/windows_events.py | 53 | ||||
-rw-r--r-- | Lib/asyncio/windows_utils.py | 2 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_windows_events.py | 13 |
3 files changed, 46 insertions, 22 deletions
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 6c7e058042..c9ba7850ee 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -257,7 +257,7 @@ class PipeServer(object): def _server_pipe_handle(self, first): # Return a wrapper for a new pipe handle. - if self._address is None: + if self.closed(): return None flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED if first: @@ -273,6 +273,9 @@ class PipeServer(object): self._free_instances.add(pipe) return pipe + def closed(self): + return (self._address is None) + def close(self): if self._accept_pipe_future is not None: self._accept_pipe_future.cancel() @@ -325,12 +328,21 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop): if f: pipe = f.result() server._free_instances.discard(pipe) + + if server.closed(): + # A client connected before the server was closed: + # drop the client (close the pipe) and exit + pipe.close() + return + protocol = protocol_factory() self._make_duplex_pipe_transport( pipe, protocol, extra={'addr': address}) + pipe = server._get_unconnected_pipe() if pipe is None: return + f = self._proactor.accept_pipe(pipe) except OSError as exc: if pipe and pipe.fileno() != -1: @@ -506,28 +518,25 @@ class IocpProactor: return self._register(ov, pipe, finish_accept_pipe) - def _connect_pipe(self, fut, address, delay): - # Unfortunately there is no way to do an overlapped connect to a pipe. - # Call CreateFile() in a loop until it doesn't fail with - # ERROR_PIPE_BUSY - try: - handle = _overlapped.ConnectPipe(address) - except OSError as exc: - if exc.winerror == _overlapped.ERROR_PIPE_BUSY: - # Polling: retry later - delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY) - self._loop.call_later(delay, - self._connect_pipe, fut, address, delay) - else: - fut.set_exception(exc) - else: - pipe = windows_utils.PipeHandle(handle) - fut.set_result(pipe) - + @coroutine def connect_pipe(self, address): - fut = futures.Future(loop=self._loop) - self._connect_pipe(fut, address, CONNECT_PIPE_INIT_DELAY) - return fut + delay = CONNECT_PIPE_INIT_DELAY + while True: + # Unfortunately there is no way to do an overlapped connect to a pipe. + # Call CreateFile() in a loop until it doesn't fail with + # ERROR_PIPE_BUSY + try: + handle = _overlapped.ConnectPipe(address) + break + except OSError as exc: + if exc.winerror != _overlapped.ERROR_PIPE_BUSY: + raise + + # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later + delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY) + yield from tasks.sleep(delay, loop=self._loop) + + return windows_utils.PipeHandle(handle) def wait_for_handle(self, handle, timeout=None): """Wait for a handle. diff --git a/Lib/asyncio/windows_utils.py b/Lib/asyncio/windows_utils.py index e6642960db..5f8327eba6 100644 --- a/Lib/asyncio/windows_utils.py +++ b/Lib/asyncio/windows_utils.py @@ -147,6 +147,8 @@ class PipeHandle: return self._handle def fileno(self): + if self._handle is None: + raise ValueError("I/O operatioon on closed pipe") return self._handle def close(self, *, CloseHandle=_winapi.CloseHandle): diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index f9b3dd1567..73d8fcdb18 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -1,6 +1,7 @@ import os import sys import unittest +from unittest import mock if sys.platform != 'win32': raise unittest.SkipTest('Windows only') @@ -91,6 +92,18 @@ class ProactorTests(test_utils.TestCase): return 'done' + def test_connect_pipe_cancel(self): + exc = OSError() + exc.winerror = _overlapped.ERROR_PIPE_BUSY + with mock.patch.object(_overlapped, 'ConnectPipe', side_effect=exc) as connect: + coro = self.loop._proactor.connect_pipe('pipe_address') + task = self.loop.create_task(coro) + + # check that it's possible to cancel connect_pipe() + task.cancel() + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(task) + def test_wait_for_handle(self): event = _overlapped.CreateEvent(None, True, False, None) self.addCleanup(_winapi.CloseHandle, event) |