summaryrefslogtreecommitdiff
path: root/tests/test_concurrency.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_concurrency.py')
-rw-r--r--tests/test_concurrency.py31
1 files changed, 16 insertions, 15 deletions
diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py
index 2469e296..cc9622d1 100644
--- a/tests/test_concurrency.py
+++ b/tests/test_concurrency.py
@@ -20,6 +20,7 @@ from coverage.files import abs_file
from tests.coveragetest import CoverageTest
from tests.helpers import remove_files
+import re
# These libraries aren't always available, we'll skip tests if they aren't.
@@ -91,7 +92,7 @@ class LineCountTest(CoverageTest):
print("done")
"""
- self.assertEqual(line_count(CODE), 5)
+ assert line_count(CODE) == 5
# The code common to all the concurrency models.
@@ -227,14 +228,14 @@ class ConcurrencyTest(CoverageTest):
expected_cant_trace = cant_trace_msg(concurrency, the_module)
if expected_cant_trace is not None:
- self.assertEqual(out, expected_cant_trace)
+ assert out == expected_cant_trace
else:
# We can fully measure the code if we are using the C tracer, which
# can support all the concurrency, or if we are using threads.
if expected_out is None:
expected_out = "%d\n" % (sum(range(self.QLIMIT)))
print(code)
- self.assertEqual(out, expected_out)
+ assert out == expected_out
# Read the coverage file and see that try_it.py has all its lines
# executed.
@@ -248,7 +249,7 @@ class ConcurrencyTest(CoverageTest):
print_simple_annotation(code, linenos)
lines = line_count(code)
- self.assertEqual(line_counts(data)['try_it.py'], lines)
+ assert line_counts(data)['try_it.py'] == lines
def test_threads(self):
code = (THREAD + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT)
@@ -399,17 +400,17 @@ class MultiprocessingTest(CoverageTest):
expected_cant_trace = cant_trace_msg(concurrency, the_module)
if expected_cant_trace is not None:
- self.assertEqual(out, expected_cant_trace)
+ assert out == expected_cant_trace
else:
- self.assertEqual(out.rstrip(), expected_out)
- self.assertEqual(len(glob.glob(".coverage.*")), nprocs + 1)
+ assert out.rstrip() == expected_out
+ assert len(glob.glob(".coverage.*")) == nprocs + 1
out = self.run_command("coverage combine")
- self.assertEqual(out, "")
+ assert out == ""
out = self.run_command("coverage report -m")
last_line = self.squeezed_lines(out)[-1]
- self.assertRegex(last_line, r"TOTAL \d+ 0 100%")
+ assert re.search(r"TOTAL \d+ 0 100%", last_line)
def test_multiprocessing_simple(self):
nprocs = 3
@@ -459,14 +460,14 @@ class MultiprocessingTest(CoverageTest):
continue
out = self.run_command("coverage run --rcfile=multi.rc multi.py %s" % (start_method,))
- self.assertEqual(out.rstrip(), expected_out)
+ assert out.rstrip() == expected_out
out = self.run_command("coverage combine")
- self.assertEqual(out, "")
+ assert out == ""
out = self.run_command("coverage report -m")
last_line = self.squeezed_lines(out)[-1]
- self.assertRegex(last_line, r"TOTAL \d+ 0 \d+ 0 100%")
+ assert re.search(r"TOTAL \d+ 0 \d+ 0 100%", last_line)
def test_multiprocessing_with_branching(self):
nprocs = 3
@@ -490,8 +491,8 @@ class MultiprocessingTest(CoverageTest):
_crash = _bootstrap
""")
out = self.run_command("coverage run multi.py")
- self.assertIn("Exception during multiprocessing bootstrap init", out)
- self.assertIn("Exception: Crashing because called by _bootstrap", out)
+ assert "Exception during multiprocessing bootstrap init" in out
+ assert "Exception: Crashing because called by _bootstrap" in out
def test_bug890(self):
# chdir in multiprocessing shouldn't keep us from finding the
@@ -510,7 +511,7 @@ class MultiprocessingTest(CoverageTest):
concurrency = multiprocessing
""")
out = self.run_command("coverage run multi.py")
- self.assertEqual(out.splitlines()[-1], "ok")
+ assert out.splitlines()[-1] == "ok"
def test_coverage_stop_in_threads():