summaryrefslogtreecommitdiff
path: root/Lib/asyncio
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/base_subprocess.py108
-rw-r--r--Lib/asyncio/subprocess.py40
-rw-r--r--Lib/asyncio/unix_events.py15
-rw-r--r--Lib/asyncio/windows_events.py7
4 files changed, 83 insertions, 87 deletions
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
index 651a9a291e..001f9b8c24 100644
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -3,6 +3,7 @@ import subprocess
import sys
import warnings
+from . import futures
from . import protocols
from . import transports
from .coroutines import coroutine
@@ -13,27 +14,32 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def __init__(self, loop, protocol, args, shell,
stdin, stdout, stderr, bufsize,
- extra=None, **kwargs):
+ waiter=None, extra=None, **kwargs):
super().__init__(extra)
self._closed = False
self._protocol = protocol
self._loop = loop
+ self._proc = None
self._pid = None
-
+ self._returncode = None
+ self._exit_waiters = []
+ self._pending_calls = collections.deque()
self._pipes = {}
+ self._finished = False
+
if stdin == subprocess.PIPE:
self._pipes[0] = None
if stdout == subprocess.PIPE:
self._pipes[1] = None
if stderr == subprocess.PIPE:
self._pipes[2] = None
- self._pending_calls = collections.deque()
- self._finished = False
- self._returncode = None
+
+ # Create the child process: set the _proc attribute
self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr, bufsize=bufsize, **kwargs)
self._pid = self._proc.pid
self._extra['subprocess'] = self._proc
+
if self._loop.get_debug():
if isinstance(args, (bytes, str)):
program = args
@@ -42,6 +48,8 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
logger.debug('process %r created: pid %s',
program, self._pid)
+ self._loop.create_task(self._connect_pipes(waiter))
+
def __repr__(self):
info = [self.__class__.__name__]
if self._closed:
@@ -77,12 +85,23 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def close(self):
self._closed = True
+
for proto in self._pipes.values():
if proto is None:
continue
proto.pipe.close()
- if self._returncode is None:
- self.terminate()
+
+ if self._proc is not None and self._returncode is None:
+ if self._loop.get_debug():
+ logger.warning('Close running child process: kill %r', self)
+
+ try:
+ self._proc.kill()
+ except ProcessLookupError:
+ pass
+
+ # Don't clear the _proc reference yet because _post_init() may
+ # still run
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
@@ -105,59 +124,42 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
else:
return None
+ def _check_proc(self):
+ if self._closed:
+ raise ValueError("operation on closed transport")
+ if self._proc is None:
+ raise ProcessLookupError()
+
def send_signal(self, signal):
+ self._check_proc()
self._proc.send_signal(signal)
def terminate(self):
+ self._check_proc()
self._proc.terminate()
def kill(self):
+ self._check_proc()
self._proc.kill()
- def _kill_wait(self):
- """Close pipes, kill the subprocess and read its return status.
-
- Function called when an exception is raised during the creation
- of a subprocess.
- """
- self._closed = True
- if self._loop.get_debug():
- logger.warning('Exception during subprocess creation, '
- 'kill the subprocess %r',
- self,
- exc_info=True)
-
- proc = self._proc
- if proc.stdout:
- proc.stdout.close()
- if proc.stderr:
- proc.stderr.close()
- if proc.stdin:
- proc.stdin.close()
-
- try:
- proc.kill()
- except ProcessLookupError:
- pass
- self._returncode = proc.wait()
-
- self.close()
-
@coroutine
- def _post_init(self):
+ def _connect_pipes(self, waiter):
try:
proc = self._proc
loop = self._loop
+
if proc.stdin is not None:
_, pipe = yield from loop.connect_write_pipe(
lambda: WriteSubprocessPipeProto(self, 0),
proc.stdin)
self._pipes[0] = pipe
+
if proc.stdout is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 1),
proc.stdout)
self._pipes[1] = pipe
+
if proc.stderr is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 2),
@@ -166,13 +168,16 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
assert self._pending_calls is not None
- self._loop.call_soon(self._protocol.connection_made, self)
+ loop.call_soon(self._protocol.connection_made, self)
for callback, data in self._pending_calls:
- self._loop.call_soon(callback, *data)
+ loop.call_soon(callback, *data)
self._pending_calls = None
- except:
- self._kill_wait()
- raise
+ except Exception as exc:
+ if waiter is not None and not waiter.cancelled():
+ waiter.set_exception(exc)
+ else:
+ if waiter is not None and not waiter.cancelled():
+ waiter.set_result(None)
def _call(self, cb, *data):
if self._pending_calls is not None:
@@ -197,6 +202,23 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
self._call(self._protocol.process_exited)
self._try_finish()
+ # wake up futures waiting for wait()
+ for waiter in self._exit_waiters:
+ if not waiter.cancelled():
+ waiter.set_result(returncode)
+ self._exit_waiters = None
+
+ def wait(self):
+ """Wait until the process exit and return the process return code.
+
+ This method is a coroutine."""
+ if self._returncode is not None:
+ return self._returncode
+
+ waiter = futures.Future(loop=self._loop)
+ self._exit_waiters.append(waiter)
+ return (yield from waiter)
+
def _try_finish(self):
assert not self._finished
if self._returncode is None:
@@ -210,9 +232,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
try:
self._protocol.connection_lost(exc)
finally:
+ self._loop = None
self._proc = None
self._protocol = None
- self._loop = None
class WriteSubprocessPipeProto(protocols.BaseProtocol):
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
index c848a21a8f..d0c9779c1c 100644
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -25,8 +25,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
super().__init__(loop=loop)
self._limit = limit
self.stdin = self.stdout = self.stderr = None
- self.waiter = futures.Future(loop=loop)
- self._waiters = collections.deque()
self._transport = None
def __repr__(self):
@@ -61,9 +59,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
reader=None,
loop=self._loop)
- if not self.waiter.cancelled():
- self.waiter.set_result(None)
-
def pipe_data_received(self, fd, data):
if fd == 1:
reader = self.stdout
@@ -94,16 +89,9 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
reader.set_exception(exc)
def process_exited(self):
- returncode = self._transport.get_returncode()
self._transport.close()
self._transport = None
- # wake up futures waiting for wait()
- while self._waiters:
- waiter = self._waiters.popleft()
- if not waiter.cancelled():
- waiter.set_result(returncode)
-
class Process:
def __init__(self, transport, protocol, loop):
@@ -124,30 +112,18 @@ class Process:
@coroutine
def wait(self):
- """Wait until the process exit and return the process return code."""
- returncode = self._transport.get_returncode()
- if returncode is not None:
- return returncode
-
- waiter = futures.Future(loop=self._loop)
- self._protocol._waiters.append(waiter)
- yield from waiter
- return waiter.result()
+ """Wait until the process exit and return the process return code.
- def _check_alive(self):
- if self._transport.get_returncode() is not None:
- raise ProcessLookupError()
+ This method is a coroutine."""
+ return (yield from self._transport.wait())
def send_signal(self, signal):
- self._check_alive()
self._transport.send_signal(signal)
def terminate(self):
- self._check_alive()
self._transport.terminate()
def kill(self):
- self._check_alive()
self._transport.kill()
@coroutine
@@ -221,11 +197,6 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
protocol_factory,
cmd, stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
- try:
- yield from protocol.waiter
- except:
- transport._kill_wait()
- raise
return Process(transport, protocol, loop)
@coroutine
@@ -241,9 +212,4 @@ def create_subprocess_exec(program, *args, stdin=None, stdout=None,
program, *args,
stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
- try:
- yield from protocol.waiter
- except:
- transport._kill_wait()
- raise
return Process(transport, protocol, loop)
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index b06f1b2330..3ecdfd2e0b 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -16,6 +16,7 @@ from . import base_subprocess
from . import constants
from . import coroutines
from . import events
+from . import futures
from . import selector_events
from . import selectors
from . import transports
@@ -175,16 +176,20 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
with events.get_child_watcher() as watcher:
+ waiter = futures.Future(loop=self)
transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
- extra=extra, **kwargs)
+ waiter=waiter, extra=extra,
+ **kwargs)
+
+ watcher.add_child_handler(transp.get_pid(),
+ self._child_watcher_callback, transp)
try:
- yield from transp._post_init()
+ yield from waiter
except:
transp.close()
+ yield from transp.wait()
raise
- watcher.add_child_handler(transp.get_pid(),
- self._child_watcher_callback, transp)
return transp
@@ -774,7 +779,7 @@ class SafeChildWatcher(BaseChildWatcher):
pass
def add_child_handler(self, pid, callback, *args):
- self._callbacks[pid] = callback, args
+ self._callbacks[pid] = (callback, args)
# Prevent a race condition in case the child is already terminated.
self._do_waitpid(pid)
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index 94aafb6f5a..437eb0ac9d 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -366,13 +366,16 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
+ waiter = futures.Future(loop=self)
transp = _WindowsSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
- extra=extra, **kwargs)
+ waiter=waiter, extra=extra,
+ **kwargs)
try:
- yield from transp._post_init()
+ yield from waiter
except:
transp.close()
+ yield from transp.wait()
raise
return transp