summaryrefslogtreecommitdiff
path: root/tests/test_concurrency.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_concurrency.py')
-rw-r--r--tests/test_concurrency.py102
1 files changed, 55 insertions, 47 deletions
diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py
index d08ed1ef..e6910dc8 100644
--- a/tests/test_concurrency.py
+++ b/tests/test_concurrency.py
@@ -13,6 +13,9 @@ import sys
import threading
import time
+from types import ModuleType
+from typing import Iterable, Optional
+
from flaky import flaky
import pytest
@@ -44,7 +47,7 @@ except ImportError:
greenlet = None
-def measurable_line(l):
+def measurable_line(l: str) -> bool:
"""Is this a line of code coverage will measure?
Not blank, not a comment, and not "else"
@@ -59,12 +62,12 @@ def measurable_line(l):
return True
-def line_count(s):
+def line_count(s: str) -> int:
"""How many measurable lines are in `s`?"""
return len(list(filter(measurable_line, s.splitlines())))
-def print_simple_annotation(code, linenos):
+def print_simple_annotation(code: str, linenos: Iterable[int]) -> None:
"""Print the lines in `code` with X for each line number in `linenos`."""
for lineno, line in enumerate(code.splitlines(), start=1):
print(" {} {}".format("X" if lineno in linenos else " ", line))
@@ -75,7 +78,7 @@ class LineCountTest(CoverageTest):
run_in_temp_dir = False
- def test_line_count(self):
+ def test_line_count(self) -> None:
CODE = """
# Hey there!
x = 1
@@ -169,7 +172,7 @@ SIMPLE = """
"""
-def cant_trace_msg(concurrency, the_module):
+def cant_trace_msg(concurrency: str, the_module: Optional[ModuleType]) -> Optional[str]:
"""What might coverage.py say about a concurrency setting and imported module?"""
# In the concurrency choices, "multiprocessing" doesn't count, so remove it.
if "multiprocessing" in concurrency:
@@ -197,7 +200,13 @@ class ConcurrencyTest(CoverageTest):
QLIMIT = 1000
- def try_some_code(self, code, concurrency, the_module, expected_out=None):
+ def try_some_code(
+ self,
+ code: str,
+ concurrency: str,
+ the_module: ModuleType,
+ expected_out: Optional[str]=None,
+ ) -> None:
"""Run some concurrency testing code and see that it was all covered.
`code` is the Python code to execute. `concurrency` is the name of
@@ -232,39 +241,40 @@ class ConcurrencyTest(CoverageTest):
# If the test fails, it's helpful to see this info:
fname = abs_file("try_it.py")
linenos = data.lines(fname)
+ assert linenos is not None
print(f"{len(linenos)}: {linenos}")
print_simple_annotation(code, linenos)
lines = line_count(code)
assert line_counts(data)['try_it.py'] == lines
- def test_threads(self):
+ def test_threads(self) -> None:
code = (THREAD + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT)
self.try_some_code(code, "thread", threading)
- def test_threads_simple_code(self):
+ def test_threads_simple_code(self) -> None:
code = SIMPLE.format(QLIMIT=self.QLIMIT)
self.try_some_code(code, "thread", threading)
- def test_eventlet(self):
+ def test_eventlet(self) -> None:
code = (EVENTLET + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT)
self.try_some_code(code, "eventlet", eventlet)
- def test_eventlet_simple_code(self):
+ def test_eventlet_simple_code(self) -> None:
code = SIMPLE.format(QLIMIT=self.QLIMIT)
self.try_some_code(code, "eventlet", eventlet)
# https://github.com/nedbat/coveragepy/issues/663
@pytest.mark.skipif(env.WINDOWS, reason="gevent has problems on Windows: #663")
- def test_gevent(self):
+ def test_gevent(self) -> None:
code = (GEVENT + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT)
self.try_some_code(code, "gevent", gevent)
- def test_gevent_simple_code(self):
+ def test_gevent_simple_code(self) -> None:
code = SIMPLE.format(QLIMIT=self.QLIMIT)
self.try_some_code(code, "gevent", gevent)
- def test_greenlet(self):
+ def test_greenlet(self) -> None:
GREENLET = """\
from greenlet import greenlet
@@ -282,11 +292,11 @@ class ConcurrencyTest(CoverageTest):
"""
self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n")
- def test_greenlet_simple_code(self):
+ def test_greenlet_simple_code(self) -> None:
code = SIMPLE.format(QLIMIT=self.QLIMIT)
self.try_some_code(code, "greenlet", greenlet)
- def test_bug_330(self):
+ def test_bug_330(self) -> None:
BUG_330 = """\
from weakref import WeakKeyDictionary
import eventlet
@@ -304,7 +314,7 @@ class ConcurrencyTest(CoverageTest):
"""
self.try_some_code(BUG_330, "eventlet", eventlet, "0\n")
- def test_threads_with_gevent(self):
+ def test_threads_with_gevent(self) -> None:
self.make_file("both.py", """\
import queue
import threading
@@ -345,25 +355,25 @@ class ConcurrencyTest(CoverageTest):
last_line = self.squeezed_lines(out)[-1]
assert re.search(r"TOTAL \d+ 0 100%", last_line)
- def test_bad_concurrency(self):
+ def test_bad_concurrency(self) -> None:
with pytest.raises(ConfigError, match="Unknown concurrency choices: nothing"):
self.command_line("run --concurrency=nothing prog.py")
- def test_bad_concurrency_in_config(self):
+ def test_bad_concurrency_in_config(self) -> None:
self.make_file(".coveragerc", "[run]\nconcurrency = nothing\n")
with pytest.raises(ConfigError, match="Unknown concurrency choices: nothing"):
self.command_line("run prog.py")
- def test_no_multiple_light_concurrency(self):
+ def test_no_multiple_light_concurrency(self) -> None:
with pytest.raises(ConfigError, match="Conflicting concurrency settings: eventlet, gevent"):
self.command_line("run --concurrency=gevent,eventlet prog.py")
- def test_no_multiple_light_concurrency_in_config(self):
+ def test_no_multiple_light_concurrency_in_config(self) -> None:
self.make_file(".coveragerc", "[run]\nconcurrency = gevent, eventlet\n")
with pytest.raises(ConfigError, match="Conflicting concurrency settings: eventlet, gevent"):
self.command_line("run prog.py")
- def test_multiprocessing_needs_config_file(self):
+ def test_multiprocessing_needs_config_file(self) -> None:
with pytest.raises(ConfigError, match="multiprocessing requires a configuration file"):
self.command_line("run --concurrency=multiprocessing prog.py")
@@ -372,9 +382,9 @@ class WithoutConcurrencyModuleTest(CoverageTest):
"""Tests of what happens if the requested concurrency isn't installed."""
@pytest.mark.parametrize("module", ["eventlet", "gevent", "greenlet"])
- def test_missing_module(self, module):
+ def test_missing_module(self, module: str) -> None:
self.make_file("prog.py", "a = 1")
- sys.modules[module] = None
+ sys.modules[module] = None # type: ignore[assignment]
msg = f"Couldn't trace with concurrency={module}, the module isn't installed."
with pytest.raises(ConfigError, match=msg):
self.command_line(f"run --concurrency={module} prog.py")
@@ -428,9 +438,9 @@ MULTI_CODE = """
@pytest.fixture(params=["fork", "spawn"], name="start_method")
-def start_method_fixture(request):
+def start_method_fixture(request: pytest.FixtureRequest) -> str:
"""Parameterized fixture to choose the start_method for multiprocessing."""
- start_method = request.param
+ start_method: str = request.param
if start_method not in multiprocessing.get_all_start_methods():
# Windows doesn't support "fork".
pytest.skip(f"start_method={start_method} not supported here")
@@ -443,14 +453,14 @@ class MultiprocessingTest(CoverageTest):
def try_multiprocessing_code(
self,
- code,
- expected_out,
- the_module,
- nprocs,
- start_method,
- concurrency="multiprocessing",
- args="",
- ):
+ code: str,
+ expected_out: Optional[str],
+ the_module: ModuleType,
+ nprocs: int,
+ start_method: str,
+ concurrency: str="multiprocessing",
+ args: str="",
+ ) -> None:
"""Run code using multiprocessing, it should produce `expected_out`."""
self.make_file("multi.py", code)
self.make_file(".coveragerc", f"""\
@@ -459,9 +469,7 @@ class MultiprocessingTest(CoverageTest):
source = .
""")
- cmd = "coverage run {args} multi.py {start_method}".format(
- args=args, start_method=start_method,
- )
+ cmd = f"coverage run {args} multi.py {start_method}"
out = self.run_command(cmd)
expected_cant_trace = cant_trace_msg(concurrency, the_module)
@@ -489,7 +497,7 @@ class MultiprocessingTest(CoverageTest):
last_line = self.squeezed_lines(out)[-1]
assert re.search(r"TOTAL \d+ 0 100%", last_line)
- def test_multiprocessing_simple(self, start_method):
+ def test_multiprocessing_simple(self, start_method: str) -> None:
nprocs = 3
upto = 30
code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto)
@@ -503,7 +511,7 @@ class MultiprocessingTest(CoverageTest):
start_method=start_method,
)
- def test_multiprocessing_append(self, start_method):
+ def test_multiprocessing_append(self, start_method: str) -> None:
nprocs = 3
upto = 30
code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto)
@@ -518,7 +526,7 @@ class MultiprocessingTest(CoverageTest):
start_method=start_method,
)
- def test_multiprocessing_and_gevent(self, start_method):
+ def test_multiprocessing_and_gevent(self, start_method: str) -> None:
nprocs = 3
upto = 30
code = (
@@ -535,7 +543,7 @@ class MultiprocessingTest(CoverageTest):
start_method=start_method,
)
- def test_multiprocessing_with_branching(self, start_method):
+ def test_multiprocessing_with_branching(self, start_method: str) -> None:
nprocs = 3
upto = 30
code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto)
@@ -559,7 +567,7 @@ class MultiprocessingTest(CoverageTest):
last_line = self.squeezed_lines(out)[-1]
assert re.search(r"TOTAL \d+ 0 \d+ 0 100%", last_line)
- def test_multiprocessing_bootstrap_error_handling(self):
+ def test_multiprocessing_bootstrap_error_handling(self) -> None:
# An exception during bootstrapping will be reported.
self.make_file("multi.py", """\
import multiprocessing
@@ -576,7 +584,7 @@ class MultiprocessingTest(CoverageTest):
assert "Exception during multiprocessing bootstrap init" in out
assert "Exception: Crashing because called by _bootstrap" in out
- def test_bug_890(self):
+ def test_bug_890(self) -> None:
# chdir in multiprocessing shouldn't keep us from finding the
# .coveragerc file.
self.make_file("multi.py", """\
@@ -596,11 +604,11 @@ class MultiprocessingTest(CoverageTest):
assert out.splitlines()[-1] == "ok"
-def test_coverage_stop_in_threads():
+def test_coverage_stop_in_threads() -> None:
has_started_coverage = []
has_stopped_coverage = []
- def run_thread(): # pragma: nested
+ def run_thread() -> None: # pragma: nested
"""Check that coverage is stopping properly in threads."""
deadline = time.time() + 5
ident = threading.current_thread().ident
@@ -648,7 +656,7 @@ def test_thread_safe_save_data(tmp_path: pathlib.Path) -> None:
for module_name in module_names:
import_local_file(module_name)
- def random_load(): # pragma: nested
+ def random_load() -> None: # pragma: nested
"""Import modules randomly to stress coverage."""
while should_run[0]:
module_name = random.choice(module_names)
@@ -695,7 +703,7 @@ class SigtermTest(CoverageTest):
"""Tests of our handling of SIGTERM."""
@pytest.mark.parametrize("sigterm", [False, True])
- def test_sigterm_saves_data(self, sigterm):
+ def test_sigterm_saves_data(self, sigterm: bool) -> None:
# A terminated process should save its coverage data.
self.make_file("clobbered.py", """\
import multiprocessing
@@ -741,7 +749,7 @@ class SigtermTest(CoverageTest):
expected = "clobbered.py 17 5 71% 5-10"
assert self.squeezed_lines(out)[2] == expected
- def test_sigterm_still_runs(self):
+ def test_sigterm_still_runs(self) -> None:
# A terminated process still runs its own SIGTERM handler.
self.make_file("handler.py", """\
import multiprocessing