summaryrefslogtreecommitdiff
path: root/Lib/asyncio/unix_events.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio/unix_events.py')
-rw-r--r--Lib/asyncio/unix_events.py79
1 files changed, 51 insertions, 28 deletions
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index a234f4fac1..3807680f8d 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -3,7 +3,6 @@
import collections
import errno
import fcntl
-import functools
import os
import signal
import socket
@@ -167,23 +166,29 @@ class SelectorEventLoop(selector_events.BaseSelectorEventLoop):
def _sig_chld(self):
try:
- try:
- pid, status = os.waitpid(0, os.WNOHANG)
- except ChildProcessError:
- return
- if pid == 0:
- self.call_soon(self._sig_chld)
- return
- elif os.WIFSIGNALED(status):
- returncode = -os.WTERMSIG(status)
- elif os.WIFEXITED(status):
- returncode = os.WEXITSTATUS(status)
- else:
- self.call_soon(self._sig_chld)
- return
- transp = self._subprocesses.get(pid)
- if transp is not None:
- transp._process_exited(returncode)
+ # Because of signal coalescing, we must keep calling waitpid() as
+ # long as we're able to reap a child.
+ while True:
+ try:
+ pid, status = os.waitpid(-1, os.WNOHANG)
+ except ChildProcessError:
+ break # No more child processes exist.
+ if pid == 0:
+ break # All remaining child processes are still alive.
+ elif os.WIFSIGNALED(status):
+ # A child process died because of a signal.
+ returncode = -os.WTERMSIG(status)
+ elif os.WIFEXITED(status):
+ # A child process exited (e.g. sys.exit()).
+ returncode = os.WEXITSTATUS(status)
+ else:
+ # A child exited, but we don't understand its status.
+ # This shouldn't happen, but if it does, let's just
+ # return that status; perhaps that helps debug it.
+ returncode = status
+ transp = self._subprocesses.get(pid)
+ if transp is not None:
+ transp._process_exited(returncode)
except Exception:
logger.exception('Unknown exception in SIGCHLD handler')
@@ -208,6 +213,9 @@ class _UnixReadPipeTransport(transports.ReadTransport):
self._loop = loop
self._pipe = pipe
self._fileno = pipe.fileno()
+ mode = os.fstat(self._fileno).st_mode
+ if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode)):
+ raise ValueError("Pipe transport is for pipes/sockets only.")
_set_nonblocking(self._fileno)
self._protocol = protocol
self._closing = False
@@ -270,21 +278,29 @@ class _UnixWritePipeTransport(transports.WriteTransport):
self._loop = loop
self._pipe = pipe
self._fileno = pipe.fileno()
- if not stat.S_ISFIFO(os.fstat(self._fileno).st_mode):
- raise ValueError("Pipe transport is for pipes only.")
+ mode = os.fstat(self._fileno).st_mode
+ is_socket = stat.S_ISSOCK(mode)
+ is_pipe = stat.S_ISFIFO(mode)
+ if not (is_socket or is_pipe):
+ raise ValueError("Pipe transport is for pipes/sockets only.")
_set_nonblocking(self._fileno)
self._protocol = protocol
self._buffer = []
self._conn_lost = 0
self._closing = False # Set when close() or write_eof() called.
- self._loop.add_reader(self._fileno, self._read_ready)
+
+ # On AIX, the reader trick only works for sockets.
+ # On other platforms it works for pipes and sockets.
+ # (Exception: OS X 10.4? Issue #19294.)
+ if is_socket or not sys.platform.startswith("aix"):
+ self._loop.add_reader(self._fileno, self._read_ready)
self._loop.call_soon(self._protocol.connection_made, self)
if waiter is not None:
self._loop.call_soon(waiter.set_result, None)
def _read_ready(self):
- # pipe was closed by peer
+ # Pipe was closed by peer.
self._close()
def write(self, data):
@@ -430,8 +446,15 @@ class _UnixSubprocessTransport(transports.SubprocessTransport):
self._loop = loop
self._pipes = {}
+ stdin_w = None
if stdin == subprocess.PIPE:
self._pipes[STDIN] = None
+ # Use a socket pair for stdin, since not all platforms
+ # support selecting read events on the write end of a
+ # socket (which we use in order to detect closing of the
+ # other end). Notably this is needed on AIX, and works
+ # just fine on other platforms.
+ stdin, stdin_w = self._loop._socketpair()
if stdout == subprocess.PIPE:
self._pipes[STDOUT] = None
if stderr == subprocess.PIPE:
@@ -443,6 +466,9 @@ class _UnixSubprocessTransport(transports.SubprocessTransport):
self._proc = subprocess.Popen(
args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
universal_newlines=False, bufsize=bufsize, **kwargs)
+ if stdin_w is not None:
+ stdin.close()
+ self._proc.stdin = open(stdin_w.detach(), 'rb', buffering=bufsize)
self._extra['subprocess'] = self._proc
def close(self):
@@ -478,18 +504,15 @@ class _UnixSubprocessTransport(transports.SubprocessTransport):
loop = self._loop
if proc.stdin is not None:
transp, proto = yield from loop.connect_write_pipe(
- functools.partial(
- _UnixWriteSubprocessPipeProto, self, STDIN),
+ lambda: _UnixWriteSubprocessPipeProto(self, STDIN),
proc.stdin)
if proc.stdout is not None:
transp, proto = yield from loop.connect_read_pipe(
- functools.partial(
- _UnixReadSubprocessPipeProto, self, STDOUT),
+ lambda: _UnixReadSubprocessPipeProto(self, STDOUT),
proc.stdout)
if proc.stderr is not None:
transp, proto = yield from loop.connect_read_pipe(
- functools.partial(
- _UnixReadSubprocessPipeProto, self, STDERR),
+ lambda: _UnixReadSubprocessPipeProto(self, STDERR),
proc.stderr)
if not self._pipes:
self._try_connected()