summaryrefslogtreecommitdiff
path: root/Lib/test/test_threading.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_threading.py')
-rw-r--r--Lib/test/test_threading.py139
1 files changed, 92 insertions, 47 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index f977a7fbbe..c107652d26 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -1,16 +1,16 @@
# Very rudimentary test of threading module
import test.support
-from test.support import verbose
-import os
+from test.support import verbose, strip_python_stderr
import random
import re
import sys
-import threading
-import _thread
+_thread = test.support.import_module('_thread')
+threading = test.support.import_module('threading')
import time
import unittest
import weakref
+import os
import subprocess
from test import lock_tests
@@ -59,7 +59,16 @@ class TestThread(threading.Thread):
(self.name, self.nrunning.get()))
-class ThreadTests(unittest.TestCase):
+class BaseTestCase(unittest.TestCase):
+ def setUp(self):
+ self._threads = test.support.threading_setup()
+
+ def tearDown(self):
+ test.support.threading_cleanup(*self._threads)
+ test.support.reap_children()
+
+
+class ThreadTests(BaseTestCase):
# Create a bunch of threads, let each do some work, wait until all are
# done.
@@ -89,7 +98,8 @@ class ThreadTests(unittest.TestCase):
self.assertTrue(not t.is_alive())
self.assertNotEqual(t.ident, 0)
self.assertFalse(t.ident is None)
- self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
+ self.assertTrue(re.match('<TestThread\(.*, stopped -?\d+\)>',
+ repr(t)))
if verbose:
print('all tasks done')
self.assertEqual(numrunning.get(), 0)
@@ -115,9 +125,8 @@ class ThreadTests(unittest.TestCase):
try:
threading.stack_size(262144)
except _thread.error:
- if verbose:
- print('platform does not support changing thread stack size')
- return
+ raise unittest.SkipTest(
+ 'platform does not support changing thread stack size')
self.test_various_ops()
threading.stack_size(0)
@@ -128,9 +137,8 @@ class ThreadTests(unittest.TestCase):
try:
threading.stack_size(0x100000)
except _thread.error:
- if verbose:
- print('platform does not support changing thread stack size')
- return
+ raise unittest.SkipTest(
+ 'platform does not support changing thread stack size')
self.test_various_ops()
threading.stack_size(0)
@@ -147,9 +155,8 @@ class ThreadTests(unittest.TestCase):
tid = _thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
- self.assertTrue(tid in threading._active)
- self.assertTrue(isinstance(threading._active[tid],
- threading._DummyThread))
+ self.assertIn(tid, threading._active)
+ self.assertIsInstance(threading._active[tid], threading._DummyThread)
del threading._active[tid]
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
@@ -158,9 +165,7 @@ class ThreadTests(unittest.TestCase):
try:
import ctypes
except ImportError:
- if verbose:
- print("test_PyThreadState_SetAsyncExc can't import ctypes")
- return # can't do anything
+ raise unittest.SkipTest("cannot import ctypes")
set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
@@ -169,6 +174,27 @@ class ThreadTests(unittest.TestCase):
exception = ctypes.py_object(AsyncExc)
+ # First check it works when setting the exception from the same thread.
+ tid = _thread.get_ident()
+
+ try:
+ result = set_async_exc(ctypes.c_long(tid), exception)
+ # The exception is async, so we might have to keep the VM busy until
+ # it notices.
+ while True:
+ pass
+ except AsyncExc:
+ pass
+ else:
+ # This code is unreachable but it reflects the intent. If we wanted
+ # to be smarter the above loop wouldn't be infinite.
+ self.fail("AsyncExc not raised")
+ try:
+ self.assertEqual(result, 1) # one thread state modified
+ except UnboundLocalError:
+ # The exception was raised too quickly for us to get the result.
+ pass
+
# `worker_started` is set by the thread when it's inside a try/except
# block waiting to catch the asynchronously set AsyncExc exception.
# `worker_saw_exception` is set by the thread upon catching that
@@ -245,9 +271,7 @@ class ThreadTests(unittest.TestCase):
try:
import ctypes
except ImportError:
- if verbose:
- print("test_finalize_with_runnning_thread can't import ctypes")
- return # can't do anything
+ raise unittest.SkipTest("cannot import ctypes")
rc = subprocess.call([sys.executable, "-c", """if 1:
import ctypes, sys, time, _thread
@@ -302,6 +326,8 @@ class ThreadTests(unittest.TestCase):
"""],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
+ self.addCleanup(p.stdout.close)
+ self.addCleanup(p.stderr.close)
stdout, stderr = p.communicate()
rc = p.returncode
self.assertFalse(rc == 2, "interpreted was blocked")
@@ -326,30 +352,30 @@ class ThreadTests(unittest.TestCase):
"""],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
+ self.addCleanup(p.stdout.close)
+ self.addCleanup(p.stderr.close)
stdout, stderr = p.communicate()
self.assertEqual(stdout.strip(),
b"Woke up, sleep function is: <built-in function sleep>")
- stderr = re.sub(br"^\[\d+ refs\]", b"", stderr, re.MULTILINE).strip()
+ stderr = strip_python_stderr(stderr)
self.assertEqual(stderr, b"")
def test_enumerate_after_join(self):
# Try hard to trigger #1703448: a thread is still returned in
# threading.enumerate() after it has been join()ed.
enum = threading.enumerate
- old_interval = sys.getcheckinterval()
+ old_interval = sys.getswitchinterval()
try:
for i in range(1, 100):
- # Try a couple times at each thread-switching interval
- # to get more interleavings.
- sys.setcheckinterval(i // 5)
+ sys.setswitchinterval(i * 0.0002)
t = threading.Thread(target=lambda: None)
t.start()
t.join()
l = enum()
- self.assertFalse(t in l,
+ self.assertNotIn(t, l,
"#1703448 triggered after %d trials: %s" % (i, l))
finally:
- sys.setcheckinterval(old_interval)
+ sys.setswitchinterval(old_interval)
def test_no_refcycle_through_target(self):
class RunSelfFunction(object):
@@ -370,7 +396,7 @@ class ThreadTests(unittest.TestCase):
weak_cyclic_object = weakref.ref(cyclic_object)
cyclic_object.thread.join()
del cyclic_object
- self.assertEqual(None, weak_cyclic_object(),
+ self.assertIsNone(weak_cyclic_object(),
msg=('%d references still around' %
sys.getrefcount(weak_cyclic_object())))
@@ -378,7 +404,7 @@ class ThreadTests(unittest.TestCase):
weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
raising_cyclic_object.thread.join()
del raising_cyclic_object
- self.assertEqual(None, weak_raising_cyclic_object(),
+ self.assertIsNone(weak_raising_cyclic_object(),
msg=('%d references still around' %
sys.getrefcount(weak_raising_cyclic_object())))
@@ -395,8 +421,22 @@ class ThreadTests(unittest.TestCase):
e.isSet()
threading.activeCount()
+ def test_repr_daemon(self):
+ t = threading.Thread()
+ self.assertFalse('daemon' in repr(t))
+ t.daemon = True
+ self.assertTrue('daemon' in repr(t))
+
+ def test_deamon_param(self):
+ t = threading.Thread()
+ self.assertFalse(t.daemon)
+ t = threading.Thread(daemon=False)
+ self.assertFalse(t.daemon)
+ t = threading.Thread(daemon=True)
+ self.assertTrue(t.daemon)
-class ThreadJoinOnShutdown(unittest.TestCase):
+
+class ThreadJoinOnShutdown(BaseTestCase):
def _run_and_join(self, script):
script = """if 1:
@@ -432,11 +472,9 @@ class ThreadJoinOnShutdown(unittest.TestCase):
self._run_and_join(script)
+ @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
def test_2_join_in_forked_process(self):
# Like the test above, but from a forked interpreter
- import os
- if not hasattr(os, 'fork'):
- return
script = """if 1:
childpid = os.fork()
if childpid != 0:
@@ -450,19 +488,16 @@ class ThreadJoinOnShutdown(unittest.TestCase):
"""
self._run_and_join(script)
+ @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
def test_3_join_in_forked_from_thread(self):
# Like the test above, but fork() was called from a worker thread
# In the forked process, the main Thread object must be marked as stopped.
- import os
- if not hasattr(os, 'fork'):
- return
+
# Skip platforms with known problems forking from a worker thread.
# See http://bugs.python.org/issue3863.
if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
'os2emx'):
- print('Skipping test_3_join_in_forked_from_thread'
- ' due to known OS bugs on', sys.platform, file=sys.stderr)
- return
+ raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
script = """if 1:
main_thread = threading.current_thread()
def worker():
@@ -485,9 +520,9 @@ class ThreadJoinOnShutdown(unittest.TestCase):
def assertScriptHasOutput(self, script, expected_output):
p = subprocess.Popen([sys.executable, "-c", script],
stdout=subprocess.PIPE)
- rc = p.wait()
- data = p.stdout.read().decode().replace('\r', '')
- self.assertEqual(rc, 0, "Unexpected error")
+ stdout, stderr = p.communicate()
+ data = stdout.decode().replace('\r', '')
+ self.assertEqual(p.returncode, 0, "Unexpected error")
self.assertEqual(data, expected_output)
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
@@ -629,7 +664,7 @@ class ThreadJoinOnShutdown(unittest.TestCase):
self.assertScriptHasOutput(script, output)
-class ThreadingExceptionTests(unittest.TestCase):
+class ThreadingExceptionTests(BaseTestCase):
# A RuntimeError should be raised if Thread.start() is called
# multiple times.
def test_start_thread_again(self):
@@ -650,12 +685,19 @@ class ThreadingExceptionTests(unittest.TestCase):
thread.start()
self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
+ def test_releasing_unacquired_lock(self):
+ lock = threading.Lock()
+ self.assertRaises(RuntimeError, lock.release)
+
class LockTests(lock_tests.LockTests):
locktype = staticmethod(threading.Lock)
-class RLockTests(lock_tests.RLockTests):
- locktype = staticmethod(threading.RLock)
+class PyRLockTests(lock_tests.RLockTests):
+ locktype = staticmethod(threading._PyRLock)
+
+class CRLockTests(lock_tests.RLockTests):
+ locktype = staticmethod(threading._CRLock)
class EventTests(lock_tests.EventTests):
eventtype = staticmethod(threading.Event)
@@ -673,14 +715,17 @@ class SemaphoreTests(lock_tests.SemaphoreTests):
class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
semtype = staticmethod(threading.BoundedSemaphore)
+class BarrierTests(lock_tests.BarrierTests):
+ barriertype = staticmethod(threading.Barrier)
def test_main():
- test.support.run_unittest(LockTests, RLockTests, EventTests,
+ test.support.run_unittest(LockTests, PyRLockTests, CRLockTests, EventTests,
ConditionAsRLockTests, ConditionTests,
SemaphoreTests, BoundedSemaphoreTests,
ThreadTests,
ThreadJoinOnShutdown,
ThreadingExceptionTests,
+ BarrierTests
)
if __name__ == "__main__":