summaryrefslogtreecommitdiff
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rwxr-xr-xLib/test/regrtest.py14
-rw-r--r--Lib/test/support.py11
-rw-r--r--Lib/test/test_buffer.py18
-rw-r--r--Lib/test/test_bytes.py23
-rw-r--r--Lib/test/test_bz2.py136
-rw-r--r--Lib/test/test_cmd_line.py19
-rw-r--r--Lib/test/test_cmd_line_script.py22
-rw-r--r--Lib/test/test_concurrent_futures.py22
-rw-r--r--Lib/test/test_enumerate.py10
-rw-r--r--Lib/test/test_exceptions.py9
-rw-r--r--Lib/test/test_fcntl.py15
-rw-r--r--Lib/test/test_format.py16
-rw-r--r--Lib/test/test_gc.py26
-rw-r--r--Lib/test/test_generators.py21
-rw-r--r--Lib/test/test_genericpath.py12
-rw-r--r--Lib/test/test_hashlib.py127
-rw-r--r--Lib/test/test_httpservers.py11
-rw-r--r--Lib/test/test_importlib/import_/test_fromlist.py2
-rw-r--r--Lib/test/test_io.py8
-rw-r--r--Lib/test/test_iterlen.py62
-rw-r--r--Lib/test/test_itertools.py5
-rw-r--r--Lib/test/test_logging.py128
-rw-r--r--Lib/test/test_mailbox.py2
-rw-r--r--Lib/test/test_mimetypes.py2
-rw-r--r--Lib/test/test_multiprocessing.py172
-rw-r--r--Lib/test/test_operator.py25
-rw-r--r--Lib/test/test_os.py89
-rw-r--r--Lib/test/test_random.py33
-rw-r--r--Lib/test/test_range.py2
-rw-r--r--Lib/test/test_select.py2
-rw-r--r--Lib/test/test_set.py2
-rw-r--r--Lib/test/test_shelve.py13
-rw-r--r--Lib/test/test_shutil.py17
-rw-r--r--Lib/test/test_signal.py2
-rw-r--r--Lib/test/test_site.py2
-rw-r--r--Lib/test/test_socketserver.py16
-rw-r--r--Lib/test/test_sundry.py2
-rw-r--r--Lib/test/test_sys.py2
-rw-r--r--Lib/test/test_sysconfig.py44
-rw-r--r--Lib/test/test_tempfile.py6
-rw-r--r--Lib/test/test_thread.py2
-rw-r--r--Lib/test/test_threading.py6
-rw-r--r--Lib/test/test_threadsignals.py2
-rw-r--r--Lib/test/test_unicode.py30
-rw-r--r--Lib/test/test_unicodedata.py2
-rw-r--r--Lib/test/test_urllib.py18
-rw-r--r--Lib/test/test_urllib2.py1
-rw-r--r--Lib/test/test_venv.py59
48 files changed, 910 insertions, 360 deletions
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py
index 8f3b689a0a..c342d43fd5 100755
--- a/Lib/test/regrtest.py
+++ b/Lib/test/regrtest.py
@@ -1621,20 +1621,6 @@ _expectations = (
test_ossaudiodev
test_socketserver
"""),
- ('os2emx',
- """
- test_audioop
- test_curses
- test_epoll
- test_kqueue
- test_largefile
- test_mmap
- test_openpty
- test_ossaudiodev
- test_pty
- test_resource
- test_signal
- """),
('freebsd',
"""
test_devpoll
diff --git a/Lib/test/support.py b/Lib/test/support.py
index c5640e0a08..93b94d990f 100644
--- a/Lib/test/support.py
+++ b/Lib/test/support.py
@@ -647,6 +647,17 @@ elif sys.platform != 'darwin':
# the byte 0xff. Skip some unicode filename tests.
pass
+# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
+# decoded from the filesystem encoding (in strict mode). It can be None if we
+# cannot generate such filename.
+TESTFN_UNDECODABLE = None
+for name in (b'abc\xff', b'\xe7w\xf0'):
+ try:
+ os.fsdecode(name)
+ except UnicodeDecodeError:
+ TESTFN_UNDECODABLE = name
+ break
+
# Save the initial cwd
SAVEDCWD = os.getcwd()
diff --git a/Lib/test/test_buffer.py b/Lib/test/test_buffer.py
index 977b282d63..747e2a287e 100644
--- a/Lib/test/test_buffer.py
+++ b/Lib/test/test_buffer.py
@@ -4001,23 +4001,13 @@ class TestBufferProtocol(unittest.TestCase):
# equality-hash invariant
x = ndarray(list(range(12)), shape=[12], format='B')
- a = memoryview(nd)
+ a = memoryview(x)
y = ndarray(list(range(12)), shape=[12], format='b')
- b = memoryview(nd)
+ b = memoryview(y)
- z = ndarray(list(bytes(chr(x), 'latin-1') for x in range(12)),
- shape=[12], format='c')
- c = memoryview(nd)
-
- if (a == b):
- self.assertEqual(hash(a), hash(b))
-
- if (a == c):
- self.assertEqual(hash(a), hash(c))
-
- if (b == c):
- self.assertEqual(hash(b), hash(c))
+ self.assertEqual(a, b)
+ self.assertEqual(hash(a), hash(b))
# non-byte formats
nd = ndarray(list(range(12)), shape=[2,2,3], format='L')
diff --git a/Lib/test/test_bytes.py b/Lib/test/test_bytes.py
index fe6e93935b..8ae90e44ef 100644
--- a/Lib/test/test_bytes.py
+++ b/Lib/test/test_bytes.py
@@ -288,8 +288,22 @@ class BaseBytesTest(unittest.TestCase):
self.assertEqual(self.type2test(b"").join(lst), b"abc")
self.assertEqual(self.type2test(b"").join(tuple(lst)), b"abc")
self.assertEqual(self.type2test(b"").join(iter(lst)), b"abc")
- self.assertEqual(self.type2test(b".").join([b"ab", b"cd"]), b"ab.cd")
- # XXX more...
+ dot_join = self.type2test(b".:").join
+ self.assertEqual(dot_join([b"ab", b"cd"]), b"ab.:cd")
+ self.assertEqual(dot_join([memoryview(b"ab"), b"cd"]), b"ab.:cd")
+ self.assertEqual(dot_join([b"ab", memoryview(b"cd")]), b"ab.:cd")
+ self.assertEqual(dot_join([bytearray(b"ab"), b"cd"]), b"ab.:cd")
+ self.assertEqual(dot_join([b"ab", bytearray(b"cd")]), b"ab.:cd")
+ # Stress it with many items
+ seq = [b"abc"] * 1000
+ expected = b"abc" + b".:abc" * 999
+ self.assertEqual(dot_join(seq), expected)
+ # Error handling and cleanup when some item in the middle of the
+ # sequence has the wrong type.
+ with self.assertRaises(TypeError):
+ dot_join([bytearray(b"ab"), "cd", b"ef"])
+ with self.assertRaises(TypeError):
+ dot_join([memoryview(b"ab"), "cd", b"ef"])
def test_count(self):
b = self.type2test(b'mississippi')
@@ -1267,6 +1281,11 @@ class BytearrayPEP3137Test(unittest.TestCase,
self.assertEqual(val, newval)
self.assertTrue(val is not newval,
expr+' returned val on a mutable object')
+ sep = self.marshal(b'')
+ newval = sep.join([val])
+ self.assertEqual(val, newval)
+ self.assertIsNot(val, newval)
+
class FixedStringTest(test.string_tests.BaseTest):
diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py
index f4e81db6e2..24a18138fe 100644
--- a/Lib/test/test_bz2.py
+++ b/Lib/test/test_bz2.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
from test import support
-from test.support import TESTFN, bigmemtest, _4G
+from test.support import bigmemtest, _4G
import unittest
from io import BytesIO
@@ -18,10 +18,10 @@ except ImportError:
bz2 = support.import_module('bz2')
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
-has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx")
class BaseTest(unittest.TestCase):
"Base for other testcases."
+
TEXT_LINES = [
b'root:x:0:0:root:/root:/bin/bash\n',
b'bin:x:1:1:bin:/bin:\n',
@@ -49,13 +49,17 @@ class BaseTest(unittest.TestCase):
DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
def setUp(self):
- self.filename = TESTFN
+ self.filename = support.TESTFN
def tearDown(self):
if os.path.isfile(self.filename):
os.unlink(self.filename)
- if has_cmdline_bunzip2:
+ if sys.platform == "win32":
+ # bunzip2 isn't available to run on Windows.
+ def decompress(self, data):
+ return bz2.decompress(data)
+ else:
def decompress(self, data):
pop = subprocess.Popen("bunzip2", shell=True,
stdin=subprocess.PIPE,
@@ -69,31 +73,21 @@ class BaseTest(unittest.TestCase):
ret = bz2.decompress(data)
return ret
- else:
- # bunzip2 isn't available to run on Windows.
- def decompress(self, data):
- return bz2.decompress(data)
class BZ2FileTest(BaseTest):
- "Test BZ2File type miscellaneous methods."
+ "Test the BZ2File class."
def createTempFile(self, streams=1):
with open(self.filename, "wb") as f:
f.write(self.DATA * streams)
def testBadArgs(self):
- with self.assertRaises(TypeError):
- BZ2File(123.456)
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", "z")
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", "rx")
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", "rbt")
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", compresslevel=0)
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", compresslevel=10)
+ self.assertRaises(TypeError, BZ2File, 123.456)
+ self.assertRaises(ValueError, BZ2File, "/dev/null", "z")
+ self.assertRaises(ValueError, BZ2File, "/dev/null", "rx")
+ self.assertRaises(ValueError, BZ2File, "/dev/null", "rbt")
+ self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=0)
+ self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=10)
def testRead(self):
self.createTempFile()
@@ -214,9 +208,8 @@ class BZ2FileTest(BaseTest):
self.createTempFile()
bz2f = BZ2File(self.filename)
bz2f.close()
- self.assertRaises(ValueError, bz2f.__next__)
- # This call will deadlock if the above .__next__ call failed to
- # release the lock.
+ self.assertRaises(ValueError, next, bz2f)
+ # This call will deadlock if the above call failed to release the lock.
self.assertRaises(ValueError, bz2f.readlines)
def testWrite(self):
@@ -379,7 +372,7 @@ class BZ2FileTest(BaseTest):
bz2f.close()
self.assertRaises(ValueError, bz2f.seekable)
- bz2f = BZ2File(BytesIO(), mode="w")
+ bz2f = BZ2File(BytesIO(), "w")
try:
self.assertFalse(bz2f.seekable())
finally:
@@ -405,7 +398,7 @@ class BZ2FileTest(BaseTest):
bz2f.close()
self.assertRaises(ValueError, bz2f.readable)
- bz2f = BZ2File(BytesIO(), mode="w")
+ bz2f = BZ2File(BytesIO(), "w")
try:
self.assertFalse(bz2f.readable())
finally:
@@ -422,7 +415,7 @@ class BZ2FileTest(BaseTest):
bz2f.close()
self.assertRaises(ValueError, bz2f.writable)
- bz2f = BZ2File(BytesIO(), mode="w")
+ bz2f = BZ2File(BytesIO(), "w")
try:
self.assertTrue(bz2f.writable())
finally:
@@ -476,7 +469,7 @@ class BZ2FileTest(BaseTest):
# Issue #7205: Using a BZ2File from several threads shouldn't deadlock.
data = b"1" * 2**20
nthreads = 10
- with bz2.BZ2File(self.filename, 'wb') as f:
+ with BZ2File(self.filename, 'wb') as f:
def comp():
for i in range(5):
f.write(data)
@@ -487,28 +480,27 @@ class BZ2FileTest(BaseTest):
t.join()
def testWithoutThreading(self):
- bz2 = support.import_fresh_module("bz2", blocked=("threading",))
- with bz2.BZ2File(self.filename, "wb") as f:
+ module = support.import_fresh_module("bz2", blocked=("threading",))
+ with module.BZ2File(self.filename, "wb") as f:
f.write(b"abc")
- with bz2.BZ2File(self.filename, "rb") as f:
+ with module.BZ2File(self.filename, "rb") as f:
self.assertEqual(f.read(), b"abc")
def testMixedIterationAndReads(self):
self.createTempFile()
linelen = len(self.TEXT_LINES[0])
halflen = linelen // 2
- with bz2.BZ2File(self.filename) as bz2f:
+ with BZ2File(self.filename) as bz2f:
bz2f.read(halflen)
self.assertEqual(next(bz2f), self.TEXT_LINES[0][halflen:])
self.assertEqual(bz2f.read(), self.TEXT[linelen:])
- with bz2.BZ2File(self.filename) as bz2f:
+ with BZ2File(self.filename) as bz2f:
bz2f.readline()
self.assertEqual(next(bz2f), self.TEXT_LINES[1])
self.assertEqual(bz2f.readline(), self.TEXT_LINES[2])
- with bz2.BZ2File(self.filename) as bz2f:
+ with BZ2File(self.filename) as bz2f:
bz2f.readlines()
- with self.assertRaises(StopIteration):
- next(bz2f)
+ self.assertRaises(StopIteration, next, bz2f)
self.assertEqual(bz2f.readlines(), [])
def testMultiStreamOrdering(self):
@@ -576,6 +568,7 @@ class BZ2FileTest(BaseTest):
bz2f.seek(-150, 1)
self.assertEqual(bz2f.read(), self.TEXT[500-150:])
+
class BZ2CompressorTest(BaseTest):
def testCompress(self):
bz2c = BZ2Compressor()
@@ -688,97 +681,102 @@ class CompressDecompressTest(BaseTest):
class OpenTest(BaseTest):
+ "Test the open function."
+
+ def open(self, *args, **kwargs):
+ return bz2.open(*args, **kwargs)
+
def test_binary_modes(self):
- with bz2.open(self.filename, "wb") as f:
+ with self.open(self.filename, "wb") as f:
f.write(self.TEXT)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read())
+ file_data = self.decompress(f.read())
self.assertEqual(file_data, self.TEXT)
- with bz2.open(self.filename, "rb") as f:
+ with self.open(self.filename, "rb") as f:
self.assertEqual(f.read(), self.TEXT)
- with bz2.open(self.filename, "ab") as f:
+ with self.open(self.filename, "ab") as f:
f.write(self.TEXT)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read())
+ file_data = self.decompress(f.read())
self.assertEqual(file_data, self.TEXT * 2)
def test_implicit_binary_modes(self):
# Test implicit binary modes (no "b" or "t" in mode string).
- with bz2.open(self.filename, "w") as f:
+ with self.open(self.filename, "w") as f:
f.write(self.TEXT)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read())
+ file_data = self.decompress(f.read())
self.assertEqual(file_data, self.TEXT)
- with bz2.open(self.filename, "r") as f:
+ with self.open(self.filename, "r") as f:
self.assertEqual(f.read(), self.TEXT)
- with bz2.open(self.filename, "a") as f:
+ with self.open(self.filename, "a") as f:
f.write(self.TEXT)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read())
+ file_data = self.decompress(f.read())
self.assertEqual(file_data, self.TEXT * 2)
def test_text_modes(self):
text = self.TEXT.decode("ascii")
text_native_eol = text.replace("\n", os.linesep)
- with bz2.open(self.filename, "wt") as f:
+ with self.open(self.filename, "wt") as f:
f.write(text)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read()).decode("ascii")
+ file_data = self.decompress(f.read()).decode("ascii")
self.assertEqual(file_data, text_native_eol)
- with bz2.open(self.filename, "rt") as f:
+ with self.open(self.filename, "rt") as f:
self.assertEqual(f.read(), text)
- with bz2.open(self.filename, "at") as f:
+ with self.open(self.filename, "at") as f:
f.write(text)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read()).decode("ascii")
+ file_data = self.decompress(f.read()).decode("ascii")
self.assertEqual(file_data, text_native_eol * 2)
def test_fileobj(self):
- with bz2.open(BytesIO(self.DATA), "r") as f:
+ with self.open(BytesIO(self.DATA), "r") as f:
self.assertEqual(f.read(), self.TEXT)
- with bz2.open(BytesIO(self.DATA), "rb") as f:
+ with self.open(BytesIO(self.DATA), "rb") as f:
self.assertEqual(f.read(), self.TEXT)
text = self.TEXT.decode("ascii")
- with bz2.open(BytesIO(self.DATA), "rt") as f:
+ with self.open(BytesIO(self.DATA), "rt") as f:
self.assertEqual(f.read(), text)
def test_bad_params(self):
# Test invalid parameter combinations.
- with self.assertRaises(ValueError):
- bz2.open(self.filename, "wbt")
- with self.assertRaises(ValueError):
- bz2.open(self.filename, "rb", encoding="utf-8")
- with self.assertRaises(ValueError):
- bz2.open(self.filename, "rb", errors="ignore")
- with self.assertRaises(ValueError):
- bz2.open(self.filename, "rb", newline="\n")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "wbt")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "rb", encoding="utf-8")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "rb", errors="ignore")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "rb", newline="\n")
def test_encoding(self):
# Test non-default encoding.
text = self.TEXT.decode("ascii")
text_native_eol = text.replace("\n", os.linesep)
- with bz2.open(self.filename, "wt", encoding="utf-16-le") as f:
+ with self.open(self.filename, "wt", encoding="utf-16-le") as f:
f.write(text)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read()).decode("utf-16-le")
+ file_data = self.decompress(f.read()).decode("utf-16-le")
self.assertEqual(file_data, text_native_eol)
- with bz2.open(self.filename, "rt", encoding="utf-16-le") as f:
+ with self.open(self.filename, "rt", encoding="utf-16-le") as f:
self.assertEqual(f.read(), text)
def test_encoding_error_handler(self):
# Test with non-default encoding error handler.
- with bz2.open(self.filename, "wb") as f:
+ with self.open(self.filename, "wb") as f:
f.write(b"foo\xffbar")
- with bz2.open(self.filename, "rt", encoding="ascii", errors="ignore") \
+ with self.open(self.filename, "rt", encoding="ascii", errors="ignore") \
as f:
self.assertEqual(f.read(), "foobar")
def test_newline(self):
# Test with explicit newline (universal newline mode disabled).
text = self.TEXT.decode("ascii")
- with bz2.open(self.filename, "wt", newline="\n") as f:
+ with self.open(self.filename, "wt", newline="\n") as f:
f.write(text)
- with bz2.open(self.filename, "rt", newline="\r") as f:
+ with self.open(self.filename, "rt", newline="\r") as f:
self.assertEqual(f.readlines(), [text])
diff --git a/Lib/test/test_cmd_line.py b/Lib/test/test_cmd_line.py
index 7644db21ee..4b39115de8 100644
--- a/Lib/test/test_cmd_line.py
+++ b/Lib/test/test_cmd_line.py
@@ -216,6 +216,23 @@ class CmdLineTest(unittest.TestCase):
self.assertIn(path1.encode('ascii'), out)
self.assertIn(path2.encode('ascii'), out)
+ def test_empty_PYTHONPATH_issue16309(self):
+ """On Posix, it is documented that setting PATH to the
+ empty string is equivalent to not setting PATH at all,
+ which is an exception to the rule that in a string like
+ "/bin::/usr/bin" the empty string in the middle gets
+ interpreted as '.'"""
+ code = """if 1:
+ import sys
+ path = ":".join(sys.path)
+ path = path.encode("ascii", "backslashreplace")
+ sys.stdout.buffer.write(path)"""
+ rc1, out1, err1 = assert_python_ok('-c', code, PYTHONPATH="")
+ rc2, out2, err2 = assert_python_ok('-c', code)
+ # regarding to Posix specification, outputs should be equal
+ # for empty and unset PYTHONPATH
+ self.assertEquals(out1, out2)
+
def test_displayhook_unencodable(self):
for encoding in ('ascii', 'latin-1', 'utf-8'):
env = os.environ.copy()
@@ -293,7 +310,7 @@ class CmdLineTest(unittest.TestCase):
rc, out, err = assert_python_ok('-c', code)
self.assertEqual(b'', out)
self.assertRegex(err.decode('ascii', 'ignore'),
- 'Exception OSError: .* ignored')
+ 'Exception ignored in.*\nOSError: .*')
def test_closed_stdout(self):
# Issue #13444: if stdout has been explicitly closed, we should
diff --git a/Lib/test/test_cmd_line_script.py b/Lib/test/test_cmd_line_script.py
index 6e097e37a4..86dfc733c1 100644
--- a/Lib/test/test_cmd_line_script.py
+++ b/Lib/test/test_cmd_line_script.py
@@ -366,11 +366,23 @@ class CmdLineTest(unittest.TestCase):
def test_non_utf8(self):
# Issue #16218
with temp_dir() as script_dir:
- script_name = _make_test_script(script_dir,
- '\udcf1\udcea\udcf0\udce8\udcef\udcf2')
- self._check_script(script_name, script_name, script_name,
- script_dir, None,
- importlib.machinery.SourceFileLoader)
+ script_basename = '\u0441\u043a\u0440\u0438\u043f\u0442'
+ try:
+ script_basename.encode(sys.getfilesystemencoding())
+ except UnicodeEncodeError:
+ raise unittest.SkipTest("Filesystem doesn't support "
+ "unicode names")
+ source = 'print("test output")\n'
+ script_name = _make_test_script(script_dir, script_basename, source)
+ if not __debug__:
+ run_args = ('-' + 'O' * sys.flags.optimize, script_name)
+ else:
+ run_args = (script_name,)
+ rc, out, _ = assert_python_ok(*run_args)
+ self.assertEqual(0, rc)
+ expected = ("test output" + os.linesep).encode('ascii')
+ self.assertEqual(expected, out)
+
def test_main():
support.run_unittest(CmdLineTest)
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 6ae450df06..4ad33091b0 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -15,6 +15,7 @@ import sys
import threading
import time
import unittest
+import weakref
from concurrent import futures
from concurrent.futures._base import (
@@ -52,6 +53,11 @@ def sleep_and_print(t, msg):
sys.stdout.flush()
+class MyObject(object):
+ def my_method(self):
+ pass
+
+
class ExecutorMixin:
worker_count = 5
@@ -396,6 +402,22 @@ class ExecutorTest(unittest.TestCase):
self.executor.map(str, [2] * (self.worker_count + 1))
self.executor.shutdown()
+ @test.support.cpython_only
+ def test_no_stale_references(self):
+ # Issue #16284: check that the executors don't unnecessarily hang onto
+ # references.
+ my_object = MyObject()
+ my_object_collected = threading.Event()
+ my_object_callback = weakref.ref(
+ my_object, lambda obj: my_object_collected.set())
+ # Deliberately discarding the future.
+ self.executor.submit(my_object.my_method)
+ del my_object
+
+ collected = my_object_collected.wait(timeout=5.0)
+ self.assertTrue(collected,
+ "Stale reference not collected within timeout.")
+
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
def test_map_submits_without_iteration(self):
diff --git a/Lib/test/test_enumerate.py b/Lib/test/test_enumerate.py
index 2e904cf878..a2d18d01ad 100644
--- a/Lib/test/test_enumerate.py
+++ b/Lib/test/test_enumerate.py
@@ -1,4 +1,5 @@
import unittest
+import operator
import sys
import pickle
@@ -168,15 +169,12 @@ class TestReversed(unittest.TestCase, PickleTest):
x = range(1)
self.assertEqual(type(reversed(x)), type(iter(x)))
- @support.cpython_only
def test_len(self):
- # This is an implementation detail, not an interface requirement
- from test.test_iterlen import len
for s in ('hello', tuple('hello'), list('hello'), range(5)):
- self.assertEqual(len(reversed(s)), len(s))
+ self.assertEqual(operator.length_hint(reversed(s)), len(s))
r = reversed(s)
list(r)
- self.assertEqual(len(r), 0)
+ self.assertEqual(operator.length_hint(r), 0)
class SeqWithWeirdLen:
called = False
def __len__(self):
@@ -187,7 +185,7 @@ class TestReversed(unittest.TestCase, PickleTest):
def __getitem__(self, index):
return index
r = reversed(SeqWithWeirdLen())
- self.assertRaises(ZeroDivisionError, len, r)
+ self.assertRaises(ZeroDivisionError, operator.length_hint, r)
def test_gc(self):
diff --git a/Lib/test/test_exceptions.py b/Lib/test/test_exceptions.py
index 413f4dd62b..8e5a767f86 100644
--- a/Lib/test/test_exceptions.py
+++ b/Lib/test/test_exceptions.py
@@ -8,7 +8,7 @@ import weakref
import errno
from test.support import (TESTFN, unlink, run_unittest, captured_output,
- gc_collect, cpython_only, no_tracing)
+ check_warnings, gc_collect, cpython_only, no_tracing)
class NaiveException(Exception):
def __init__(self, x):
@@ -939,9 +939,10 @@ class ImportErrorTests(unittest.TestCase):
def test_non_str_argument(self):
# Issue #15778
- arg = b'abc'
- exc = ImportError(arg)
- self.assertEqual(str(arg), str(exc))
+ with check_warnings(('', BytesWarning), quiet=True):
+ arg = b'abc'
+ exc = ImportError(arg)
+ self.assertEqual(str(arg), str(exc))
def test_main():
diff --git a/Lib/test/test_fcntl.py b/Lib/test/test_fcntl.py
index 29af99cf16..6d9ee0bf5a 100644
--- a/Lib/test/test_fcntl.py
+++ b/Lib/test/test_fcntl.py
@@ -1,7 +1,4 @@
"""Test program for the fcntl C module.
-
-OS/2+EMX doesn't support the file locking operations.
-
"""
import os
import struct
@@ -35,8 +32,6 @@ def get_lockdata():
fcntl.F_WRLCK, 0)
elif sys.platform in ['aix3', 'aix4', 'hp-uxB', 'unixware7']:
lockdata = struct.pack('hhlllii', fcntl.F_WRLCK, 0, 0, 0, 0, 0, 0)
- elif sys.platform in ['os2emx']:
- lockdata = None
else:
lockdata = struct.pack('hh'+start_len+'hh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
if lockdata:
@@ -62,18 +57,16 @@ class TestFcntl(unittest.TestCase):
rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
if verbose:
print('Status from fcntl with O_NONBLOCK: ', rv)
- if sys.platform not in ['os2emx']:
- rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETLKW, lockdata)
- if verbose:
- print('String from fcntl with F_SETLKW: ', repr(rv))
+ rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETLKW, lockdata)
+ if verbose:
+ print('String from fcntl with F_SETLKW: ', repr(rv))
self.f.close()
def test_fcntl_file_descriptor(self):
# again, but pass the file rather than numeric descriptor
self.f = open(TESTFN, 'wb')
rv = fcntl.fcntl(self.f, fcntl.F_SETFL, os.O_NONBLOCK)
- if sys.platform not in ['os2emx']:
- rv = fcntl.fcntl(self.f, fcntl.F_SETLKW, lockdata)
+ rv = fcntl.fcntl(self.f, fcntl.F_SETLKW, lockdata)
self.f.close()
def test_fcntl_64_bit(self):
diff --git a/Lib/test/test_format.py b/Lib/test/test_format.py
index b6e25409db..e6b0d20b87 100644
--- a/Lib/test/test_format.py
+++ b/Lib/test/test_format.py
@@ -307,6 +307,22 @@ class FormatTest(unittest.TestCase):
finally:
locale.setlocale(locale.LC_ALL, oldloc)
+ @support.cpython_only
+ def test_optimisations(self):
+ text = "abcde" # 5 characters
+
+ self.assertIs("%s" % text, text)
+ self.assertIs("%.5s" % text, text)
+ self.assertIs("%.10s" % text, text)
+ self.assertIs("%1s" % text, text)
+ self.assertIs("%5s" % text, text)
+
+ self.assertIs("{0}".format(text), text)
+ self.assertIs("{0:s}".format(text), text)
+ self.assertIs("{0:.5s}".format(text), text)
+ self.assertIs("{0:.10s}".format(text), text)
+ self.assertIs("{0:1s}".format(text), text)
+ self.assertIs("{0:5s}".format(text), text)
def test_main():
diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py
index c59b72eacf..85dbc97bb2 100644
--- a/Lib/test/test_gc.py
+++ b/Lib/test/test_gc.py
@@ -610,6 +610,32 @@ class GCTests(unittest.TestCase):
stderr = run_command(code % "gc.DEBUG_SAVEALL")
self.assertNotIn(b"uncollectable objects at shutdown", stderr)
+ def test_get_stats(self):
+ stats = gc.get_stats()
+ self.assertEqual(len(stats), 3)
+ for st in stats:
+ self.assertIsInstance(st, dict)
+ self.assertEqual(set(st),
+ {"collected", "collections", "uncollectable"})
+ self.assertGreaterEqual(st["collected"], 0)
+ self.assertGreaterEqual(st["collections"], 0)
+ self.assertGreaterEqual(st["uncollectable"], 0)
+ # Check that collection counts are incremented correctly
+ if gc.isenabled():
+ self.addCleanup(gc.enable)
+ gc.disable()
+ old = gc.get_stats()
+ gc.collect(0)
+ new = gc.get_stats()
+ self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
+ self.assertEqual(new[1]["collections"], old[1]["collections"])
+ self.assertEqual(new[2]["collections"], old[2]["collections"])
+ gc.collect(2)
+ new = gc.get_stats()
+ self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
+ self.assertEqual(new[1]["collections"], old[1]["collections"])
+ self.assertEqual(new[2]["collections"], old[2]["collections"] + 1)
+
class GCCallbackTests(unittest.TestCase):
def setUp(self):
diff --git a/Lib/test/test_generators.py b/Lib/test/test_generators.py
index 06f67c2ea6..f5c1a7d80c 100644
--- a/Lib/test/test_generators.py
+++ b/Lib/test/test_generators.py
@@ -1728,9 +1728,7 @@ Our ill-behaved code should be invoked during GC:
>>> g = f()
>>> next(g)
>>> del g
->>> sys.stderr.getvalue().startswith(
-... "Exception RuntimeError: 'generator ignored GeneratorExit' in "
-... )
+>>> "RuntimeError: generator ignored GeneratorExit" in sys.stderr.getvalue()
True
>>> sys.stderr = old
@@ -1840,22 +1838,23 @@ to test.
... sys.stderr = io.StringIO()
... class Leaker:
... def __del__(self):
-... raise RuntimeError
+... def invoke(message):
+... raise RuntimeError(message)
+... invoke("test")
...
... l = Leaker()
... del l
... err = sys.stderr.getvalue().strip()
-... err.startswith(
-... "Exception RuntimeError: RuntimeError() in <"
-... )
-... err.endswith("> ignored")
-... len(err.splitlines())
+... "Exception ignored in" in err
+... "RuntimeError: test" in err
+... "Traceback" in err
+... "in invoke" in err
... finally:
... sys.stderr = old
True
True
-1
-
+True
+True
These refleak tests should perhaps be in a testfile of their own,
diff --git a/Lib/test/test_genericpath.py b/Lib/test/test_genericpath.py
index 3eadd5893c..600ffc03d9 100644
--- a/Lib/test/test_genericpath.py
+++ b/Lib/test/test_genericpath.py
@@ -311,14 +311,10 @@ class CommonTest(GenericTest):
@unittest.skipIf(sys.platform == 'darwin',
"Mac OS X denies the creation of a directory with an invalid utf8 name")
def test_nonascii_abspath(self):
- name = b'\xe7w\xf0'
- if sys.platform == 'win32':
- try:
- os.fsdecode(name)
- except UnicodeDecodeError:
- self.skipTest("the filename %a is not decodable "
- "from the ANSI code page %s"
- % (name, sys.getfilesystemencoding()))
+ if support.TESTFN_UNDECODABLE:
+ name = support.TESTFN_UNDECODABLE
+ else:
+ name = b'a\xffb\xe7w\xf0'
# Test non-ASCII, non-UTF8 bytes in the path.
with warnings.catch_warnings():
diff --git a/Lib/test/test_hashlib.py b/Lib/test/test_hashlib.py
index 32f85e9b48..54201a1269 100644
--- a/Lib/test/test_hashlib.py
+++ b/Lib/test/test_hashlib.py
@@ -36,7 +36,10 @@ def hexstr(s):
class HashLibTestCase(unittest.TestCase):
supported_hash_names = ( 'md5', 'MD5', 'sha1', 'SHA1',
'sha224', 'SHA224', 'sha256', 'SHA256',
- 'sha384', 'SHA384', 'sha512', 'SHA512' )
+ 'sha384', 'SHA384', 'sha512', 'SHA512',
+ 'sha3_224', 'sha3_256', 'sha3_384',
+ 'sha3_512', 'SHA3_224', 'SHA3_256',
+ 'SHA3_384', 'SHA3_512' )
# Issue #14693: fallback modules are always compiled under POSIX
_warn_on_extension_import = os.name == 'posix' or COMPILED_WITH_PYDEBUG
@@ -93,6 +96,12 @@ class HashLibTestCase(unittest.TestCase):
if _sha512:
self.constructors_to_test['sha384'].add(_sha512.sha384)
self.constructors_to_test['sha512'].add(_sha512.sha512)
+ _sha3 = self._conditional_import_module('_sha3')
+ if _sha3:
+ self.constructors_to_test['sha3_224'].add(_sha3.sha3_224)
+ self.constructors_to_test['sha3_256'].add(_sha3.sha3_256)
+ self.constructors_to_test['sha3_384'].add(_sha3.sha3_384)
+ self.constructors_to_test['sha3_512'].add(_sha3.sha3_512)
super(HashLibTestCase, self).__init__(*args, **kwargs)
@@ -158,6 +167,7 @@ class HashLibTestCase(unittest.TestCase):
self.assertEqual(m1.digest(), m2.digest())
def check(self, name, data, digest):
+ digest = digest.lower()
constructors = self.constructors_to_test[name]
# 2 is for hashlib.name(...) and hashlib.new(name, ...)
self.assertGreaterEqual(len(constructors), 2)
@@ -183,6 +193,10 @@ class HashLibTestCase(unittest.TestCase):
self.check_no_unicode('sha256')
self.check_no_unicode('sha384')
self.check_no_unicode('sha512')
+ self.check_no_unicode('sha3_224')
+ self.check_no_unicode('sha3_256')
+ self.check_no_unicode('sha3_384')
+ self.check_no_unicode('sha3_512')
def test_case_md5_0(self):
self.check('md5', b'', 'd41d8cd98f00b204e9800998ecf8427e')
@@ -318,11 +332,122 @@ class HashLibTestCase(unittest.TestCase):
"e718483d0ce769644e2e42c7bc15b4638e1f98b13b2044285632a803afa973eb"+
"de0ff244877ea60a4cb0432ce577c31beb009c5c2c49aa2e4eadb217ad8cc09b")
+ # SHA-3 family
+ def test_case_sha3_224_0(self):
+ self.check('sha3_224', b"",
+ "F71837502BA8E10837BDD8D365ADB85591895602FC552B48B7390ABD")
+
+ def test_case_sha3_224_1(self):
+ self.check('sha3_224', bytes.fromhex("CC"),
+ "A9CAB59EB40A10B246290F2D6086E32E3689FAF1D26B470C899F2802")
+
+ def test_case_sha3_224_2(self):
+ self.check('sha3_224', bytes.fromhex("41FB"),
+ "615BA367AFDC35AAC397BC7EB5D58D106A734B24986D5D978FEFD62C")
+
+ def test_case_sha3_224_3(self):
+ self.check('sha3_224', bytes.fromhex(
+ "433C5303131624C0021D868A30825475E8D0BD3052A022180398F4CA4423B9"+
+ "8214B6BEAAC21C8807A2C33F8C93BD42B092CC1B06CEDF3224D5ED1EC29784"+
+ "444F22E08A55AA58542B524B02CD3D5D5F6907AFE71C5D7462224A3F9D9E53"+
+ "E7E0846DCBB4CE"),
+ "62B10F1B6236EBC2DA72957742A8D4E48E213B5F8934604BFD4D2C3A")
+
+ @bigmemtest(size=_4G + 5, memuse=1)
+ def test_case_sha3_224_huge(self, size):
+ if size == _4G + 5:
+ try:
+ self.check('sha3_224', b'A'*size,
+ '58ef60057c9dddb6a87477e9ace5a26f0d9db01881cf9b10a9f8c224')
+ except OverflowError:
+ pass # 32-bit arch
+
+
+ def test_case_sha3_256_0(self):
+ self.check('sha3_256', b"",
+ "C5D2460186F7233C927E7DB2DCC703C0E500B653CA82273B7BFAD8045D85A470")
+
+ def test_case_sha3_256_1(self):
+ self.check('sha3_256', bytes.fromhex("CC"),
+ "EEAD6DBFC7340A56CAEDC044696A168870549A6A7F6F56961E84A54BD9970B8A")
+
+ def test_case_sha3_256_2(self):
+ self.check('sha3_256', bytes.fromhex("41FB"),
+ "A8EACEDA4D47B3281A795AD9E1EA2122B407BAF9AABCB9E18B5717B7873537D2")
+
+ def test_case_sha3_256_3(self):
+ self.check('sha3_256', bytes.fromhex(
+ "433C5303131624C0021D868A30825475E8D0BD3052A022180398F4CA4423B9"+
+ "8214B6BEAAC21C8807A2C33F8C93BD42B092CC1B06CEDF3224D5ED1EC29784"+
+ "444F22E08A55AA58542B524B02CD3D5D5F6907AFE71C5D7462224A3F9D9E53"+
+ "E7E0846DCBB4CE"),
+ "CE87A5173BFFD92399221658F801D45C294D9006EE9F3F9D419C8D427748DC41")
+
+
+ def test_case_sha3_384_0(self):
+ self.check('sha3_384', b"",
+ "2C23146A63A29ACF99E73B88F8C24EAA7DC60AA771780CCC006AFBFA8FE2479B"+
+ "2DD2B21362337441AC12B515911957FF")
+
+ def test_case_sha3_384_1(self):
+ self.check('sha3_384', bytes.fromhex("CC"),
+ "1B84E62A46E5A201861754AF5DC95C4A1A69CAF4A796AE405680161E29572641"+
+ "F5FA1E8641D7958336EE7B11C58F73E9")
+
+ def test_case_sha3_384_2(self):
+ self.check('sha3_384', bytes.fromhex("41FB"),
+ "495CCE2714CD72C8C53C3363D22C58B55960FE26BE0BF3BBC7A3316DD563AD1D"+
+ "B8410E75EEFEA655E39D4670EC0B1792")
+
+ def test_case_sha3_384_3(self):
+ self.check('sha3_384', bytes.fromhex(
+ "433C5303131624C0021D868A30825475E8D0BD3052A022180398F4CA4423B9"+
+ "8214B6BEAAC21C8807A2C33F8C93BD42B092CC1B06CEDF3224D5ED1EC29784"+
+ "444F22E08A55AA58542B524B02CD3D5D5F6907AFE71C5D7462224A3F9D9E53"+
+ "E7E0846DCBB4CE"),
+ "135114508DD63E279E709C26F7817C0482766CDE49132E3EDF2EEDD8996F4E35"+
+ "96D184100B384868249F1D8B8FDAA2C9")
+
+
+ def test_case_sha3_512_0(self):
+ self.check('sha3_512', b"",
+ "0EAB42DE4C3CEB9235FC91ACFFE746B29C29A8C366B7C60E4E67C466F36A4304"+
+ "C00FA9CAF9D87976BA469BCBE06713B435F091EF2769FB160CDAB33D3670680E")
+
+ def test_case_sha3_512_1(self):
+ self.check('sha3_512', bytes.fromhex("CC"),
+ "8630C13CBD066EA74BBE7FE468FEC1DEE10EDC1254FB4C1B7C5FD69B646E4416"+
+ "0B8CE01D05A0908CA790DFB080F4B513BC3B6225ECE7A810371441A5AC666EB9")
+
+ def test_case_sha3_512_2(self):
+ self.check('sha3_512', bytes.fromhex("41FB"),
+ "551DA6236F8B96FCE9F97F1190E901324F0B45E06DBBB5CDB8355D6ED1DC34B3"+
+ "F0EAE7DCB68622FF232FA3CECE0D4616CDEB3931F93803662A28DF1CD535B731")
+
+ def test_case_sha3_512_3(self):
+ self.check('sha3_512', bytes.fromhex(
+ "433C5303131624C0021D868A30825475E8D0BD3052A022180398F4CA4423B9"+
+ "8214B6BEAAC21C8807A2C33F8C93BD42B092CC1B06CEDF3224D5ED1EC29784"+
+ "444F22E08A55AA58542B524B02CD3D5D5F6907AFE71C5D7462224A3F9D9E53"+
+ "E7E0846DCBB4CE"),
+ "527D28E341E6B14F4684ADB4B824C496C6482E51149565D3D17226828884306B"+
+ "51D6148A72622C2B75F5D3510B799D8BDC03EAEDE453676A6EC8FE03A1AD0EAB")
+
+
def test_gil(self):
# Check things work fine with an input larger than the size required
# for multithreaded operation (which is hardwired to 2048).
gil_minsize = 2048
+ for name in self.supported_hash_names:
+ m = hashlib.new(name)
+ m.update(b'1')
+ m.update(b'#' * gil_minsize)
+ m.update(b'1')
+
+ m = hashlib.new(name, b'x' * gil_minsize)
+ m.update(b'1')
+
m = hashlib.md5()
m.update(b'1')
m.update(b'#' * gil_minsize)
diff --git a/Lib/test/test_httpservers.py b/Lib/test/test_httpservers.py
index 75133c9379..c5db620421 100644
--- a/Lib/test/test_httpservers.py
+++ b/Lib/test/test_httpservers.py
@@ -92,6 +92,9 @@ class BaseHTTPServerTestCase(BaseTestCase):
def do_KEYERROR(self):
self.send_error(999)
+ def do_NOTFOUND(self):
+ self.send_error(404)
+
def do_CUSTOM(self):
self.send_response(999)
self.send_header('Content-Type', 'text/html')
@@ -211,6 +214,14 @@ class BaseHTTPServerTestCase(BaseTestCase):
self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind')
self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8'))
+ def test_error_content_length(self):
+ # Issue #16088: standard error responses should have a content-length
+ self.con.request('NOTFOUND', '/')
+ res = self.con.getresponse()
+ self.assertEqual(res.status, 404)
+ data = res.read()
+ self.assertEqual(int(res.getheader('Content-Length')), len(data))
+
class SimpleHTTPServerTestCase(BaseTestCase):
class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
diff --git a/Lib/test/test_importlib/import_/test_fromlist.py b/Lib/test/test_importlib/import_/test_fromlist.py
index 27b227076e..c16c33710f 100644
--- a/Lib/test/test_importlib/import_/test_fromlist.py
+++ b/Lib/test/test_importlib/import_/test_fromlist.py
@@ -80,7 +80,7 @@ class HandlingFromlist(unittest.TestCase):
with util.import_state(meta_path=[importer]):
with self.assertRaises(ImportError) as exc:
import_util.import_('pkg', fromlist=['mod'])
- self.assertEquals('i_do_not_exist', exc.exception.name)
+ self.assertEqual('i_do_not_exist', exc.exception.name)
def test_empty_string(self):
with util.mock_modules('pkg.__init__', 'pkg.mod') as importer:
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 555ad74447..a4124697a8 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -815,6 +815,14 @@ class SizeofTest:
bufio = self.tp(rawio, buffer_size=bufsize2)
self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
+ @support.cpython_only
+ def test_buffer_freeing(self) :
+ bufsize = 4096
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio, buffer_size=bufsize)
+ size = sys.getsizeof(bufio) - bufsize
+ bufio.close()
+ self.assertEqual(sys.getsizeof(bufio), size)
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
read_mode = "rb"
diff --git a/Lib/test/test_iterlen.py b/Lib/test/test_iterlen.py
index 7469a319f7..57f7101bee 100644
--- a/Lib/test/test_iterlen.py
+++ b/Lib/test/test_iterlen.py
@@ -45,31 +45,21 @@ import unittest
from test import support
from itertools import repeat
from collections import deque
-from builtins import len as _len
+from operator import length_hint
n = 10
-def len(obj):
- try:
- return _len(obj)
- except TypeError:
- try:
- # note: this is an internal undocumented API,
- # don't rely on it in your own programs
- return obj.__length_hint__()
- except AttributeError:
- raise TypeError
class TestInvariantWithoutMutations(unittest.TestCase):
def test_invariant(self):
it = self.it
for i in reversed(range(1, n+1)):
- self.assertEqual(len(it), i)
+ self.assertEqual(length_hint(it), i)
next(it)
- self.assertEqual(len(it), 0)
+ self.assertEqual(length_hint(it), 0)
self.assertRaises(StopIteration, next, it)
- self.assertEqual(len(it), 0)
+ self.assertEqual(length_hint(it), 0)
class TestTemporarilyImmutable(TestInvariantWithoutMutations):
@@ -78,12 +68,12 @@ class TestTemporarilyImmutable(TestInvariantWithoutMutations):
# length immutability during iteration
it = self.it
- self.assertEqual(len(it), n)
+ self.assertEqual(length_hint(it), n)
next(it)
- self.assertEqual(len(it), n-1)
+ self.assertEqual(length_hint(it), n-1)
self.mutate()
self.assertRaises(RuntimeError, next, it)
- self.assertEqual(len(it), 0)
+ self.assertEqual(length_hint(it), 0)
## ------- Concrete Type Tests -------
@@ -92,10 +82,6 @@ class TestRepeat(TestInvariantWithoutMutations):
def setUp(self):
self.it = repeat(None, n)
- def test_no_len_for_infinite_repeat(self):
- # The repeat() object can also be infinite
- self.assertRaises(TypeError, len, repeat(None))
-
class TestXrange(TestInvariantWithoutMutations):
def setUp(self):
@@ -167,14 +153,15 @@ class TestList(TestInvariantWithoutMutations):
it = iter(d)
next(it)
next(it)
- self.assertEqual(len(it), n-2)
+ self.assertEqual(length_hint(it), n - 2)
d.append(n)
- self.assertEqual(len(it), n-1) # grow with append
+ self.assertEqual(length_hint(it), n - 1) # grow with append
d[1:] = []
- self.assertEqual(len(it), 0)
+ self.assertEqual(length_hint(it), 0)
self.assertEqual(list(it), [])
d.extend(range(20))
- self.assertEqual(len(it), 0)
+ self.assertEqual(length_hint(it), 0)
+
class TestListReversed(TestInvariantWithoutMutations):
@@ -186,32 +173,41 @@ class TestListReversed(TestInvariantWithoutMutations):
it = reversed(d)
next(it)
next(it)
- self.assertEqual(len(it), n-2)
+ self.assertEqual(length_hint(it), n - 2)
d.append(n)
- self.assertEqual(len(it), n-2) # ignore append
+ self.assertEqual(length_hint(it), n - 2) # ignore append
d[1:] = []
- self.assertEqual(len(it), 0)
+ self.assertEqual(length_hint(it), 0)
self.assertEqual(list(it), []) # confirm invariant
d.extend(range(20))
- self.assertEqual(len(it), 0)
+ self.assertEqual(length_hint(it), 0)
## -- Check to make sure exceptions are not suppressed by __length_hint__()
class BadLen(object):
- def __iter__(self): return iter(range(10))
+ def __iter__(self):
+ return iter(range(10))
+
def __len__(self):
raise RuntimeError('hello')
+
class BadLengthHint(object):
- def __iter__(self): return iter(range(10))
+ def __iter__(self):
+ return iter(range(10))
+
def __length_hint__(self):
raise RuntimeError('hello')
+
class NoneLengthHint(object):
- def __iter__(self): return iter(range(10))
+ def __iter__(self):
+ return iter(range(10))
+
def __length_hint__(self):
- return None
+ return NotImplemented
+
class TestLengthHintExceptions(unittest.TestCase):
diff --git a/Lib/test/test_itertools.py b/Lib/test/test_itertools.py
index 90e85a9b25..ece7f99568 100644
--- a/Lib/test/test_itertools.py
+++ b/Lib/test/test_itertools.py
@@ -1723,9 +1723,8 @@ class TestVariousIteratorArgs(unittest.TestCase):
class LengthTransparency(unittest.TestCase):
def test_repeat(self):
- from test.test_iterlen import len
- self.assertEqual(len(repeat(None, 50)), 50)
- self.assertRaises(TypeError, len, repeat(None))
+ self.assertEqual(operator.length_hint(repeat(None, 50)), 50)
+ self.assertEqual(operator.length_hint(repeat(None), 12), 12)
class RegressionTests(unittest.TestCase):
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index cb908fb460..92514533d9 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -26,6 +26,7 @@ import logging.handlers
import logging.config
import codecs
+import configparser
import datetime
import pickle
import io
@@ -81,7 +82,7 @@ class BaseTest(unittest.TestCase):
"""Base class for logging tests."""
log_format = "%(name)s -> %(levelname)s: %(message)s"
- expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
+ expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
message_num = 0
def setUp(self):
@@ -150,14 +151,17 @@ class BaseTest(unittest.TestCase):
finally:
logging._releaseLock()
- def assert_log_lines(self, expected_values, stream=None):
+ def assert_log_lines(self, expected_values, stream=None, pat=None):
"""Match the collected log lines against the regular expression
self.expected_log_pat, and compare the extracted group values to
the expected_values list of tuples."""
stream = stream or self.stream
- pat = re.compile(self.expected_log_pat)
+ pat = re.compile(pat or self.expected_log_pat)
try:
- stream.reset()
+ if hasattr(stream, 'reset'):
+ stream.reset()
+ elif hasattr(stream, 'seek'):
+ stream.seek(0)
actual_lines = stream.readlines()
except AttributeError:
# StringIO.StringIO lacks a reset() method.
@@ -435,7 +439,7 @@ class CustomLevelsAndFiltersTest(BaseTest):
"""Test various filtering possibilities with custom logging levels."""
# Skip the logger name group.
- expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+ expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
def setUp(self):
BaseTest.setUp(self)
@@ -996,7 +1000,7 @@ class MemoryHandlerTest(BaseTest):
"""Tests for the MemoryHandler."""
# Do not bother with a logger name group.
- expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+ expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
def setUp(self):
BaseTest.setUp(self)
@@ -1049,7 +1053,7 @@ class ConfigFileTest(BaseTest):
"""Reading logging config from a .ini-style config file."""
- expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
+ expected_log_pat = r"^(\w+) \+\+ (\w+)$"
# config0 is a standard configuration.
config0 = """
@@ -1276,6 +1280,24 @@ class ConfigFileTest(BaseTest):
# Original logger output is empty.
self.assert_log_lines([])
+ def test_config0_using_cp_ok(self):
+ # A simple config file which overrides the default settings.
+ with captured_stdout() as output:
+ file = io.StringIO(textwrap.dedent(self.config0))
+ cp = configparser.ConfigParser()
+ cp.read_file(file)
+ logging.config.fileConfig(cp)
+ logger = logging.getLogger()
+ # Won't output anything
+ logger.info(self.next_message())
+ # Outputs a message
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
def test_config1_ok(self, config=config1):
# A config file defining a sub-parser as well.
with captured_stdout() as output:
@@ -1419,16 +1441,19 @@ class SocketHandlerTest(BaseTest):
self.assertEqual(self.log_output, "spam\neggs\n")
def test_noserver(self):
+ # Avoid timing-related failures due to SocketHandler's own hard-wired
+ # one-second timeout on socket.create_connection() (issue #16264).
+ self.sock_hdlr.retryStart = 2.5
# Kill the server
self.server.stop(2.0)
- #The logging call should try to connect, which should fail
+ # The logging call should try to connect, which should fail
try:
raise RuntimeError('Deliberate mistake')
except RuntimeError:
self.root_logger.exception('Never sent')
self.root_logger.error('Never sent, either')
now = time.time()
- self.assertTrue(self.sock_hdlr.retryTime > now)
+ self.assertGreater(self.sock_hdlr.retryTime, now)
time.sleep(self.sock_hdlr.retryTime - now + 0.001)
self.root_logger.error('Nor this')
@@ -1754,7 +1779,7 @@ class WarningsTest(BaseTest):
logger.removeHandler(h)
s = stream.getvalue()
h.close()
- self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)
+ self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0)
#See if an explicit file uses the original implementation
a_file = io.StringIO()
@@ -1792,7 +1817,7 @@ class ConfigDictTest(BaseTest):
"""Reading logging config from a dictionary."""
- expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
+ expected_log_pat = r"^(\w+) \+\+ (\w+)$"
# config0 is a standard configuration.
config0 = {
@@ -2601,10 +2626,10 @@ class ConfigDictTest(BaseTest):
self.assertRaises(Exception, self.apply_config, self.config13)
@unittest.skipUnless(threading, 'listen() needs threading to work')
- def setup_via_listener(self, text):
+ def setup_via_listener(self, text, verify=None):
text = text.encode("utf-8")
# Ask for a randomly assigned port (by using port 0)
- t = logging.config.listen(0)
+ t = logging.config.listen(0, verify)
t.start()
t.ready.wait()
# Now get the port allocated
@@ -2664,6 +2689,69 @@ class ConfigDictTest(BaseTest):
# Original logger output is empty.
self.assert_log_lines([])
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ def test_listen_verify(self):
+
+ def verify_fail(stuff):
+ return None
+
+ def verify_reverse(stuff):
+ return stuff[::-1]
+
+ logger = logging.getLogger("compiler.parser")
+ to_send = textwrap.dedent(ConfigFileTest.config1)
+ # First, specify a verification function that will fail.
+ # We expect to see no output, since our configuration
+ # never took effect.
+ with captured_stdout() as output:
+ self.setup_via_listener(to_send, verify_fail)
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([], stream=output)
+ # Original logger output has the stuff we logged.
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
+
+ # Now, perform no verification. Our configuration
+ # should take effect.
+
+ with captured_stdout() as output:
+ self.setup_via_listener(to_send) # no verify callable specified
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '3'),
+ ('ERROR', '4'),
+ ], stream=output)
+ # Original logger output still has the stuff we logged before.
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
+
+ # Now, perform verification which transforms the bytes.
+
+ with captured_stdout() as output:
+ self.setup_via_listener(to_send[::-1], verify_reverse)
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '5'),
+ ('ERROR', '6'),
+ ], stream=output)
+ # Original logger output still has the stuff we logged before.
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
+
def test_baseconfig(self):
d = {
'atuple': (1, 2, 3),
@@ -2716,14 +2804,14 @@ class ChildLoggerTest(BaseTest):
l2 = logging.getLogger('def.ghi')
c1 = r.getChild('xyz')
c2 = r.getChild('uvw.xyz')
- self.assertTrue(c1 is logging.getLogger('xyz'))
- self.assertTrue(c2 is logging.getLogger('uvw.xyz'))
+ self.assertIs(c1, logging.getLogger('xyz'))
+ self.assertIs(c2, logging.getLogger('uvw.xyz'))
c1 = l1.getChild('def')
c2 = c1.getChild('ghi')
c3 = l1.getChild('def.ghi')
- self.assertTrue(c1 is logging.getLogger('abc.def'))
- self.assertTrue(c2 is logging.getLogger('abc.def.ghi'))
- self.assertTrue(c2 is c3)
+ self.assertIs(c1, logging.getLogger('abc.def'))
+ self.assertIs(c2, logging.getLogger('abc.def.ghi'))
+ self.assertIs(c2, c3)
class DerivedLogRecord(logging.LogRecord):
@@ -2766,7 +2854,7 @@ class LogRecordFactoryTest(BaseTest):
class QueueHandlerTest(BaseTest):
# Do not bother with a logger name group.
- expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+ expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
def setUp(self):
BaseTest.setUp(self)
@@ -3804,7 +3892,7 @@ class NTEventLogHandlerTest(BaseTest):
h.handle(r)
h.close()
# Now see if the event is recorded
- self.assertTrue(num_recs < win32evtlog.GetNumberOfEventLogRecords(elh))
+ self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh))
flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \
win32evtlog.EVENTLOG_SEQUENTIAL_READ
found = False
diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py
index 6b1e933966..d1cc92e253 100644
--- a/Lib/test/test_mailbox.py
+++ b/Lib/test/test_mailbox.py
@@ -596,7 +596,7 @@ class TestMaildir(TestMailbox, unittest.TestCase):
def setUp(self):
TestMailbox.setUp(self)
- if os.name in ('nt', 'os2') or sys.platform == 'cygwin':
+ if (os.name == 'nt') or (sys.platform == 'cygwin'):
self._box.colon = '!'
def assertMailboxEmpty(self):
diff --git a/Lib/test/test_mimetypes.py b/Lib/test/test_mimetypes.py
index 91da28927d..593fdb0a42 100644
--- a/Lib/test/test_mimetypes.py
+++ b/Lib/test/test_mimetypes.py
@@ -22,6 +22,8 @@ class MimeTypesTestCase(unittest.TestCase):
eq(self.db.guess_type("foo.tgz"), ("application/x-tar", "gzip"))
eq(self.db.guess_type("foo.tar.gz"), ("application/x-tar", "gzip"))
eq(self.db.guess_type("foo.tar.Z"), ("application/x-tar", "compress"))
+ eq(self.db.guess_type("foo.tar.bz2"), ("application/x-tar", "bzip2"))
+ eq(self.db.guess_type("foo.tar.xz"), ("application/x-tar", "xz"))
def test_data_urls(self):
eq = self.assertEqual
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index b2a964cec6..9c7a202d96 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -19,6 +19,7 @@ import socket
import random
import logging
import struct
+import operator
import test.support
import test.script_helper
@@ -1617,6 +1618,18 @@ def mul(x, y):
class _TestPool(BaseTestCase):
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.pool = cls.Pool(4)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.pool.terminate()
+ cls.pool.join()
+ cls.pool = None
+ super().tearDownClass()
+
def test_apply(self):
papply = self.pool.apply
self.assertEqual(papply(sqr, (5,)), sqr(5))
@@ -1708,15 +1721,6 @@ class _TestPool(BaseTestCase):
p.join()
def test_terminate(self):
- if self.TYPE == 'manager':
- # On Unix a forked process increfs each shared object to
- # which its parent process held a reference. If the
- # forked process gets terminated then there is likely to
- # be a reference leak. So to prevent
- # _TestZZZNumberOfObjects from failing we skip this test
- # when using a manager.
- return
-
result = self.pool.map_async(
time.sleep, [0.1 for i in range(10000)], chunksize=1
)
@@ -1838,35 +1842,6 @@ class _TestPoolWorkerLifetime(BaseTestCase):
for (j, res) in enumerate(results):
self.assertEqual(res.get(), sqr(j))
-
-#
-# Test that manager has expected number of shared objects left
-#
-
-class _TestZZZNumberOfObjects(BaseTestCase):
- # Because test cases are sorted alphabetically, this one will get
- # run after all the other tests for the manager. It tests that
- # there have been no "reference leaks" for the manager's shared
- # objects. Note the comment in _TestPool.test_terminate().
-
- # If some other test using ManagerMixin.manager fails, then the
- # raised exception may keep alive a frame which holds a reference
- # to a managed object. This will cause test_number_of_objects to
- # also fail.
- ALLOWED_TYPES = ('manager',)
-
- def test_number_of_objects(self):
- EXPECTED_NUMBER = 1 # the pool object is still alive
- multiprocessing.active_children() # discard dead process objs
- gc.collect() # do garbage collection
- refs = self.manager._number_of_objects()
- debug_info = self.manager._debug_info()
- if refs != EXPECTED_NUMBER:
- print(self.manager._debug_info())
- print(debug_info)
-
- self.assertEqual(refs, EXPECTED_NUMBER)
-
#
# Test of creating a customized manager class
#
@@ -2903,15 +2878,6 @@ class TestInvalidHandle(unittest.TestCase):
# Functions used to create test cases from the base ones in this module
#
-def get_attributes(Source, names):
- d = {}
- for name in names:
- obj = getattr(Source, name)
- if type(obj) == type(get_attributes):
- obj = staticmethod(obj)
- d[name] = obj
- return d
-
def create_test_cases(Mixin, type):
result = {}
glob = globals()
@@ -2924,10 +2890,10 @@ def create_test_cases(Mixin, type):
assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
if type in base.ALLOWED_TYPES:
newname = 'With' + Type + name[1:]
- class Temp(base, unittest.TestCase, Mixin):
+ class Temp(base, Mixin, unittest.TestCase):
pass
result[newname] = Temp
- Temp.__name__ = newname
+ Temp.__name__ = Temp.__qualname__ = newname
Temp.__module__ = Mixin.__module__
return result
@@ -2938,12 +2904,24 @@ def create_test_cases(Mixin, type):
class ProcessesMixin(object):
TYPE = 'processes'
Process = multiprocessing.Process
- locals().update(get_attributes(multiprocessing, (
- 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
- 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue',
- 'RawArray', 'current_process', 'active_children', 'Pipe',
- 'connection', 'JoinableQueue', 'Pool'
- )))
+ connection = multiprocessing.connection
+ current_process = staticmethod(multiprocessing.current_process)
+ active_children = staticmethod(multiprocessing.active_children)
+ Pool = staticmethod(multiprocessing.Pool)
+ Pipe = staticmethod(multiprocessing.Pipe)
+ Queue = staticmethod(multiprocessing.Queue)
+ JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
+ Lock = staticmethod(multiprocessing.Lock)
+ RLock = staticmethod(multiprocessing.RLock)
+ Semaphore = staticmethod(multiprocessing.Semaphore)
+ BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
+ Condition = staticmethod(multiprocessing.Condition)
+ Event = staticmethod(multiprocessing.Event)
+ Barrier = staticmethod(multiprocessing.Barrier)
+ Value = staticmethod(multiprocessing.Value)
+ Array = staticmethod(multiprocessing.Array)
+ RawValue = staticmethod(multiprocessing.RawValue)
+ RawArray = staticmethod(multiprocessing.RawArray)
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
@@ -2952,12 +2930,42 @@ globals().update(testcases_processes)
class ManagerMixin(object):
TYPE = 'manager'
Process = multiprocessing.Process
- manager = object.__new__(multiprocessing.managers.SyncManager)
- locals().update(get_attributes(manager, (
- 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
- 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'list', 'dict',
- 'Namespace', 'JoinableQueue', 'Pool'
- )))
+ Queue = property(operator.attrgetter('manager.Queue'))
+ JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
+ Lock = property(operator.attrgetter('manager.Lock'))
+ RLock = property(operator.attrgetter('manager.RLock'))
+ Semaphore = property(operator.attrgetter('manager.Semaphore'))
+ BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
+ Condition = property(operator.attrgetter('manager.Condition'))
+ Event = property(operator.attrgetter('manager.Event'))
+ Barrier = property(operator.attrgetter('manager.Barrier'))
+ Value = property(operator.attrgetter('manager.Value'))
+ Array = property(operator.attrgetter('manager.Array'))
+ list = property(operator.attrgetter('manager.list'))
+ dict = property(operator.attrgetter('manager.dict'))
+ Namespace = property(operator.attrgetter('manager.Namespace'))
+
+ @classmethod
+ def Pool(cls, *args, **kwds):
+ return cls.manager.Pool(*args, **kwds)
+
+ @classmethod
+ def setUpClass(cls):
+ cls.manager = multiprocessing.Manager()
+
+ @classmethod
+ def tearDownClass(cls):
+ multiprocessing.active_children() # discard dead process objs
+ gc.collect() # do garbage collection
+ if cls.manager._number_of_objects() != 0:
+ # This is not really an error since some tests do not
+ # ensure that all processes which hold a reference to a
+ # managed object have been joined.
+ print('Shared objects which still exist at manager shutdown:')
+ print(cls.manager._debug_info())
+ cls.manager.shutdown()
+ cls.manager.join()
+ cls.manager = None
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
@@ -2966,16 +2974,27 @@ globals().update(testcases_manager)
class ThreadsMixin(object):
TYPE = 'threads'
Process = multiprocessing.dummy.Process
- locals().update(get_attributes(multiprocessing.dummy, (
- 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
- 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process',
- 'active_children', 'Pipe', 'connection', 'dict', 'list',
- 'Namespace', 'JoinableQueue', 'Pool'
- )))
+ connection = multiprocessing.dummy.connection
+ current_process = staticmethod(multiprocessing.dummy.current_process)
+ active_children = staticmethod(multiprocessing.dummy.active_children)
+ Pool = staticmethod(multiprocessing.Pool)
+ Pipe = staticmethod(multiprocessing.dummy.Pipe)
+ Queue = staticmethod(multiprocessing.dummy.Queue)
+ JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
+ Lock = staticmethod(multiprocessing.dummy.Lock)
+ RLock = staticmethod(multiprocessing.dummy.RLock)
+ Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
+ BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
+ Condition = staticmethod(multiprocessing.dummy.Condition)
+ Event = staticmethod(multiprocessing.dummy.Event)
+ Barrier = staticmethod(multiprocessing.dummy.Barrier)
+ Value = staticmethod(multiprocessing.dummy.Value)
+ Array = staticmethod(multiprocessing.dummy.Array)
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
+
class OtherTest(unittest.TestCase):
# TODO: add more tests for deliver/answer challenge.
def test_deliver_challenge_auth_failure(self):
@@ -3407,12 +3426,6 @@ def test_main(run=None):
multiprocessing.get_logger().setLevel(LOG_LEVEL)
- ProcessesMixin.pool = multiprocessing.Pool(4)
- ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
- ManagerMixin.manager.__init__()
- ManagerMixin.manager.start()
- ManagerMixin.pool = ManagerMixin.manager.Pool(4)
-
testcases = (
sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
@@ -3422,18 +3435,7 @@ def test_main(run=None):
loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
- try:
- run(suite)
- finally:
- ThreadsMixin.pool.terminate()
- ProcessesMixin.pool.terminate()
- ManagerMixin.pool.terminate()
- ManagerMixin.pool.join()
- ManagerMixin.manager.shutdown()
- ManagerMixin.manager.join()
- ThreadsMixin.pool.join()
- ProcessesMixin.pool.join()
- del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
+ run(suite)
def main():
test_main(unittest.TextTestRunner(verbosity=2).run)
diff --git a/Lib/test/test_operator.py b/Lib/test/test_operator.py
index fa608b9a52..b445ee0ada 100644
--- a/Lib/test/test_operator.py
+++ b/Lib/test/test_operator.py
@@ -410,6 +410,31 @@ class OperatorTestCase(unittest.TestCase):
self.assertEqual(operator.__ixor__ (c, 5), "ixor")
self.assertEqual(operator.__iconcat__ (c, c), "iadd")
+ def test_length_hint(self):
+ class X(object):
+ def __init__(self, value):
+ self.value = value
+
+ def __length_hint__(self):
+ if type(self.value) is type:
+ raise self.value
+ else:
+ return self.value
+
+ self.assertEqual(operator.length_hint([], 2), 0)
+ self.assertEqual(operator.length_hint(iter([1, 2, 3])), 3)
+
+ self.assertEqual(operator.length_hint(X(2)), 2)
+ self.assertEqual(operator.length_hint(X(NotImplemented), 4), 4)
+ self.assertEqual(operator.length_hint(X(TypeError), 12), 12)
+ with self.assertRaises(TypeError):
+ operator.length_hint(X("abc"))
+ with self.assertRaises(ValueError):
+ operator.length_hint(X(-2))
+ with self.assertRaises(LookupError):
+ operator.length_hint(X(LookupError))
+
+
def test_main(verbose=None):
import sys
test_classes = (
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index 7d6b3779a4..9cf4070737 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -2046,6 +2046,94 @@ class TermsizeTests(unittest.TestCase):
self.assertEqual(expected, actual)
+class OSErrorTests(unittest.TestCase):
+ def setUp(self):
+ class Str(str):
+ pass
+
+ self.bytes_filenames = []
+ self.unicode_filenames = []
+ if support.TESTFN_UNENCODABLE is not None:
+ decoded = support.TESTFN_UNENCODABLE
+ else:
+ decoded = support.TESTFN
+ self.unicode_filenames.append(decoded)
+ self.unicode_filenames.append(Str(decoded))
+ if support.TESTFN_UNDECODABLE is not None:
+ encoded = support.TESTFN_UNDECODABLE
+ else:
+ encoded = os.fsencode(support.TESTFN)
+ self.bytes_filenames.append(encoded)
+ self.bytes_filenames.append(memoryview(encoded))
+
+ self.filenames = self.bytes_filenames + self.unicode_filenames
+
+ def test_oserror_filename(self):
+ funcs = [
+ (self.filenames, os.chdir,),
+ (self.filenames, os.chmod, 0o777),
+ (self.filenames, os.listdir,),
+ (self.filenames, os.lstat,),
+ (self.filenames, os.open, os.O_RDONLY),
+ (self.filenames, os.rmdir,),
+ (self.filenames, os.stat,),
+ (self.filenames, os.unlink,),
+ ]
+ if sys.platform == "win32":
+ funcs.extend((
+ (self.bytes_filenames, os.rename, b"dst"),
+ (self.bytes_filenames, os.replace, b"dst"),
+ (self.unicode_filenames, os.rename, "dst"),
+ (self.unicode_filenames, os.replace, "dst"),
+ ))
+ else:
+ funcs.extend((
+ (self.filenames, os.rename, "dst"),
+ (self.filenames, os.replace, "dst"),
+ ))
+ if hasattr(os, "chown"):
+ funcs.append((self.filenames, os.chown, 0, 0))
+ if hasattr(os, "lchown"):
+ funcs.append((self.filenames, os.lchown, 0, 0))
+ if hasattr(os, "truncate"):
+ funcs.append((self.filenames, os.truncate, 0))
+ if hasattr(os, "chflags"):
+ funcs.extend((
+ (self.filenames, os.chflags, 0),
+ (self.filenames, os.lchflags, 0),
+ ))
+ if hasattr(os, "chroot"):
+ funcs.append((self.filenames, os.chroot,))
+ if hasattr(os, "link"):
+ if sys.platform == "win32":
+ funcs.append((self.bytes_filenames, os.link, b"dst"))
+ funcs.append((self.unicode_filenames, os.link, "dst"))
+ else:
+ funcs.append((self.filenames, os.link, "dst"))
+ if hasattr(os, "listxattr"):
+ funcs.extend((
+ (self.filenames, os.listxattr,),
+ (self.filenames, os.getxattr, "user.test"),
+ (self.filenames, os.setxattr, "user.test", b'user'),
+ (self.filenames, os.removexattr, "user.test"),
+ ))
+ if hasattr(os, "lchmod"):
+ funcs.append((self.filenames, os.lchmod, 0o777))
+ if hasattr(os, "readlink"):
+ if sys.platform == "win32":
+ funcs.append((self.unicode_filenames, os.readlink,))
+ else:
+ funcs.append((self.filenames, os.readlink,))
+
+ for filenames, func, *func_args in funcs:
+ for name in filenames:
+ try:
+ func(name, *func_args)
+ except OSError as err:
+ self.assertIs(err.filename, name)
+ else:
+ self.fail("No exception thrown by {}".format(func))
+
@support.reap_threads
def test_main():
support.run_unittest(
@@ -2074,6 +2162,7 @@ def test_main():
ExtendedAttributeTests,
Win32DeprecatedBytesAPI,
TermsizeTests,
+ OSErrorTests,
)
if __name__ == "__main__":
diff --git a/Lib/test/test_random.py b/Lib/test/test_random.py
index b5931baf4e..a9aec70f75 100644
--- a/Lib/test/test_random.py
+++ b/Lib/test/test_random.py
@@ -46,6 +46,39 @@ class TestBasicOps(unittest.TestCase):
self.assertRaises(TypeError, self.gen.seed, 1, 2, 3, 4)
self.assertRaises(TypeError, type(self.gen), [])
+ def test_shuffle(self):
+ shuffle = self.gen.shuffle
+ lst = []
+ shuffle(lst)
+ self.assertEqual(lst, [])
+ lst = [37]
+ shuffle(lst)
+ self.assertEqual(lst, [37])
+ seqs = [list(range(n)) for n in range(10)]
+ shuffled_seqs = [list(range(n)) for n in range(10)]
+ for shuffled_seq in shuffled_seqs:
+ shuffle(shuffled_seq)
+ for (seq, shuffled_seq) in zip(seqs, shuffled_seqs):
+ self.assertEqual(len(seq), len(shuffled_seq))
+ self.assertEqual(set(seq), set(shuffled_seq))
+
+ # The above tests all would pass if the shuffle was a
+ # no-op. The following non-deterministic test covers that. It
+ # asserts that the shuffled sequence of 1000 distinct elements
+ # must be different from the original one. Although there is
+ # mathematically a non-zero probability that this could
+ # actually happen in a genuinely random shuffle, it is
+ # completely negligible, given that the number of possible
+ # permutations of 1000 objects is 1000! (factorial of 1000),
+ # which is considerably larger than the number of atoms in the
+ # universe...
+ lst = list(range(1000))
+ shuffled_lst = list(range(1000))
+ shuffle(shuffled_lst)
+ self.assertTrue(lst != shuffled_lst)
+ shuffle(lst)
+ self.assertTrue(lst != shuffled_lst)
+
def test_choice(self):
choice = self.gen.choice
with self.assertRaises(IndexError):
diff --git a/Lib/test/test_range.py b/Lib/test/test_range.py
index 2a13bfeabd..f088387c33 100644
--- a/Lib/test/test_range.py
+++ b/Lib/test/test_range.py
@@ -313,7 +313,7 @@ class RangeTest(unittest.TestCase):
self.assertRaises(TypeError, range, IN())
# Test use of user-defined classes in slice indices.
- self.assertEqual(list(range(10)[:I(5)]), list(range(5)))
+ self.assertEqual(range(10)[:I(5)], range(5))
with self.assertRaises(RuntimeError):
range(0, 10)[:IX()]
diff --git a/Lib/test/test_select.py b/Lib/test/test_select.py
index ddb9a0f67e..496709cc56 100644
--- a/Lib/test/test_select.py
+++ b/Lib/test/test_select.py
@@ -5,7 +5,7 @@ import sys
import unittest
from test import support
-@unittest.skipIf(sys.platform[:3] in ('win', 'os2', 'riscos'),
+@unittest.skipIf((sys.platform[:3]=='win') or (sys.platform=='riscos'),
"can't easily test on this system")
class SelectTestCase(unittest.TestCase):
diff --git a/Lib/test/test_set.py b/Lib/test/test_set.py
index 8e9e5878b2..da627238cc 100644
--- a/Lib/test/test_set.py
+++ b/Lib/test/test_set.py
@@ -848,8 +848,6 @@ class TestBasicOps(unittest.TestCase):
for v in self.set:
self.assertIn(v, self.values)
setiter = iter(self.set)
- # note: __length_hint__ is an internal undocumented API,
- # don't rely on it in your own programs
self.assertEqual(setiter.__length_hint__(), len(self.set))
def test_pickling(self):
diff --git a/Lib/test/test_shelve.py b/Lib/test/test_shelve.py
index 13c126566d..bd51d868fe 100644
--- a/Lib/test/test_shelve.py
+++ b/Lib/test/test_shelve.py
@@ -148,6 +148,19 @@ class TestCase(unittest.TestCase):
p2 = d[encodedkey]
self.assertNotEqual(p1, p2) # Write creates new object in store
+ def test_with(self):
+ d1 = {}
+ with shelve.Shelf(d1, protocol=2, writeback=False) as s:
+ s['key1'] = [1,2,3,4]
+ self.assertEqual(s['key1'], [1,2,3,4])
+ self.assertEqual(len(s), 1)
+ self.assertRaises(ValueError, len, s)
+ try:
+ s['key1']
+ except ValueError:
+ pass
+ else:
+ self.fail('Closed shelf should not find a key')
from test import mapping_tests
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
index eb0e9a4ee7..c3997dc93d 100644
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -18,7 +18,8 @@ from shutil import (_make_tarball, _make_zipfile, make_archive,
register_archive_format, unregister_archive_format,
get_archive_formats, Error, unpack_archive,
register_unpack_format, RegistryError,
- unregister_unpack_format, get_unpack_formats)
+ unregister_unpack_format, get_unpack_formats,
+ SameFileError)
import tarfile
import warnings
@@ -688,7 +689,7 @@ class TestShutil(unittest.TestCase):
with open(src, 'w') as f:
f.write('cheddar')
os.link(src, dst)
- self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
+ self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
with open(src, 'r') as f:
self.assertEqual(f.read(), 'cheddar')
os.remove(dst)
@@ -708,7 +709,7 @@ class TestShutil(unittest.TestCase):
# to TESTFN/TESTFN/cheese, while it should point at
# TESTFN/cheese.
os.symlink('cheese', dst)
- self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
+ self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
with open(src, 'r') as f:
self.assertEqual(f.read(), 'cheddar')
os.remove(dst)
@@ -1215,6 +1216,16 @@ class TestShutil(unittest.TestCase):
self.assertTrue(os.path.exists(rv))
self.assertEqual(read_file(src_file), read_file(dst_file))
+ def test_copyfile_same_file(self):
+ # copyfile() should raise SameFileError if the source and destination
+ # are the same.
+ src_dir = self.mkdtemp()
+ src_file = os.path.join(src_dir, 'foo')
+ write_file(src_file, 'foo')
+ self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
+ # But Error should work too, to stay backward compatible.
+ self.assertRaises(Error, shutil.copyfile, src_file, src_file)
+
def test_copytree_return_value(self):
# copytree returns its destination path.
src_dir = self.mkdtemp()
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index 6be259bdb9..1b73634755 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -15,7 +15,7 @@ try:
except ImportError:
threading = None
-if sys.platform in ('os2', 'riscos'):
+if sys.platform == 'riscos':
raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
diff --git a/Lib/test/test_site.py b/Lib/test/test_site.py
index 29286c710b..5090239ba9 100644
--- a/Lib/test/test_site.py
+++ b/Lib/test/test_site.py
@@ -222,7 +222,7 @@ class HelperFunctionsTests(unittest.TestCase):
site.PREFIXES = ['xoxo']
dirs = site.getsitepackages()
- if sys.platform in ('os2emx', 'riscos'):
+ if sys.platform == 'riscos':
self.assertEqual(len(dirs), 1)
wanted = os.path.join('xoxo', 'Lib', 'site-packages')
self.assertEqual(dirs[0], wanted)
diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py
index 353f8ff3d4..58fc5d03b3 100644
--- a/Lib/test/test_socketserver.py
+++ b/Lib/test/test_socketserver.py
@@ -27,7 +27,7 @@ TEST_STR = b"hello world\n"
HOST = test.support.HOST
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
-HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
+HAVE_FORKING = hasattr(os, "fork")
def signal_alarm(n):
"""Call signal.alarm when it exists (i.e. not on Windows)."""
@@ -93,21 +93,7 @@ class SocketServerTest(unittest.TestCase):
# XXX: We need a way to tell AF_UNIX to pick its own name
# like AF_INET provides port==0.
dir = None
- if os.name == 'os2':
- dir = '\socket'
fn = tempfile.mktemp(prefix='unix_socket.', dir=dir)
- if os.name == 'os2':
- # AF_UNIX socket names on OS/2 require a specific prefix
- # which can't include a drive letter and must also use
- # backslashes as directory separators
- if fn[1] == ':':
- fn = fn[2:]
- if fn[0] in (os.sep, os.altsep):
- fn = fn[1:]
- if os.sep == '/':
- fn = fn.replace(os.sep, os.altsep)
- else:
- fn = fn.replace(os.altsep, os.sep)
self.test_files.append(fn)
return fn
diff --git a/Lib/test/test_sundry.py b/Lib/test/test_sundry.py
index fcccdf701a..ea1325a167 100644
--- a/Lib/test/test_sundry.py
+++ b/Lib/test/test_sundry.py
@@ -13,7 +13,6 @@ class TestUntestedModules(unittest.TestCase):
import distutils.bcppcompiler
import distutils.ccompiler
import distutils.cygwinccompiler
- import distutils.emxccompiler
import distutils.filelist
if sys.platform.startswith('win'):
import distutils.msvccompiler
@@ -48,7 +47,6 @@ class TestUntestedModules(unittest.TestCase):
import macurl2path
import mailcap
import nturl2path
- import os2emxpath
import pstats
import py_compile
import sndhdr
diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py
index e5ec85c488..a1074c33d9 100644
--- a/Lib/test/test_sys.py
+++ b/Lib/test/test_sys.py
@@ -482,7 +482,7 @@ class SysModuleTest(unittest.TestCase):
def test_thread_info(self):
info = sys.thread_info
self.assertEqual(len(info), 3)
- self.assertIn(info.name, ('nt', 'os2', 'pthread', 'solaris', None))
+ self.assertIn(info.name, ('nt', 'pthread', 'solaris', None))
self.assertIn(info.lock, ('semaphore', 'mutex+cond', None))
def test_43581(self):
diff --git a/Lib/test/test_sysconfig.py b/Lib/test/test_sysconfig.py
index 92193602f9..5293649878 100644
--- a/Lib/test/test_sysconfig.py
+++ b/Lib/test/test_sysconfig.py
@@ -234,7 +234,7 @@ class TestSysConfig(unittest.TestCase):
self.assertTrue(os.path.isfile(config_h), config_h)
def test_get_scheme_names(self):
- wanted = ('nt', 'nt_user', 'os2', 'os2_home', 'osx_framework_user',
+ wanted = ('nt', 'nt_user', 'osx_framework_user',
'posix_home', 'posix_prefix', 'posix_user')
self.assertEqual(get_scheme_names(), wanted)
@@ -305,14 +305,13 @@ class TestSysConfig(unittest.TestCase):
if 'MACOSX_DEPLOYMENT_TARGET' in env:
del env['MACOSX_DEPLOYMENT_TARGET']
- with open('/dev/null', 'w') as devnull_fp:
- p = subprocess.Popen([
- sys.executable, '-c',
- 'import sysconfig; print(sysconfig.get_platform())',
- ],
- stdout=subprocess.PIPE,
- stderr=devnull_fp,
- env=env)
+ p = subprocess.Popen([
+ sys.executable, '-c',
+ 'import sysconfig; print(sysconfig.get_platform())',
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ env=env)
test_platform = p.communicate()[0].strip()
test_platform = test_platform.decode('utf-8')
status = p.wait()
@@ -325,20 +324,19 @@ class TestSysConfig(unittest.TestCase):
env = os.environ.copy()
env['MACOSX_DEPLOYMENT_TARGET'] = '10.1'
- with open('/dev/null') as dev_null:
- p = subprocess.Popen([
- sys.executable, '-c',
- 'import sysconfig; print(sysconfig.get_platform())',
- ],
- stdout=subprocess.PIPE,
- stderr=dev_null,
- env=env)
- test_platform = p.communicate()[0].strip()
- test_platform = test_platform.decode('utf-8')
- status = p.wait()
-
- self.assertEqual(status, 0)
- self.assertEqual(my_platform, test_platform)
+ p = subprocess.Popen([
+ sys.executable, '-c',
+ 'import sysconfig; print(sysconfig.get_platform())',
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ env=env)
+ test_platform = p.communicate()[0].strip()
+ test_platform = test_platform.decode('utf-8')
+ status = p.wait()
+
+ self.assertEqual(status, 0)
+ self.assertEqual(my_platform, test_platform)
def test_srcdir(self):
# See Issues #15322, #15364.
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py
index d79f319c19..8d161d9d88 100644
--- a/Lib/test/test_tempfile.py
+++ b/Lib/test/test_tempfile.py
@@ -276,7 +276,7 @@ class TestMkstempInner(BaseTestCase):
file = self.do_create()
mode = stat.S_IMODE(os.stat(file.name).st_mode)
expected = 0o600
- if sys.platform in ('win32', 'os2emx'):
+ if sys.platform == 'win32':
# There's no distinction among 'user', 'group' and 'world';
# replicate the 'user' bits.
user = expected >> 6
@@ -310,7 +310,7 @@ class TestMkstempInner(BaseTestCase):
# On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
# but an arg with embedded spaces should be decorated with double
# quotes on each end
- if sys.platform in ('win32',):
+ if sys.platform == 'win32':
decorated = '"%s"' % sys.executable
tester = '"%s"' % tester
else:
@@ -479,7 +479,7 @@ class TestMkdtemp(BaseTestCase):
mode = stat.S_IMODE(os.stat(dir).st_mode)
mode &= 0o777 # Mask off sticky bits inherited from /tmp
expected = 0o700
- if sys.platform in ('win32', 'os2emx'):
+ if sys.platform == 'win32':
# There's no distinction among 'user', 'group' and 'world';
# replicate the 'user' bits.
user = expected >> 6
diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py
index a191e157bc..f9a721b03b 100644
--- a/Lib/test/test_thread.py
+++ b/Lib/test/test_thread.py
@@ -68,7 +68,7 @@ class ThreadRunningTests(BasicThreadTest):
thread.stack_size(0)
self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
- if os.name not in ("nt", "os2", "posix"):
+ if os.name not in ("nt", "posix"):
return
tss_supported = True
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index bb8d9b6767..13c017da1d 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -451,8 +451,7 @@ class ThreadJoinOnShutdown(BaseTestCase):
# #12316 and #11870), and fork() from a worker thread is known to trigger
# problems with some operating systems (issue #3863): skip problematic tests
# on platforms known to behave badly.
- platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
- 'os2emx')
+ platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5')
def _run_and_join(self, script):
script = """if 1:
@@ -754,7 +753,8 @@ class ThreadingExceptionTests(BaseTestCase):
lock = threading.Lock()
self.assertRaises(RuntimeError, lock.release)
- @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
+ @unittest.skipUnless(sys.platform == 'darwin' and test.support.python_is_optimized(),
+ 'test macosx problem')
def test_recursion_limit(self):
# Issue 9670
# test that excessive recursion within a non-main thread causes
diff --git a/Lib/test/test_threadsignals.py b/Lib/test/test_threadsignals.py
index f975a75e85..3a0a493c0d 100644
--- a/Lib/test/test_threadsignals.py
+++ b/Lib/test/test_threadsignals.py
@@ -8,7 +8,7 @@ from test.support import run_unittest, import_module
thread = import_module('_thread')
import time
-if sys.platform[:3] in ('win', 'os2') or sys.platform=='riscos':
+if (sys.platform[:3] == 'win') or (sys.platform=='riscos'):
raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
process_pid = os.getpid()
diff --git a/Lib/test/test_unicode.py b/Lib/test/test_unicode.py
index a811c4c8ac..7da5c8befc 100644
--- a/Lib/test/test_unicode.py
+++ b/Lib/test/test_unicode.py
@@ -1963,9 +1963,10 @@ class UnicodeTest(string_tests.CommonTest,
# Test PyUnicode_FromFormat()
def test_from_format(self):
support.import_module('ctypes')
- from ctypes import (pythonapi, py_object,
+ from ctypes import (
+ pythonapi, py_object, sizeof,
c_int, c_long, c_longlong, c_ssize_t,
- c_uint, c_ulong, c_ulonglong, c_size_t)
+ c_uint, c_ulong, c_ulonglong, c_size_t, c_void_p)
name = "PyUnicode_FromFormat"
_PyUnicode_FromFormat = getattr(pythonapi, name)
_PyUnicode_FromFormat.restype = py_object
@@ -2016,6 +2017,31 @@ class UnicodeTest(string_tests.CommonTest,
self.assertEqual(PyUnicode_FromFormat(b'%llu', c_ulonglong(123)), '123')
self.assertEqual(PyUnicode_FromFormat(b'%zu', c_size_t(123)), '123')
+ # test long output
+ min_longlong = -(2 ** (8 * sizeof(c_longlong) - 1))
+ max_longlong = -min_longlong - 1
+ self.assertEqual(PyUnicode_FromFormat(b'%lld', c_longlong(min_longlong)), str(min_longlong))
+ self.assertEqual(PyUnicode_FromFormat(b'%lld', c_longlong(max_longlong)), str(max_longlong))
+ max_ulonglong = 2 ** (8 * sizeof(c_ulonglong)) - 1
+ self.assertEqual(PyUnicode_FromFormat(b'%llu', c_ulonglong(max_ulonglong)), str(max_ulonglong))
+ PyUnicode_FromFormat(b'%p', c_void_p(-1))
+
+ # test padding (width and/or precision)
+ self.assertEqual(PyUnicode_FromFormat(b'%010i', c_int(123)), '123'.rjust(10, '0'))
+ self.assertEqual(PyUnicode_FromFormat(b'%100i', c_int(123)), '123'.rjust(100))
+ self.assertEqual(PyUnicode_FromFormat(b'%.100i', c_int(123)), '123'.rjust(100, '0'))
+ self.assertEqual(PyUnicode_FromFormat(b'%100.80i', c_int(123)), '123'.rjust(80, '0').rjust(100))
+
+ self.assertEqual(PyUnicode_FromFormat(b'%010u', c_uint(123)), '123'.rjust(10, '0'))
+ self.assertEqual(PyUnicode_FromFormat(b'%100u', c_uint(123)), '123'.rjust(100))
+ self.assertEqual(PyUnicode_FromFormat(b'%.100u', c_uint(123)), '123'.rjust(100, '0'))
+ self.assertEqual(PyUnicode_FromFormat(b'%100.80u', c_uint(123)), '123'.rjust(80, '0').rjust(100))
+
+ self.assertEqual(PyUnicode_FromFormat(b'%010x', c_int(0x123)), '123'.rjust(10, '0'))
+ self.assertEqual(PyUnicode_FromFormat(b'%100x', c_int(0x123)), '123'.rjust(100))
+ self.assertEqual(PyUnicode_FromFormat(b'%.100x', c_int(0x123)), '123'.rjust(100, '0'))
+ self.assertEqual(PyUnicode_FromFormat(b'%100.80x', c_int(0x123)), '123'.rjust(80, '0').rjust(100))
+
# test %A
text = PyUnicode_FromFormat(b'%%A:%A', 'abc\xe9\uabcd\U0010ffff')
self.assertEqual(text, r"%A:'abc\xe9\uabcd\U0010ffff'")
diff --git a/Lib/test/test_unicodedata.py b/Lib/test/test_unicodedata.py
index 99aa0033cd..677508b222 100644
--- a/Lib/test/test_unicodedata.py
+++ b/Lib/test/test_unicodedata.py
@@ -80,7 +80,7 @@ class UnicodeDatabaseTest(unittest.TestCase):
class UnicodeFunctionsTest(UnicodeDatabaseTest):
# update this, if the database changes
- expectedchecksum = '17fe2f12b788e4fff5479b469c4404bb6ecf841f'
+ expectedchecksum = 'ebd64e81553c9cb37f424f5616254499fcd8849e'
def test_function_checksum(self):
data = []
h = hashlib.sha1()
diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py
index 623eb5ef2f..d8c3512fa8 100644
--- a/Lib/test/test_urllib.py
+++ b/Lib/test/test_urllib.py
@@ -270,9 +270,10 @@ Content-Type: text/html; charset=iso-8859-1
def test_missing_localfile(self):
# Test for #10836
- # 3.3 - URLError is not captured, explicit IOError is raised.
- with self.assertRaises(IOError):
+ with self.assertRaises(urllib.error.URLError) as e:
urlopen('file://localhost/a/file/which/doesnot/exists.py')
+ self.assertTrue(e.exception.filename)
+ self.assertTrue(e.exception.reason)
def test_file_notexists(self):
fd, tmp_file = tempfile.mkstemp()
@@ -285,20 +286,21 @@ Content-Type: text/html; charset=iso-8859-1
os.close(fd)
os.unlink(tmp_file)
self.assertFalse(os.path.exists(tmp_file))
- # 3.3 - IOError instead of URLError
- with self.assertRaises(IOError):
+ with self.assertRaises(urllib.error.URLError):
urlopen(tmp_fileurl)
def test_ftp_nohost(self):
test_ftp_url = 'ftp:///path'
- # 3.3 - IOError instead of URLError
- with self.assertRaises(IOError):
+ with self.assertRaises(urllib.error.URLError) as e:
urlopen(test_ftp_url)
+ self.assertFalse(e.exception.filename)
+ self.assertTrue(e.exception.reason)
def test_ftp_nonexisting(self):
- # 3.3 - IOError instead of URLError
- with self.assertRaises(IOError):
+ with self.assertRaises(urllib.error.URLError) as e:
urlopen('ftp://localhost/a/file/which/doesnot/exists.py')
+ self.assertFalse(e.exception.filename)
+ self.assertTrue(e.exception.reason)
def test_userpass_inurl(self):
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py
index 7ae155331e..00ee669874 100644
--- a/Lib/test/test_urllib2.py
+++ b/Lib/test/test_urllib2.py
@@ -302,6 +302,7 @@ class MockHTTPClass:
self.req_headers = []
self.data = None
self.raise_on_endheaders = False
+ self.sock = None
self._tunnel_headers = {}
def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
diff --git a/Lib/test/test_venv.py b/Lib/test/test_venv.py
index 96968444c6..2a528c92bd 100644
--- a/Lib/test/test_venv.py
+++ b/Lib/test/test_venv.py
@@ -113,13 +113,68 @@ class BasicTest(BaseTest):
out, err = p.communicate()
self.assertEqual(out.strip(), expected.encode())
+ if sys.platform == 'win32':
+ ENV_SUBDIRS = (
+ ('Scripts',),
+ ('Include',),
+ ('Lib',),
+ ('Lib', 'site-packages'),
+ )
+ else:
+ ENV_SUBDIRS = (
+ ('bin',),
+ ('include',),
+ ('lib',),
+ ('lib', 'python%d.%d' % sys.version_info[:2]),
+ ('lib', 'python%d.%d' % sys.version_info[:2], 'site-packages'),
+ )
+
+ def create_contents(self, paths, filename):
+ """
+ Create some files in the environment which are unrelated
+ to the virtual environment.
+ """
+ for subdirs in paths:
+ d = os.path.join(self.env_dir, *subdirs)
+ os.mkdir(d)
+ fn = os.path.join(d, filename)
+ with open(fn, 'wb') as f:
+ f.write(b'Still here?')
+
def test_overwrite_existing(self):
"""
- Test control of overwriting an existing environment directory.
+ Test creating environment in an existing directory.
"""
- self.assertRaises(ValueError, venv.create, self.env_dir)
+ self.create_contents(self.ENV_SUBDIRS, 'foo')
+ venv.create(self.env_dir)
+ for subdirs in self.ENV_SUBDIRS:
+ fn = os.path.join(self.env_dir, *(subdirs + ('foo',)))
+ self.assertTrue(os.path.exists(fn))
+ with open(fn, 'rb') as f:
+ self.assertEqual(f.read(), b'Still here?')
+
builder = venv.EnvBuilder(clear=True)
builder.create(self.env_dir)
+ for subdirs in self.ENV_SUBDIRS:
+ fn = os.path.join(self.env_dir, *(subdirs + ('foo',)))
+ self.assertFalse(os.path.exists(fn))
+
+ def clear_directory(self, path):
+ for fn in os.listdir(path):
+ fn = os.path.join(path, fn)
+ if os.path.islink(fn) or os.path.isfile(fn):
+ os.remove(fn)
+ elif os.path.isdir(fn):
+ shutil.rmtree(fn)
+
+ def test_unoverwritable_fails(self):
+ #create a file clashing with directories in the env dir
+ for paths in self.ENV_SUBDIRS[:3]:
+ fn = os.path.join(self.env_dir, *paths)
+ with open(fn, 'wb') as f:
+ f.write(b'')
+ self.assertRaises((ValueError, OSError), venv.create, self.env_dir)
+ self.clear_directory(self.env_dir)
def test_upgrade(self):
"""