diff options
author | Larry Hastings <larry@hastings.org> | 2014-05-17 21:05:10 -0700 |
---|---|---|
committer | Larry Hastings <larry@hastings.org> | 2014-05-17 21:05:10 -0700 |
commit | 3a260d228b32b04a88d947b887bf81759e8e5f10 (patch) | |
tree | e4a34e9cbbf877cf021ffba743ddbf54e17526b7 /Lib/test | |
parent | 2110603344316d927e6d639275c12f5da78601d5 (diff) | |
parent | b1a1ec3151155a1ae65831793b4a5b7a87d9d09f (diff) | |
download | cpython-git-3a260d228b32b04a88d947b887bf81759e8e5f10.tar.gz |
Merge.
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/script_helper.py | 12 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_selector_events.py | 5 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_streams.py | 39 | ||||
-rw-r--r-- | Lib/test/test_cmd_line_script.py | 49 | ||||
-rw-r--r-- | Lib/test/test_code_module.py | 2 | ||||
-rw-r--r-- | Lib/test/test_faulthandler.py | 25 | ||||
-rw-r--r-- | Lib/test/test_fileinput.py | 10 | ||||
-rw-r--r-- | Lib/test/test_gc.py | 32 | ||||
-rw-r--r-- | Lib/test/test_importlib/test_api.py | 15 | ||||
-rw-r--r-- | Lib/test/test_io.py | 32 | ||||
-rw-r--r-- | Lib/test/test_long.py | 7 | ||||
-rw-r--r-- | Lib/test/test_re.py | 5 | ||||
-rw-r--r-- | Lib/test/test_signal.py | 2 | ||||
-rw-r--r-- | Lib/test/test_subprocess.py | 1 |
14 files changed, 222 insertions, 14 deletions
diff --git a/Lib/test/script_helper.py b/Lib/test/script_helper.py index af0545bac2..555934966a 100644 --- a/Lib/test/script_helper.py +++ b/Lib/test/script_helper.py @@ -78,7 +78,7 @@ def assert_python_failure(*args, **env_vars): """ return _assert_python(False, *args, **env_vars) -def spawn_python(*args, **kw): +def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw): """Run a Python subprocess with the given arguments. kw is extra keyword args to pass to subprocess.Popen. Returns a Popen @@ -86,8 +86,16 @@ def spawn_python(*args, **kw): """ cmd_line = [sys.executable, '-E'] cmd_line.extend(args) + # Under Fedora (?), GNU readline can output junk on stderr when initialized, + # depending on the TERM setting. Setting TERM=vt100 is supposed to disable + # that. References: + # - http://reinout.vanrees.org/weblog/2009/08/14/readline-invisible-character-hack.html + # - http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import + # - http://lists.gnu.org/archive/html/bug-readline/2007-08/msg00004.html + env = kw.setdefault('env', dict(os.environ)) + env['TERM'] = 'vt100' return subprocess.Popen(cmd_line, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + stdout=stdout, stderr=stderr, **kw) def kill_python(p): diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 964b2e8ec8..0735237cca 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -121,8 +121,9 @@ class BaseSelectorEventLoopTests(unittest.TestCase): self.assertIsNone(self.loop._write_to_self()) def test_write_to_self_exception(self): - self.loop._csock.send.side_effect = OSError() - self.assertRaises(OSError, self.loop._write_to_self) + # _write_to_self() swallows OSError + self.loop._csock.send.side_effect = RuntimeError() + self.assertRaises(RuntimeError, self.loop._write_to_self) def test_sock_recv(self): sock = mock.Mock() diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index 031499e814..1ecc8eb1fa 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -1,7 +1,9 @@ """Tests for streams.py.""" import gc +import os import socket +import sys import unittest from unittest import mock try: @@ -583,6 +585,43 @@ class StreamReaderTests(unittest.TestCase): server.stop() self.assertEqual(msg, b"hello world!\n") + @unittest.skipIf(sys.platform == 'win32', "Don't have pipes") + def test_read_all_from_pipe_reader(self): + # See Tulip issue 168. This test is derived from the example + # subprocess_attach_read_pipe.py, but we configure the + # StreamReader's limit so that twice it is less than the size + # of the data writter. Also we must explicitly attach a child + # watcher to the event loop. + + code = """\ +import os, sys +fd = int(sys.argv[1]) +os.write(fd, b'data') +os.close(fd) +""" + rfd, wfd = os.pipe() + args = [sys.executable, '-c', code, str(wfd)] + + pipe = open(rfd, 'rb', 0) + reader = asyncio.StreamReader(loop=self.loop, limit=1) + protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop) + transport, _ = self.loop.run_until_complete( + self.loop.connect_read_pipe(lambda: protocol, pipe)) + + watcher = asyncio.SafeChildWatcher() + watcher.attach_loop(self.loop) + try: + asyncio.set_child_watcher(watcher) + proc = self.loop.run_until_complete( + asyncio.create_subprocess_exec(*args, pass_fds={wfd}, loop=self.loop)) + self.loop.run_until_complete(proc.wait()) + finally: + asyncio.set_child_watcher(None) + + os.close(wfd) + data = self.loop.run_until_complete(reader.read(-1)) + self.assertEqual(data, b'data') + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_cmd_line_script.py b/Lib/test/test_cmd_line_script.py index 1e6746d434..88a9e2bbb5 100644 --- a/Lib/test/test_cmd_line_script.py +++ b/Lib/test/test_cmd_line_script.py @@ -1,5 +1,6 @@ # tests command line execution of scripts +import contextlib import importlib import importlib.machinery import zipimport @@ -8,6 +9,7 @@ import sys import os import os.path import py_compile +import subprocess import textwrap from test import support @@ -173,6 +175,53 @@ class CmdLineTest(unittest.TestCase): expected = repr(importlib.machinery.BuiltinImporter).encode("utf-8") self.assertIn(expected, out) + @contextlib.contextmanager + def interactive_python(self, separate_stderr=False): + if separate_stderr: + p = spawn_python('-i', bufsize=1, stderr=subprocess.PIPE) + stderr = p.stderr + else: + p = spawn_python('-i', bufsize=1, stderr=subprocess.STDOUT) + stderr = p.stdout + try: + # Drain stderr until prompt + while True: + data = stderr.read(4) + if data == b">>> ": + break + stderr.readline() + yield p + finally: + kill_python(p) + stderr.close() + + def check_repl_stdout_flush(self, separate_stderr=False): + with self.interactive_python(separate_stderr) as p: + p.stdin.write(b"print('foo')\n") + p.stdin.flush() + self.assertEqual(b'foo', p.stdout.readline().strip()) + + def check_repl_stderr_flush(self, separate_stderr=False): + with self.interactive_python(separate_stderr) as p: + p.stdin.write(b"1/0\n") + p.stdin.flush() + stderr = p.stderr if separate_stderr else p.stdout + self.assertIn(b'Traceback ', stderr.readline()) + self.assertIn(b'File "<stdin>"', stderr.readline()) + self.assertIn(b'ZeroDivisionError', stderr.readline()) + + def test_repl_stdout_flush(self): + self.check_repl_stdout_flush() + + def test_repl_stdout_flush_separate_stderr(self): + self.check_repl_stdout_flush(True) + + def test_repl_stderr_flush(self): + self.check_repl_stderr_flush() + + def test_repl_stderr_flush_separate_stderr(self): + self.check_repl_stderr_flush(True) + def test_basic_script(self): with temp_dir() as script_dir: script_name = _make_test_script(script_dir, 'script') diff --git a/Lib/test/test_code_module.py b/Lib/test/test_code_module.py index 5fd21dc32c..7a80a808b1 100644 --- a/Lib/test/test_code_module.py +++ b/Lib/test/test_code_module.py @@ -51,7 +51,7 @@ class TestInteractiveConsole(unittest.TestCase): self.infunc.side_effect = ["undefined", EOFError('Finished')] self.console.interact() for call in self.stderr.method_calls: - if 'NameError:' in ''.join(call[1]): + if 'NameError' in ''.join(call[1]): break else: raise AssertionError("No syntax error from console") diff --git a/Lib/test/test_faulthandler.py b/Lib/test/test_faulthandler.py index ebd99edc8f..b0fc279706 100644 --- a/Lib/test/test_faulthandler.py +++ b/Lib/test/test_faulthandler.py @@ -591,6 +591,31 @@ sys.exit(exitcode) def test_register_chain(self): self.check_register(chain=True) + @contextmanager + def check_stderr_none(self): + stderr = sys.stderr + try: + sys.stderr = None + with self.assertRaises(RuntimeError) as cm: + yield + self.assertEqual(str(cm.exception), "sys.stderr is None") + finally: + sys.stderr = stderr + + def test_stderr_None(self): + # Issue #21497: provide an helpful error if sys.stderr is None, + # instead of just an attribute error: "None has no attribute fileno". + with self.check_stderr_none(): + faulthandler.enable() + with self.check_stderr_none(): + faulthandler.dump_traceback() + if hasattr(faulthandler, 'dump_traceback_later'): + with self.check_stderr_none(): + faulthandler.dump_traceback_later(1e-3) + if hasattr(faulthandler, "register"): + with self.check_stderr_none(): + faulthandler.register(signal.SIGUSR1) + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_fileinput.py b/Lib/test/test_fileinput.py index eba55c99a4..1d089f52b8 100644 --- a/Lib/test/test_fileinput.py +++ b/Lib/test/test_fileinput.py @@ -19,11 +19,12 @@ try: except ImportError: gzip = None -from io import StringIO +from io import BytesIO, StringIO from fileinput import FileInput, hook_encoded from test.support import verbose, TESTFN, run_unittest, check_warnings from test.support import unlink as safe_unlink +from unittest import mock # The fileinput module has 2 interfaces: the FileInput class which does @@ -232,6 +233,13 @@ class FileInputTests(unittest.TestCase): finally: remove_tempfiles(t1) + def test_stdin_binary_mode(self): + with mock.patch('sys.stdin') as m_stdin: + m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam') + fi = FileInput(files=['-'], mode='rb') + lines = list(fi) + self.assertEqual(lines, [b'spam, bacon, sausage, and spam']) + def test_file_opening_hook(self): try: # cannot use openhook and inplace mode diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py index 7eb104a6b5..c0be53795f 100644 --- a/Lib/test/test_gc.py +++ b/Lib/test/test_gc.py @@ -580,6 +580,38 @@ class GCTests(unittest.TestCase): # would be damaged, with an empty __dict__. self.assertEqual(x, None) + def test_bug21435(self): + # This is a poor test - its only virtue is that it happened to + # segfault on Tim's Windows box before the patch for 21435 was + # applied. That's a nasty bug relying on specific pieces of cyclic + # trash appearing in exactly the right order in finalize_garbage()'s + # input list. + # But there's no reliable way to force that order from Python code, + # so over time chances are good this test won't really be testing much + # of anything anymore. Still, if it blows up, there's _some_ + # problem ;-) + gc.collect() + + class A: + pass + + class B: + def __init__(self, x): + self.x = x + + def __del__(self): + self.attr = None + + def do_work(): + a = A() + b = B(A()) + + a.attr = b + b.attr = a + + do_work() + gc.collect() # this blows up (bad C pointer) when it fails + @cpython_only def test_garbage_at_shutdown(self): import subprocess diff --git a/Lib/test/test_importlib/test_api.py b/Lib/test/test_importlib/test_api.py index 744001b4c0..2a2d42bbfe 100644 --- a/Lib/test/test_importlib/test_api.py +++ b/Lib/test/test_importlib/test_api.py @@ -241,13 +241,13 @@ class ReloadTests: '__file__': path, '__cached__': cached, '__doc__': None, - '__builtins__': __builtins__, } support.create_empty_file(path) module = self.init.import_module(name) - ns = vars(module) + ns = vars(module).copy() loader = ns.pop('__loader__') spec = ns.pop('__spec__') + ns.pop('__builtins__', None) # An implementation detail. self.assertEqual(spec.name, name) self.assertEqual(spec.loader, loader) self.assertEqual(loader.path, path) @@ -263,14 +263,14 @@ class ReloadTests: '__cached__': cached, '__path__': [os.path.dirname(init_path)], '__doc__': None, - '__builtins__': __builtins__, } os.mkdir(name) os.rename(path, init_path) reloaded = self.init.reload(module) - ns = vars(reloaded) + ns = vars(reloaded).copy() loader = ns.pop('__loader__') spec = ns.pop('__spec__') + ns.pop('__builtins__', None) # An implementation detail. self.assertEqual(spec.name, name) self.assertEqual(spec.loader, loader) self.assertIs(reloaded, module) @@ -295,10 +295,11 @@ class ReloadTests: with open(bad_path, 'w') as init_file: init_file.write('eggs = None') module = self.init.import_module(name) - ns = vars(module) + ns = vars(module).copy() loader = ns.pop('__loader__') path = ns.pop('__path__') spec = ns.pop('__spec__') + ns.pop('__builtins__', None) # An implementation detail. self.assertEqual(spec.name, name) self.assertIs(spec.loader, None) self.assertIsNot(loader, None) @@ -319,14 +320,14 @@ class ReloadTests: '__cached__': cached, '__path__': [os.path.dirname(init_path)], '__doc__': None, - '__builtins__': __builtins__, 'eggs': None, } os.rename(bad_path, init_path) reloaded = self.init.reload(module) - ns = vars(reloaded) + ns = vars(reloaded).copy() loader = ns.pop('__loader__') spec = ns.pop('__spec__') + ns.pop('__builtins__', None) # An implementation detail. self.assertEqual(spec.name, name) self.assertEqual(spec.loader, loader) self.assertIs(reloaded, module) diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 267537fdeb..ef1e05622b 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -2615,6 +2615,38 @@ class TextIOWrapperTest(unittest.TestCase): txt.write('5') self.assertEqual(b''.join(raw._write_stack), b'123\n45') + def test_bufio_write_through(self): + # Issue #21396: write_through=True doesn't force a flush() + # on the underlying binary buffered object. + flush_called, write_called = [], [] + class BufferedWriter(self.BufferedWriter): + def flush(self, *args, **kwargs): + flush_called.append(True) + return super().flush(*args, **kwargs) + def write(self, *args, **kwargs): + write_called.append(True) + return super().write(*args, **kwargs) + + rawio = self.BytesIO() + data = b"a" + bufio = BufferedWriter(rawio, len(data)*2) + textio = self.TextIOWrapper(bufio, encoding='ascii', + write_through=True) + # write to the buffered io but don't overflow the buffer + text = data.decode('ascii') + textio.write(text) + + # buffer.flush is not called with write_through=True + self.assertFalse(flush_called) + # buffer.write *is* called with write_through=True + self.assertTrue(write_called) + self.assertEqual(rawio.getvalue(), b"") # no flush + + write_called = [] # reset + textio.write(text * 10) # total content is larger than bufio buffer + self.assertTrue(write_called) + self.assertEqual(rawio.getvalue(), data * 11) # all flushed + def test_read_nonbytes(self): # Issue #17106 # Crash when underlying read() returns non-bytes diff --git a/Lib/test/test_long.py b/Lib/test/test_long.py index 13152ecf12..5f14795649 100644 --- a/Lib/test/test_long.py +++ b/Lib/test/test_long.py @@ -1235,6 +1235,13 @@ class LongTest(unittest.TestCase): for n in map(int, integers): self.assertEqual(n, 0) + def test_shift_bool(self): + # Issue #21422: ensure that bool << int and bool >> int return int + for value in (True, False): + for shift in (0, 2): + self.assertEqual(type(value << shift), int) + self.assertEqual(type(value >> shift), int) + def test_main(): support.run_unittest(LongTest) diff --git a/Lib/test/test_re.py b/Lib/test/test_re.py index 33ccd15398..0c8a52f23a 100644 --- a/Lib/test/test_re.py +++ b/Lib/test/test_re.py @@ -1223,6 +1223,11 @@ class ReTests(unittest.TestCase): pat.scanner(string='abracadabra', pos=3, endpos=10).search().span(), (7, 9)) + def test_bug_20998(self): + # Issue #20998: Fullmatch of repeated single character pattern + # with ignore case. + self.assertEqual(re.fullmatch('[a-c]+', 'ABC', re.I).span(), (0, 3)) + class PatternReprTests(unittest.TestCase): def check(self, pattern, expected): diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index a6f2c64857..74f74af0b4 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -454,7 +454,7 @@ class SiginterruptTest(unittest.TestCase): stdout = first_line + stdout exitcode = process.wait() if exitcode not in (2, 3): - raise Exception("Child error (exit code %s): %s" + raise Exception("Child error (exit code %s): %r" % (exitcode, stdout)) return (exitcode == 3) diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 4c8d493732..32ffb5fa4b 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -786,6 +786,7 @@ class ProcessTestCase(BaseTestCase): stdout=subprocess.PIPE, universal_newlines=1) p.stdin.write("line1\n") + p.stdin.flush() self.assertEqual(p.stdout.readline(), "line1\n") p.stdin.write("line3\n") p.stdin.close() |