diff options
Diffstat (limited to 'Lib/asyncio/unix_events.py')
-rw-r--r-- | Lib/asyncio/unix_events.py | 79 |
1 files changed, 51 insertions, 28 deletions
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index a234f4fac1..3807680f8d 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -3,7 +3,6 @@ import collections import errno import fcntl -import functools import os import signal import socket @@ -167,23 +166,29 @@ class SelectorEventLoop(selector_events.BaseSelectorEventLoop): def _sig_chld(self): try: - try: - pid, status = os.waitpid(0, os.WNOHANG) - except ChildProcessError: - return - if pid == 0: - self.call_soon(self._sig_chld) - return - elif os.WIFSIGNALED(status): - returncode = -os.WTERMSIG(status) - elif os.WIFEXITED(status): - returncode = os.WEXITSTATUS(status) - else: - self.call_soon(self._sig_chld) - return - transp = self._subprocesses.get(pid) - if transp is not None: - transp._process_exited(returncode) + # Because of signal coalescing, we must keep calling waitpid() as + # long as we're able to reap a child. + while True: + try: + pid, status = os.waitpid(-1, os.WNOHANG) + except ChildProcessError: + break # No more child processes exist. + if pid == 0: + break # All remaining child processes are still alive. + elif os.WIFSIGNALED(status): + # A child process died because of a signal. + returncode = -os.WTERMSIG(status) + elif os.WIFEXITED(status): + # A child process exited (e.g. sys.exit()). + returncode = os.WEXITSTATUS(status) + else: + # A child exited, but we don't understand its status. + # This shouldn't happen, but if it does, let's just + # return that status; perhaps that helps debug it. + returncode = status + transp = self._subprocesses.get(pid) + if transp is not None: + transp._process_exited(returncode) except Exception: logger.exception('Unknown exception in SIGCHLD handler') @@ -208,6 +213,9 @@ class _UnixReadPipeTransport(transports.ReadTransport): self._loop = loop self._pipe = pipe self._fileno = pipe.fileno() + mode = os.fstat(self._fileno).st_mode + if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode)): + raise ValueError("Pipe transport is for pipes/sockets only.") _set_nonblocking(self._fileno) self._protocol = protocol self._closing = False @@ -270,21 +278,29 @@ class _UnixWritePipeTransport(transports.WriteTransport): self._loop = loop self._pipe = pipe self._fileno = pipe.fileno() - if not stat.S_ISFIFO(os.fstat(self._fileno).st_mode): - raise ValueError("Pipe transport is for pipes only.") + mode = os.fstat(self._fileno).st_mode + is_socket = stat.S_ISSOCK(mode) + is_pipe = stat.S_ISFIFO(mode) + if not (is_socket or is_pipe): + raise ValueError("Pipe transport is for pipes/sockets only.") _set_nonblocking(self._fileno) self._protocol = protocol self._buffer = [] self._conn_lost = 0 self._closing = False # Set when close() or write_eof() called. - self._loop.add_reader(self._fileno, self._read_ready) + + # On AIX, the reader trick only works for sockets. + # On other platforms it works for pipes and sockets. + # (Exception: OS X 10.4? Issue #19294.) + if is_socket or not sys.platform.startswith("aix"): + self._loop.add_reader(self._fileno, self._read_ready) self._loop.call_soon(self._protocol.connection_made, self) if waiter is not None: self._loop.call_soon(waiter.set_result, None) def _read_ready(self): - # pipe was closed by peer + # Pipe was closed by peer. self._close() def write(self, data): @@ -430,8 +446,15 @@ class _UnixSubprocessTransport(transports.SubprocessTransport): self._loop = loop self._pipes = {} + stdin_w = None if stdin == subprocess.PIPE: self._pipes[STDIN] = None + # Use a socket pair for stdin, since not all platforms + # support selecting read events on the write end of a + # socket (which we use in order to detect closing of the + # other end). Notably this is needed on AIX, and works + # just fine on other platforms. + stdin, stdin_w = self._loop._socketpair() if stdout == subprocess.PIPE: self._pipes[STDOUT] = None if stderr == subprocess.PIPE: @@ -443,6 +466,9 @@ class _UnixSubprocessTransport(transports.SubprocessTransport): self._proc = subprocess.Popen( args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, universal_newlines=False, bufsize=bufsize, **kwargs) + if stdin_w is not None: + stdin.close() + self._proc.stdin = open(stdin_w.detach(), 'rb', buffering=bufsize) self._extra['subprocess'] = self._proc def close(self): @@ -478,18 +504,15 @@ class _UnixSubprocessTransport(transports.SubprocessTransport): loop = self._loop if proc.stdin is not None: transp, proto = yield from loop.connect_write_pipe( - functools.partial( - _UnixWriteSubprocessPipeProto, self, STDIN), + lambda: _UnixWriteSubprocessPipeProto(self, STDIN), proc.stdin) if proc.stdout is not None: transp, proto = yield from loop.connect_read_pipe( - functools.partial( - _UnixReadSubprocessPipeProto, self, STDOUT), + lambda: _UnixReadSubprocessPipeProto(self, STDOUT), proc.stdout) if proc.stderr is not None: transp, proto = yield from loop.connect_read_pipe( - functools.partial( - _UnixReadSubprocessPipeProto, self, STDERR), + lambda: _UnixReadSubprocessPipeProto(self, STDERR), proc.stderr) if not self._pipes: self._try_connected() |