summaryrefslogtreecommitdiff
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_asyncio/test_events.py35
-rw-r--r--Lib/test/test_asyncio/test_subprocess.py65
2 files changed, 83 insertions, 17 deletions
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 12af62b209..4b957d8f63 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -1551,9 +1551,10 @@ class SubprocessTestsMixin:
stdin = transp.get_pipe_transport(0)
stdin.write(b'Python The Winner')
self.loop.run_until_complete(proto.got_data[1].wait())
- transp.close()
+ with test_utils.disable_logger():
+ transp.close()
self.loop.run_until_complete(proto.completed)
- self.check_terminated(proto.returncode)
+ self.check_killed(proto.returncode)
self.assertEqual(b'Python The Winner', proto.data[1])
def test_subprocess_interactive(self):
@@ -1567,21 +1568,20 @@ class SubprocessTestsMixin:
self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
- try:
- stdin = transp.get_pipe_transport(0)
- stdin.write(b'Python ')
- self.loop.run_until_complete(proto.got_data[1].wait())
- proto.got_data[1].clear()
- self.assertEqual(b'Python ', proto.data[1])
-
- stdin.write(b'The Winner')
- self.loop.run_until_complete(proto.got_data[1].wait())
- self.assertEqual(b'Python The Winner', proto.data[1])
- finally:
- transp.close()
+ stdin = transp.get_pipe_transport(0)
+ stdin.write(b'Python ')
+ self.loop.run_until_complete(proto.got_data[1].wait())
+ proto.got_data[1].clear()
+ self.assertEqual(b'Python ', proto.data[1])
+ stdin.write(b'The Winner')
+ self.loop.run_until_complete(proto.got_data[1].wait())
+ self.assertEqual(b'Python The Winner', proto.data[1])
+
+ with test_utils.disable_logger():
+ transp.close()
self.loop.run_until_complete(proto.completed)
- self.check_terminated(proto.returncode)
+ self.check_killed(proto.returncode)
def test_subprocess_shell(self):
connect = self.loop.subprocess_shell(
@@ -1739,9 +1739,10 @@ class SubprocessTestsMixin:
# GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using
# WriteFile() we get ERROR_BROKEN_PIPE as expected.)
self.assertEqual(b'ERR:OSError', proto.data[2])
- transp.close()
+ with test_utils.disable_logger():
+ transp.close()
self.loop.run_until_complete(proto.completed)
- self.check_terminated(proto.returncode)
+ self.check_killed(proto.returncode)
def test_subprocess_wait_no_same_group(self):
# start the new process in a new session
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
index ecc2c9d8a8..4f197f394a 100644
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -4,6 +4,7 @@ import unittest
from unittest import mock
import asyncio
+from asyncio import base_subprocess
from asyncio import subprocess
from asyncio import test_utils
try:
@@ -23,6 +24,70 @@ PROGRAM_CAT = [
'data = sys.stdin.buffer.read()',
'sys.stdout.buffer.write(data)'))]
+class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
+ def _start(self, *args, **kwargs):
+ self._proc = mock.Mock()
+ self._proc.stdin = None
+ self._proc.stdout = None
+ self._proc.stderr = None
+
+
+class SubprocessTransportTests(test_utils.TestCase):
+ def setUp(self):
+ self.loop = self.new_test_loop()
+ self.set_event_loop(self.loop)
+
+
+ def create_transport(self, waiter=None):
+ protocol = mock.Mock()
+ protocol.connection_made._is_coroutine = False
+ protocol.process_exited._is_coroutine = False
+ transport = TestSubprocessTransport(
+ self.loop, protocol, ['test'], False,
+ None, None, None, 0, waiter=waiter)
+ return (transport, protocol)
+
+ def test_close(self):
+ waiter = asyncio.Future(loop=self.loop)
+ transport, protocol = self.create_transport(waiter)
+ transport._process_exited(0)
+ transport.close()
+
+ # The loop didn't run yet
+ self.assertFalse(protocol.connection_made.called)
+
+ # methods must raise ProcessLookupError if the transport was closed
+ self.assertRaises(ValueError, transport.send_signal, signal.SIGTERM)
+ self.assertRaises(ValueError, transport.terminate)
+ self.assertRaises(ValueError, transport.kill)
+
+ self.loop.run_until_complete(waiter)
+
+ def test_proc_exited(self):
+ waiter = asyncio.Future(loop=self.loop)
+ transport, protocol = self.create_transport(waiter)
+ transport._process_exited(6)
+ self.loop.run_until_complete(waiter)
+
+ self.assertEqual(transport.get_returncode(), 6)
+
+ self.assertTrue(protocol.connection_made.called)
+ self.assertTrue(protocol.process_exited.called)
+ self.assertTrue(protocol.connection_lost.called)
+ self.assertEqual(protocol.connection_lost.call_args[0], (None,))
+
+ self.assertFalse(transport._closed)
+ self.assertIsNone(transport._loop)
+ self.assertIsNone(transport._proc)
+ self.assertIsNone(transport._protocol)
+
+ # methods must raise ProcessLookupError if the process exited
+ self.assertRaises(ProcessLookupError,
+ transport.send_signal, signal.SIGTERM)
+ self.assertRaises(ProcessLookupError, transport.terminate)
+ self.assertRaises(ProcessLookupError, transport.kill)
+
+
class SubprocessMixin:
def test_stdin_stdout(self):