diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-02-01 22:49:59 +0100 |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-02-01 22:49:59 +0100 |
commit | 915bcb01110c7db65f8be9139bf887c749fbde75 (patch) | |
tree | fa24b947b19c1479ed581dc817c2e696386f3fb0 /Lib | |
parent | 153d97b24e7253f344860094eb2c98ed93657720 (diff) | |
download | cpython-git-915bcb01110c7db65f8be9139bf887c749fbde75.tar.gz |
Issue #20400: Merge Tulip into Python: add the new asyncio.subprocess module
* Add a new asyncio.subprocess module
* Add new create_subprocess_exec() and create_subprocess_shell() functions
* The new asyncio.subprocess.SubprocessStreamProtocol creates stream readers
for stdout and stderr and a stream writer for stdin.
* The new asyncio.subprocess.Process class offers an API close to the
subprocess.Popen class:
- pid, returncode, stdin, stdout and stderr attributes
- communicate(), wait(), send_signal(), terminate() and kill() methods
* Remove STDIN (0), STDOUT (1) and STDERR (2) constants from base_subprocess
and unix_events, to not be confused with the symbols with the same name of
subprocess and asyncio.subprocess modules
* _ProactorBasePipeTransport.get_write_buffer_size() now counts also the size
of the pending write
* _ProactorBaseWritePipeTransport._loop_writing() may now pause the protocol if
the write buffer size is greater than the high water mark (64 KB by default)
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/asyncio/__init__.py | 2 | ||||
-rw-r--r-- | Lib/asyncio/base_subprocess.py | 23 | ||||
-rw-r--r-- | Lib/asyncio/proactor_events.py | 34 | ||||
-rw-r--r-- | Lib/asyncio/subprocess.py | 197 | ||||
-rw-r--r-- | Lib/asyncio/unix_events.py | 7 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_base_events.py | 15 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_events.py | 8 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_subprocess.py | 196 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_windows_events.py | 2 |
9 files changed, 434 insertions, 50 deletions
diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index eb22c385b8..3df2f8034d 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -24,6 +24,7 @@ from .locks import * from .protocols import * from .queues import * from .streams import * +from .subprocess import * from .tasks import * from .transports import * @@ -39,5 +40,6 @@ __all__ = (events.__all__ + protocols.__all__ + queues.__all__ + streams.__all__ + + subprocess.__all__ + tasks.__all__ + transports.__all__) diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index b7cdbceffa..b78f816d4c 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -6,11 +6,6 @@ from . import tasks from . import transports -STDIN = 0 -STDOUT = 1 -STDERR = 2 - - class BaseSubprocessTransport(transports.SubprocessTransport): def __init__(self, loop, protocol, args, shell, @@ -22,11 +17,11 @@ class BaseSubprocessTransport(transports.SubprocessTransport): self._pipes = {} if stdin == subprocess.PIPE: - self._pipes[STDIN] = None + self._pipes[0] = None if stdout == subprocess.PIPE: - self._pipes[STDOUT] = None + self._pipes[1] = None if stderr == subprocess.PIPE: - self._pipes[STDERR] = None + self._pipes[2] = None self._pending_calls = collections.deque() self._finished = False self._returncode = None @@ -76,19 +71,19 @@ class BaseSubprocessTransport(transports.SubprocessTransport): loop = self._loop if proc.stdin is not None: _, pipe = yield from loop.connect_write_pipe( - lambda: WriteSubprocessPipeProto(self, STDIN), + lambda: WriteSubprocessPipeProto(self, 0), proc.stdin) - self._pipes[STDIN] = pipe + self._pipes[0] = pipe if proc.stdout is not None: _, pipe = yield from loop.connect_read_pipe( - lambda: ReadSubprocessPipeProto(self, STDOUT), + lambda: ReadSubprocessPipeProto(self, 1), proc.stdout) - self._pipes[STDOUT] = pipe + self._pipes[1] = pipe if proc.stderr is not None: _, pipe = yield from loop.connect_read_pipe( - lambda: ReadSubprocessPipeProto(self, STDERR), + lambda: ReadSubprocessPipeProto(self, 2), proc.stderr) - self._pipes[STDERR] = pipe + self._pipes[2] = pipe assert self._pending_calls is not None diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index b6b3be2dc6..fb67155786 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -29,6 +29,7 @@ class _ProactorBasePipeTransport(transports.BaseTransport): self._buffer = None # None or bytearray. self._read_fut = None self._write_fut = None + self._pending_write = 0 self._conn_lost = 0 self._closing = False # Set when close() called. self._eof_written = False @@ -68,6 +69,7 @@ class _ProactorBasePipeTransport(transports.BaseTransport): if self._read_fut: self._read_fut.cancel() self._write_fut = self._read_fut = None + self._pending_write = 0 self._buffer = None self._loop.call_soon(self._call_connection_lost, exc) @@ -128,11 +130,10 @@ class _ProactorBasePipeTransport(transports.BaseTransport): self._low_water = low def get_write_buffer_size(self): - # NOTE: This doesn't take into account data already passed to - # send() even if send() hasn't finished yet. - if not self._buffer: - return 0 - return len(self._buffer) + size = self._pending_write + if self._buffer is not None: + size += len(self._buffer) + return size class _ProactorReadPipeTransport(_ProactorBasePipeTransport, @@ -206,7 +207,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, - transports.WriteTransport): + transports.WriteTransport): """Transport for write pipes.""" def write(self, data): @@ -252,6 +253,7 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, try: assert f is self._write_fut self._write_fut = None + self._pending_write = 0 if f: f.result() if data is None: @@ -262,15 +264,21 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, self._loop.call_soon(self._call_connection_lost, None) if self._eof_written: self._sock.shutdown(socket.SHUT_WR) + # Now that we've reduced the buffer size, tell the + # protocol to resume writing if it was paused. Note that + # we do this last since the callback is called immediately + # and it may add more data to the buffer (even causing the + # protocol to be paused again). + self._maybe_resume_protocol() else: self._write_fut = self._loop._proactor.send(self._sock, data) - self._write_fut.add_done_callback(self._loop_writing) - # Now that we've reduced the buffer size, tell the - # protocol to resume writing if it was paused. Note that - # we do this last since the callback is called immediately - # and it may add more data to the buffer (even causing the - # protocol to be paused again). - self._maybe_resume_protocol() + if not self._write_fut.done(): + assert self._pending_write == 0 + self._pending_write = len(data) + self._write_fut.add_done_callback(self._loop_writing) + self._maybe_pause_protocol() + else: + self._write_fut.add_done_callback(self._loop_writing) except ConnectionResetError as exc: self._force_close(exc) except OSError as exc: diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py new file mode 100644 index 0000000000..4312d445e2 --- /dev/null +++ b/Lib/asyncio/subprocess.py @@ -0,0 +1,197 @@ +__all__ = ['create_subprocess_exec', 'create_subprocess_shell'] + +import collections +import subprocess + +from . import events +from . import futures +from . import protocols +from . import streams +from . import tasks + + +PIPE = subprocess.PIPE +STDOUT = subprocess.STDOUT +DEVNULL = subprocess.DEVNULL + + +class SubprocessStreamProtocol(streams.FlowControlMixin, + protocols.SubprocessProtocol): + """Like StreamReaderProtocol, but for a subprocess.""" + + def __init__(self, limit, loop): + super().__init__(loop=loop) + self._limit = limit + self.stdin = self.stdout = self.stderr = None + self.waiter = futures.Future(loop=loop) + self._waiters = collections.deque() + self._transport = None + + def connection_made(self, transport): + self._transport = transport + if transport.get_pipe_transport(1): + self.stdout = streams.StreamReader(limit=self._limit, + loop=self._loop) + if transport.get_pipe_transport(2): + self.stderr = streams.StreamReader(limit=self._limit, + loop=self._loop) + stdin = transport.get_pipe_transport(0) + if stdin is not None: + self.stdin = streams.StreamWriter(stdin, + protocol=self, + reader=None, + loop=self._loop) + self.waiter.set_result(None) + + def pipe_data_received(self, fd, data): + if fd == 1: + reader = self.stdout + elif fd == 2: + reader = self.stderr + else: + reader = None + if reader is not None: + reader.feed_data(data) + + def pipe_connection_lost(self, fd, exc): + if fd == 0: + pipe = self.stdin + if pipe is not None: + pipe.close() + self.connection_lost(exc) + return + if fd == 1: + reader = self.stdout + elif fd == 2: + reader = self.stderr + else: + reader = None + if reader != None: + if exc is None: + reader.feed_eof() + else: + reader.set_exception(exc) + + def process_exited(self): + # wake up futures waiting for wait() + returncode = self._transport.get_returncode() + while self._waiters: + waiter = self._waiters.popleft() + waiter.set_result(returncode) + + +class Process: + def __init__(self, transport, protocol, loop): + self._transport = transport + self._protocol = protocol + self._loop = loop + self.stdin = protocol.stdin + self.stdout = protocol.stdout + self.stderr = protocol.stderr + self.pid = transport.get_pid() + + @property + def returncode(self): + return self._transport.get_returncode() + + @tasks.coroutine + def wait(self): + """Wait until the process exit and return the process return code.""" + returncode = self._transport.get_returncode() + if returncode is not None: + return returncode + + waiter = futures.Future(loop=self._loop) + self._protocol._waiters.append(waiter) + yield from waiter + return waiter.result() + + def get_subprocess(self): + return self._transport.get_extra_info('subprocess') + + def _check_alive(self): + if self._transport.get_returncode() is not None: + raise ProcessLookupError() + + def send_signal(self, signal): + self._check_alive() + self._transport.send_signal(signal) + + def terminate(self): + self._check_alive() + self._transport.terminate() + + def kill(self): + self._check_alive() + self._transport.kill() + + @tasks.coroutine + def _feed_stdin(self, input): + self.stdin.write(input) + yield from self.stdin.drain() + self.stdin.close() + + @tasks.coroutine + def _noop(self): + return None + + @tasks.coroutine + def _read_stream(self, fd): + transport = self._transport.get_pipe_transport(fd) + if fd == 2: + stream = self.stderr + else: + assert fd == 1 + stream = self.stdout + output = yield from stream.read() + transport.close() + return output + + @tasks.coroutine + def communicate(self, input=None): + loop = self._transport._loop + if input: + stdin = self._feed_stdin(input) + else: + stdin = self._noop() + if self.stdout is not None: + stdout = self._read_stream(1) + else: + stdout = self._noop() + if self.stderr is not None: + stderr = self._read_stream(2) + else: + stderr = self._noop() + stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr, + loop=loop) + yield from self.wait() + return (stdout, stderr) + + +@tasks.coroutine +def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, + loop=None, limit=streams._DEFAULT_LIMIT, **kwds): + if loop is None: + loop = events.get_event_loop() + protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, + loop=loop) + transport, protocol = yield from loop.subprocess_shell( + protocol_factory, + cmd, stdin=stdin, stdout=stdout, + stderr=stderr, **kwds) + yield from protocol.waiter + return Process(transport, protocol, loop) + +@tasks.coroutine +def create_subprocess_exec(*args, stdin=None, stdout=None, stderr=None, + loop=None, limit=streams._DEFAULT_LIMIT, **kwds): + if loop is None: + loop = events.get_event_loop() + protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, + loop=loop) + transport, protocol = yield from loop.subprocess_exec( + protocol_factory, + *args, stdin=stdin, stdout=stdout, + stderr=stderr, **kwds) + yield from protocol.waiter + return Process(transport, protocol, loop) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 98fdddeec9..3ce2db8d42 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -21,16 +21,11 @@ from . import transports from .log import logger -__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR', +__all__ = ['SelectorEventLoop', 'AbstractChildWatcher', 'SafeChildWatcher', 'FastChildWatcher', 'DefaultEventLoopPolicy', ] -STDIN = 0 -STDOUT = 1 -STDERR = 2 - - if sys.platform == 'win32': # pragma: no cover raise ImportError('Signals are not really supported on Windows') diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 1db772331d..72f5c8a0b4 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -116,18 +116,17 @@ class BaseEventLoopTests(unittest.TestCase): self.loop.stop() self.loop._process_events = unittest.mock.Mock() - delay = 0.1 - - when = self.loop.time() + delay + when = self.loop.time() + 0.1 self.loop.call_at(when, cb) t0 = self.loop.time() self.loop.run_forever() dt = self.loop.time() - t0 - - self.assertGreaterEqual(dt, delay - self.loop._granularity, dt) - # tolerate a difference of +800 ms because some Python buildbots - # are really slow - self.assertLessEqual(dt, 0.9, dt) + self.assertTrue(0.09 <= dt <= 0.9, + # Issue #20452: add more info in case of failure, + # to try to investigate the bug + (dt, + self.loop._granularity, + time.get_clock_info('monotonic'))) def test_run_once_in_executor_handle(self): def cb(): diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 3ca35eb2d1..24808cb11c 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1179,14 +1179,6 @@ class EventLoopTestsMixin: calls.append(self.loop._run_once_counter) self.assertEqual(calls, [1, 3, 5, 6]) - def test_granularity(self): - granularity = self.loop._granularity - self.assertGreater(granularity, 0.0) - # Worst expected granularity: 1 ms on Linux (limited by poll/epoll - # resolution), 15.6 ms on Windows (limited by time.monotonic - # resolution) - self.assertLess(granularity, 0.050) - class SubprocessTestsMixin: diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py new file mode 100644 index 0000000000..4f38cc72c8 --- /dev/null +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -0,0 +1,196 @@ +from asyncio import subprocess +import asyncio +import signal +import sys +import unittest +from test import support +if sys.platform != 'win32': + from asyncio import unix_events + +# Program exiting quickly +PROGRAM_EXIT_FAST = [sys.executable, '-c', 'pass'] + +# Program blocking +PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)'] + +# Program sleeping during 1 second +PROGRAM_SLEEP_1SEC = [sys.executable, '-c', 'import time; time.sleep(1)'] + +# Program copying input to output +PROGRAM_CAT = [ + sys.executable, '-c', + ';'.join(('import sys', + 'data = sys.stdin.buffer.read()', + 'sys.stdout.buffer.write(data)'))] + +class SubprocessMixin: + def test_stdin_stdout(self): + args = PROGRAM_CAT + + @asyncio.coroutine + def run(data): + proc = yield from asyncio.create_subprocess_exec( + *args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + loop=self.loop) + + # feed data + proc.stdin.write(data) + yield from proc.stdin.drain() + proc.stdin.close() + + # get output and exitcode + data = yield from proc.stdout.read() + exitcode = yield from proc.wait() + return (exitcode, data) + + task = run(b'some data') + task = asyncio.wait_for(task, 10.0, loop=self.loop) + exitcode, stdout = self.loop.run_until_complete(task) + self.assertEqual(exitcode, 0) + self.assertEqual(stdout, b'some data') + + def test_communicate(self): + args = PROGRAM_CAT + + @asyncio.coroutine + def run(data): + proc = yield from asyncio.create_subprocess_exec( + *args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + loop=self.loop) + stdout, stderr = yield from proc.communicate(data) + return proc.returncode, stdout + + task = run(b'some data') + task = asyncio.wait_for(task, 10.0, loop=self.loop) + exitcode, stdout = self.loop.run_until_complete(task) + self.assertEqual(exitcode, 0) + self.assertEqual(stdout, b'some data') + + def test_shell(self): + create = asyncio.create_subprocess_shell('exit 7', + loop=self.loop) + proc = self.loop.run_until_complete(create) + exitcode = self.loop.run_until_complete(proc.wait()) + self.assertEqual(exitcode, 7) + + def test_start_new_session(self): + # start the new process in a new session + create = asyncio.create_subprocess_shell('exit 8', + start_new_session=True, + loop=self.loop) + proc = self.loop.run_until_complete(create) + exitcode = self.loop.run_until_complete(proc.wait()) + self.assertEqual(exitcode, 8) + + def test_kill(self): + args = PROGRAM_BLOCKED + create = asyncio.create_subprocess_exec(*args, loop=self.loop) + proc = self.loop.run_until_complete(create) + proc.kill() + returncode = self.loop.run_until_complete(proc.wait()) + if sys.platform == 'win32': + self.assertIsInstance(returncode, int) + # expect 1 but sometimes get 0 + else: + self.assertEqual(-signal.SIGKILL, returncode) + + def test_terminate(self): + args = PROGRAM_BLOCKED + create = asyncio.create_subprocess_exec(*args, loop=self.loop) + proc = self.loop.run_until_complete(create) + proc.terminate() + returncode = self.loop.run_until_complete(proc.wait()) + if sys.platform == 'win32': + self.assertIsInstance(returncode, int) + # expect 1 but sometimes get 0 + else: + self.assertEqual(-signal.SIGTERM, returncode) + + @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") + def test_send_signal(self): + args = PROGRAM_BLOCKED + create = asyncio.create_subprocess_exec(*args, loop=self.loop) + proc = self.loop.run_until_complete(create) + proc.send_signal(signal.SIGHUP) + returncode = self.loop.run_until_complete(proc.wait()) + self.assertEqual(-signal.SIGHUP, returncode) + + def test_get_subprocess(self): + args = PROGRAM_EXIT_FAST + + @asyncio.coroutine + def run(): + proc = yield from asyncio.create_subprocess_exec(*args, + loop=self.loop) + yield from proc.wait() + + popen = proc.get_subprocess() + popen.wait() + return (proc, popen) + + proc, popen = self.loop.run_until_complete(run()) + self.assertEqual(popen.returncode, proc.returncode) + self.assertEqual(popen.pid, proc.pid) + + def test_broken_pipe(self): + large_data = b'x' * support.PIPE_MAX_SIZE + + create = asyncio.create_subprocess_exec( + *PROGRAM_SLEEP_1SEC, + stdin=subprocess.PIPE, + loop=self.loop) + proc = self.loop.run_until_complete(create) + with self.assertRaises(BrokenPipeError): + self.loop.run_until_complete(proc.communicate(large_data)) + self.loop.run_until_complete(proc.wait()) + + +if sys.platform != 'win32': + # Unix + class SubprocessWatcherMixin(SubprocessMixin): + Watcher = None + + def setUp(self): + policy = asyncio.get_event_loop_policy() + self.loop = policy.new_event_loop() + + # ensure that the event loop is passed explicitly in the code + policy.set_event_loop(None) + + watcher = self.Watcher() + watcher.attach_loop(self.loop) + policy.set_child_watcher(watcher) + + def tearDown(self): + policy = asyncio.get_event_loop_policy() + policy.set_child_watcher(None) + self.loop.close() + policy.set_event_loop(None) + + class SubprocessSafeWatcherTests(SubprocessWatcherMixin, unittest.TestCase): + Watcher = unix_events.SafeChildWatcher + + class SubprocessFastWatcherTests(SubprocessWatcherMixin, unittest.TestCase): + Watcher = unix_events.FastChildWatcher +else: + # Windows + class SubprocessProactorTests(SubprocessMixin, unittest.TestCase): + def setUp(self): + policy = asyncio.get_event_loop_policy() + self.loop = asyncio.ProactorEventLoop() + + # ensure that the event loop is passed explicitly in the code + policy.set_event_loop(None) + + def tearDown(self): + policy = asyncio.get_event_loop_policy() + self.loop.close() + policy.set_event_loop(None) + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index 846049a245..3c271ebeec 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -105,7 +105,7 @@ class ProactorTests(unittest.TestCase): self.loop.run_until_complete(f) elapsed = self.loop.time() - start self.assertFalse(f.result()) - self.assertTrue(0.18 < elapsed < 0.9, elapsed) + self.assertTrue(0.18 < elapsed < 0.5, elapsed) _overlapped.SetEvent(event) |