summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNed Batchelder <ned@nedbatchelder.com>2023-01-03 06:51:59 -0500
committerNed Batchelder <ned@nedbatchelder.com>2023-01-03 06:51:59 -0500
commit9d2e1b055cf7ed02eba725b86d476c24a718178d (patch)
tree439fdff14ddadb20022dbf535d7aa627d8dd710e
parentc3ee30c1cfd133f1e36a4a8992b531a0dc7ec5a9 (diff)
downloadpython-coveragepy-git-9d2e1b055cf7ed02eba725b86d476c24a718178d.tar.gz
mypy: test_concurrency.py, test_python.py
-rw-r--r--tests/mixins.py25
-rw-r--r--tests/test_concurrency.py102
-rw-r--r--tests/test_python.py3
-rw-r--r--tox.ini5
4 files changed, 76 insertions, 59 deletions
diff --git a/tests/mixins.py b/tests/mixins.py
index 0f578637..f8cc5085 100644
--- a/tests/mixins.py
+++ b/tests/mixins.py
@@ -13,6 +13,7 @@ import os.path
import sys
from typing import Iterator, Tuple
+from typing import Iterable, Optional
import pytest
@@ -24,26 +25,26 @@ class PytestBase:
"""A base class to connect to pytest in a test class hierarchy."""
@pytest.fixture(autouse=True)
- def connect_to_pytest(self, request, monkeypatch):
+ def connect_to_pytest(self, request, monkeypatch) -> None:
"""Captures pytest facilities for use by other test helpers."""
# pylint: disable=attribute-defined-outside-init
self._pytest_request = request
self._monkeypatch = monkeypatch
self.setUp()
- def setUp(self):
+ def setUp(self) -> None:
"""Per-test initialization. Override this as you wish."""
pass
- def addCleanup(self, fn, *args):
+ def addCleanup(self, fn, *args) -> None:
"""Like unittest's addCleanup: code to call when the test is done."""
self._pytest_request.addfinalizer(lambda: fn(*args))
- def set_environ(self, name, value):
+ def set_environ(self, name, value) -> None:
"""Set an environment variable `name` to be `value`."""
self._monkeypatch.setenv(name, value)
- def del_environ(self, name):
+ def del_environ(self, name) -> None:
"""Delete an environment variable, unless we set it."""
self._monkeypatch.delenv(name, raising=False)
@@ -72,7 +73,13 @@ class TempDirMixin:
else:
yield
- def make_file(self, filename, text="", bytes=b"", newline=None):
+ def make_file(
+ self,
+ filename: str,
+ text: str="",
+ bytes: bytes=b"",
+ newline: Optional[str]=None,
+ ) -> str:
"""Make a file. See `tests.helpers.make_file`"""
# pylint: disable=redefined-builtin # bytes
assert self.run_in_temp_dir, "Only use make_file when running in a temp dir"
@@ -83,7 +90,7 @@ class RestoreModulesMixin:
"""Auto-restore the imported modules at the end of each test."""
@pytest.fixture(autouse=True)
- def _module_saving(self):
+ def _module_saving(self) -> Iterable[None]:
"""Remove modules we imported during the test."""
self._sys_module_saver = SysModuleSaver()
try:
@@ -91,7 +98,7 @@ class RestoreModulesMixin:
finally:
self._sys_module_saver.restore()
- def clean_local_file_imports(self):
+ def clean_local_file_imports(self) -> None:
"""Clean up the results of calls to `import_local_file`.
Use this if you need to `import_local_file` the same file twice in
@@ -120,7 +127,7 @@ class StdStreamCapturingMixin:
"""
@pytest.fixture(autouse=True)
- def _capcapsys(self, capsys):
+ def _capcapsys(self, capsys: pytest.CaptureFixture[str]) -> None:
"""Grab the fixture so our methods can use it."""
self.capsys = capsys
diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py
index d08ed1ef..e6910dc8 100644
--- a/tests/test_concurrency.py
+++ b/tests/test_concurrency.py
@@ -13,6 +13,9 @@ import sys
import threading
import time
+from types import ModuleType
+from typing import Iterable, Optional
+
from flaky import flaky
import pytest
@@ -44,7 +47,7 @@ except ImportError:
greenlet = None
-def measurable_line(l):
+def measurable_line(l: str) -> bool:
"""Is this a line of code coverage will measure?
Not blank, not a comment, and not "else"
@@ -59,12 +62,12 @@ def measurable_line(l):
return True
-def line_count(s):
+def line_count(s: str) -> int:
"""How many measurable lines are in `s`?"""
return len(list(filter(measurable_line, s.splitlines())))
-def print_simple_annotation(code, linenos):
+def print_simple_annotation(code: str, linenos: Iterable[int]) -> None:
"""Print the lines in `code` with X for each line number in `linenos`."""
for lineno, line in enumerate(code.splitlines(), start=1):
print(" {} {}".format("X" if lineno in linenos else " ", line))
@@ -75,7 +78,7 @@ class LineCountTest(CoverageTest):
run_in_temp_dir = False
- def test_line_count(self):
+ def test_line_count(self) -> None:
CODE = """
# Hey there!
x = 1
@@ -169,7 +172,7 @@ SIMPLE = """
"""
-def cant_trace_msg(concurrency, the_module):
+def cant_trace_msg(concurrency: str, the_module: Optional[ModuleType]) -> Optional[str]:
"""What might coverage.py say about a concurrency setting and imported module?"""
# In the concurrency choices, "multiprocessing" doesn't count, so remove it.
if "multiprocessing" in concurrency:
@@ -197,7 +200,13 @@ class ConcurrencyTest(CoverageTest):
QLIMIT = 1000
- def try_some_code(self, code, concurrency, the_module, expected_out=None):
+ def try_some_code(
+ self,
+ code: str,
+ concurrency: str,
+ the_module: ModuleType,
+ expected_out: Optional[str]=None,
+ ) -> None:
"""Run some concurrency testing code and see that it was all covered.
`code` is the Python code to execute. `concurrency` is the name of
@@ -232,39 +241,40 @@ class ConcurrencyTest(CoverageTest):
# If the test fails, it's helpful to see this info:
fname = abs_file("try_it.py")
linenos = data.lines(fname)
+ assert linenos is not None
print(f"{len(linenos)}: {linenos}")
print_simple_annotation(code, linenos)
lines = line_count(code)
assert line_counts(data)['try_it.py'] == lines
- def test_threads(self):
+ def test_threads(self) -> None:
code = (THREAD + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT)
self.try_some_code(code, "thread", threading)
- def test_threads_simple_code(self):
+ def test_threads_simple_code(self) -> None:
code = SIMPLE.format(QLIMIT=self.QLIMIT)
self.try_some_code(code, "thread", threading)
- def test_eventlet(self):
+ def test_eventlet(self) -> None:
code = (EVENTLET + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT)
self.try_some_code(code, "eventlet", eventlet)
- def test_eventlet_simple_code(self):
+ def test_eventlet_simple_code(self) -> None:
code = SIMPLE.format(QLIMIT=self.QLIMIT)
self.try_some_code(code, "eventlet", eventlet)
# https://github.com/nedbat/coveragepy/issues/663
@pytest.mark.skipif(env.WINDOWS, reason="gevent has problems on Windows: #663")
- def test_gevent(self):
+ def test_gevent(self) -> None:
code = (GEVENT + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT)
self.try_some_code(code, "gevent", gevent)
- def test_gevent_simple_code(self):
+ def test_gevent_simple_code(self) -> None:
code = SIMPLE.format(QLIMIT=self.QLIMIT)
self.try_some_code(code, "gevent", gevent)
- def test_greenlet(self):
+ def test_greenlet(self) -> None:
GREENLET = """\
from greenlet import greenlet
@@ -282,11 +292,11 @@ class ConcurrencyTest(CoverageTest):
"""
self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n")
- def test_greenlet_simple_code(self):
+ def test_greenlet_simple_code(self) -> None:
code = SIMPLE.format(QLIMIT=self.QLIMIT)
self.try_some_code(code, "greenlet", greenlet)
- def test_bug_330(self):
+ def test_bug_330(self) -> None:
BUG_330 = """\
from weakref import WeakKeyDictionary
import eventlet
@@ -304,7 +314,7 @@ class ConcurrencyTest(CoverageTest):
"""
self.try_some_code(BUG_330, "eventlet", eventlet, "0\n")
- def test_threads_with_gevent(self):
+ def test_threads_with_gevent(self) -> None:
self.make_file("both.py", """\
import queue
import threading
@@ -345,25 +355,25 @@ class ConcurrencyTest(CoverageTest):
last_line = self.squeezed_lines(out)[-1]
assert re.search(r"TOTAL \d+ 0 100%", last_line)
- def test_bad_concurrency(self):
+ def test_bad_concurrency(self) -> None:
with pytest.raises(ConfigError, match="Unknown concurrency choices: nothing"):
self.command_line("run --concurrency=nothing prog.py")
- def test_bad_concurrency_in_config(self):
+ def test_bad_concurrency_in_config(self) -> None:
self.make_file(".coveragerc", "[run]\nconcurrency = nothing\n")
with pytest.raises(ConfigError, match="Unknown concurrency choices: nothing"):
self.command_line("run prog.py")
- def test_no_multiple_light_concurrency(self):
+ def test_no_multiple_light_concurrency(self) -> None:
with pytest.raises(ConfigError, match="Conflicting concurrency settings: eventlet, gevent"):
self.command_line("run --concurrency=gevent,eventlet prog.py")
- def test_no_multiple_light_concurrency_in_config(self):
+ def test_no_multiple_light_concurrency_in_config(self) -> None:
self.make_file(".coveragerc", "[run]\nconcurrency = gevent, eventlet\n")
with pytest.raises(ConfigError, match="Conflicting concurrency settings: eventlet, gevent"):
self.command_line("run prog.py")
- def test_multiprocessing_needs_config_file(self):
+ def test_multiprocessing_needs_config_file(self) -> None:
with pytest.raises(ConfigError, match="multiprocessing requires a configuration file"):
self.command_line("run --concurrency=multiprocessing prog.py")
@@ -372,9 +382,9 @@ class WithoutConcurrencyModuleTest(CoverageTest):
"""Tests of what happens if the requested concurrency isn't installed."""
@pytest.mark.parametrize("module", ["eventlet", "gevent", "greenlet"])
- def test_missing_module(self, module):
+ def test_missing_module(self, module: str) -> None:
self.make_file("prog.py", "a = 1")
- sys.modules[module] = None
+ sys.modules[module] = None # type: ignore[assignment]
msg = f"Couldn't trace with concurrency={module}, the module isn't installed."
with pytest.raises(ConfigError, match=msg):
self.command_line(f"run --concurrency={module} prog.py")
@@ -428,9 +438,9 @@ MULTI_CODE = """
@pytest.fixture(params=["fork", "spawn"], name="start_method")
-def start_method_fixture(request):
+def start_method_fixture(request: pytest.FixtureRequest) -> str:
"""Parameterized fixture to choose the start_method for multiprocessing."""
- start_method = request.param
+ start_method: str = request.param
if start_method not in multiprocessing.get_all_start_methods():
# Windows doesn't support "fork".
pytest.skip(f"start_method={start_method} not supported here")
@@ -443,14 +453,14 @@ class MultiprocessingTest(CoverageTest):
def try_multiprocessing_code(
self,
- code,
- expected_out,
- the_module,
- nprocs,
- start_method,
- concurrency="multiprocessing",
- args="",
- ):
+ code: str,
+ expected_out: Optional[str],
+ the_module: ModuleType,
+ nprocs: int,
+ start_method: str,
+ concurrency: str="multiprocessing",
+ args: str="",
+ ) -> None:
"""Run code using multiprocessing, it should produce `expected_out`."""
self.make_file("multi.py", code)
self.make_file(".coveragerc", f"""\
@@ -459,9 +469,7 @@ class MultiprocessingTest(CoverageTest):
source = .
""")
- cmd = "coverage run {args} multi.py {start_method}".format(
- args=args, start_method=start_method,
- )
+ cmd = f"coverage run {args} multi.py {start_method}"
out = self.run_command(cmd)
expected_cant_trace = cant_trace_msg(concurrency, the_module)
@@ -489,7 +497,7 @@ class MultiprocessingTest(CoverageTest):
last_line = self.squeezed_lines(out)[-1]
assert re.search(r"TOTAL \d+ 0 100%", last_line)
- def test_multiprocessing_simple(self, start_method):
+ def test_multiprocessing_simple(self, start_method: str) -> None:
nprocs = 3
upto = 30
code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto)
@@ -503,7 +511,7 @@ class MultiprocessingTest(CoverageTest):
start_method=start_method,
)
- def test_multiprocessing_append(self, start_method):
+ def test_multiprocessing_append(self, start_method: str) -> None:
nprocs = 3
upto = 30
code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto)
@@ -518,7 +526,7 @@ class MultiprocessingTest(CoverageTest):
start_method=start_method,
)
- def test_multiprocessing_and_gevent(self, start_method):
+ def test_multiprocessing_and_gevent(self, start_method: str) -> None:
nprocs = 3
upto = 30
code = (
@@ -535,7 +543,7 @@ class MultiprocessingTest(CoverageTest):
start_method=start_method,
)
- def test_multiprocessing_with_branching(self, start_method):
+ def test_multiprocessing_with_branching(self, start_method: str) -> None:
nprocs = 3
upto = 30
code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto)
@@ -559,7 +567,7 @@ class MultiprocessingTest(CoverageTest):
last_line = self.squeezed_lines(out)[-1]
assert re.search(r"TOTAL \d+ 0 \d+ 0 100%", last_line)
- def test_multiprocessing_bootstrap_error_handling(self):
+ def test_multiprocessing_bootstrap_error_handling(self) -> None:
# An exception during bootstrapping will be reported.
self.make_file("multi.py", """\
import multiprocessing
@@ -576,7 +584,7 @@ class MultiprocessingTest(CoverageTest):
assert "Exception during multiprocessing bootstrap init" in out
assert "Exception: Crashing because called by _bootstrap" in out
- def test_bug_890(self):
+ def test_bug_890(self) -> None:
# chdir in multiprocessing shouldn't keep us from finding the
# .coveragerc file.
self.make_file("multi.py", """\
@@ -596,11 +604,11 @@ class MultiprocessingTest(CoverageTest):
assert out.splitlines()[-1] == "ok"
-def test_coverage_stop_in_threads():
+def test_coverage_stop_in_threads() -> None:
has_started_coverage = []
has_stopped_coverage = []
- def run_thread(): # pragma: nested
+ def run_thread() -> None: # pragma: nested
"""Check that coverage is stopping properly in threads."""
deadline = time.time() + 5
ident = threading.current_thread().ident
@@ -648,7 +656,7 @@ def test_thread_safe_save_data(tmp_path: pathlib.Path) -> None:
for module_name in module_names:
import_local_file(module_name)
- def random_load(): # pragma: nested
+ def random_load() -> None: # pragma: nested
"""Import modules randomly to stress coverage."""
while should_run[0]:
module_name = random.choice(module_names)
@@ -695,7 +703,7 @@ class SigtermTest(CoverageTest):
"""Tests of our handling of SIGTERM."""
@pytest.mark.parametrize("sigterm", [False, True])
- def test_sigterm_saves_data(self, sigterm):
+ def test_sigterm_saves_data(self, sigterm: bool) -> None:
# A terminated process should save its coverage data.
self.make_file("clobbered.py", """\
import multiprocessing
@@ -741,7 +749,7 @@ class SigtermTest(CoverageTest):
expected = "clobbered.py 17 5 71% 5-10"
assert self.squeezed_lines(out)[2] == expected
- def test_sigterm_still_runs(self):
+ def test_sigterm_still_runs(self) -> None:
# A terminated process still runs its own SIGTERM handler.
self.make_file("handler.py", """\
import multiprocessing
diff --git a/tests/test_python.py b/tests/test_python.py
index c8c58f4e..14dbebef 100644
--- a/tests/test_python.py
+++ b/tests/test_python.py
@@ -24,13 +24,14 @@ class GetZipBytesTest(CoverageTest):
"encoding",
["utf-8", "gb2312", "hebrew", "shift_jis", "cp1252"],
)
- def test_get_encoded_zip_files(self, encoding):
+ def test_get_encoded_zip_files(self, encoding: str) -> None:
# See igor.py, do_zipmods, for the text of these files.
zip_file = "tests/zipmods.zip"
sys.path.append(zip_file) # So we can import the files.
filename = zip_file + "/encoded_" + encoding + ".py"
filename = os_sep(filename)
zip_data = get_zip_bytes(filename)
+ assert zip_data is not None
zip_text = zip_data.decode(encoding)
assert 'All OK' in zip_text
# Run the code to see that we really got it encoded properly.
diff --git a/tox.ini b/tox.ini
index a5cf258c..fc255dc2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -101,8 +101,9 @@ setenv =
C_FN=coverage/files.py coverage/inorout.py coverage/jsonreport.py coverage/lcovreport.py coverage/multiproc.py coverage/numbits.py
C_OP=coverage/parser.py coverage/phystokens.py coverage/plugin.py coverage/plugin_support.py coverage/python.py
C_QZ=coverage/report.py coverage/results.py coverage/sqldata.py coverage/tomlconfig.py coverage/types.py coverage/version.py coverage/xmlreport.py
- T_AN=tests/test_annotate.py tests/test_api.py tests/test_arcs.py tests/test_cmdline.py tests/test_collector.py tests/goldtest.py tests/helpers.py tests/test_html.py tests/test_xml.py
- TYPEABLE={env:C__B} {env:C_CC} {env:C_DE} {env:C_FN} {env:C_OP} {env:C_QZ} {env:T_AN}
+ T_AC=tests/test_annotate.py tests/test_api.py tests/test_arcs.py tests/test_cmdline.py tests/test_collector.py tests/test_concurrency.py
+ T_DZ=tests/goldtest.py tests/helpers.py tests/test_html.py tests/test_python.py tests/test_xml.py
+ TYPEABLE={env:C__B} {env:C_CC} {env:C_DE} {env:C_FN} {env:C_OP} {env:C_QZ} {env:T_AC} {env:T_DZ}
commands =
# PYVERSIONS