summaryrefslogtreecommitdiff
path: root/coverage/control.py
diff options
context:
space:
mode:
Diffstat (limited to 'coverage/control.py')
-rw-r--r--coverage/control.py137
1 files changed, 73 insertions, 64 deletions
diff --git a/coverage/control.py b/coverage/control.py
index 37e61cfb..69db200b 100644
--- a/coverage/control.py
+++ b/coverage/control.py
@@ -3,18 +3,24 @@
"""Core control stuff for coverage.py."""
+from __future__ import annotations
+
import atexit
import collections
import contextlib
import os
import os.path
import platform
+import re
import signal
import sys
import threading
import time
import warnings
+from types import FrameType
+from typing import Any, Callable, Dict, Generator, List, Optional, Union
+
from coverage import env
from coverage.annotate import AnnotateReporter
from coverage.collector import Collector, CTracer
@@ -22,7 +28,7 @@ from coverage.config import read_coverage_config
from coverage.context import should_start_context_test_function, combine_context_switchers
from coverage.data import CoverageData, combine_parallel_data
from coverage.debug import DebugControl, short_stack, write_formatted_info
-from coverage.disposition import disposition_debug_msg
+from coverage.disposition import FileDisposition, disposition_debug_msg
from coverage.exceptions import ConfigError, CoverageException, CoverageWarning, PluginError
from coverage.files import PathAliases, abs_file, relative_filename, set_relative_directory
from coverage.html import HtmlReporter
@@ -37,18 +43,20 @@ from coverage.python import PythonFileReporter
from coverage.report import render_report
from coverage.results import Analysis
from coverage.summary import SummaryReporter
+from coverage.types import TConfigurable, TConfigSection, TConfigValue, TSysInfo
from coverage.xmlreport import XmlReporter
try:
from coverage.multiproc import patch_multiprocessing
+ has_patch_multiprocessing = True
except ImportError: # pragma: only jython
# Jython has no multiprocessing module.
- patch_multiprocessing = None
+ has_patch_multiprocessing = False
os = isolate_module(os)
@contextlib.contextmanager
-def override_config(cov, **kwargs):
+def override_config(cov: Coverage, **kwargs: Any) -> Generator[None, None, None]:
"""Temporarily tweak the configuration of `cov`.
The arguments are applied to `cov.config` with the `from_args` method.
@@ -66,7 +74,7 @@ def override_config(cov, **kwargs):
DEFAULT_DATAFILE = DefaultValue("MISSING")
_DEFAULT_DATAFILE = DEFAULT_DATAFILE # Just in case, for backwards compatibility
-class Coverage:
+class Coverage(TConfigurable):
"""Programmatic access to coverage.py.
To use::
@@ -88,10 +96,10 @@ class Coverage:
"""
# The stack of started Coverage instances.
- _instances = []
+ _instances: List[Coverage] = []
@classmethod
- def current(cls):
+ def current(cls) -> Optional[Coverage]:
"""Get the latest started `Coverage` instance, if any.
Returns: a `Coverage` instance, or None.
@@ -122,7 +130,7 @@ class Coverage:
check_preimported=False,
context=None,
messages=False,
- ): # pylint: disable=too-many-arguments
+ ) -> None: # pylint: disable=too-many-arguments
"""
Many of these arguments duplicate and override values that can be
provided in a configuration file. Parameters that are missing here
@@ -217,8 +225,6 @@ class Coverage:
if data_file is DEFAULT_DATAFILE:
data_file = None
- self.config = None
-
# This is injectable by tests.
self._debug_file = None
@@ -229,20 +235,22 @@ class Coverage:
self._warn_no_data = True
self._warn_unimported_source = True
self._warn_preimported_source = check_preimported
- self._no_warn_slugs = None
+ self._no_warn_slugs: List[str] = []
self._messages = messages
# A record of all the warnings that have been issued.
- self._warnings = []
+ self._warnings: List[str] = []
# Other instance attributes, set later.
- self._data = self._collector = None
- self._plugins = None
- self._inorout = None
+ self._debug: DebugControl
+ self._plugins: Plugins
+ self._inorout: InOrOut
+ self._data: CoverageData
+ self._collector: Collector
+ self._file_mapper: Callable[[str], str]
+
self._data_suffix = self._run_suffix = None
- self._exclude_re = None
- self._debug = None
- self._file_mapper = None
+ self._exclude_re: Dict[str, re.Pattern[str]] = {}
self._old_sigterm = None
# State machine variables:
@@ -282,7 +290,7 @@ class Coverage:
if not env.METACOV:
_prevent_sub_process_measurement()
- def _init(self):
+ def _init(self) -> None:
"""Set all the initial state.
This is called by the public methods to initialize state. This lets us
@@ -322,7 +330,7 @@ class Coverage:
# this is a bit childish. :)
plugin.configure([self, self.config][int(time.time()) % 2])
- def _post_init(self):
+ def _post_init(self) -> None:
"""Stuff to do after everything is initialized."""
if self._should_write_debug:
self._should_write_debug = False
@@ -333,7 +341,7 @@ class Coverage:
if self.config._crash and self.config._crash in short_stack(limit=4):
raise Exception(f"Crashing because called by {self.config._crash}")
- def _write_startup_debug(self):
+ def _write_startup_debug(self) -> None:
"""Write out debug info at startup if needed."""
wrote_any = False
with self._debug.without_callers():
@@ -357,7 +365,7 @@ class Coverage:
if wrote_any:
write_formatted_info(self._debug.write, "end", ())
- def _should_trace(self, filename, frame):
+ def _should_trace(self, filename: str, frame: FrameType) -> FileDisposition:
"""Decide whether to trace execution in `filename`.
Calls `_should_trace_internal`, and returns the FileDisposition.
@@ -368,7 +376,7 @@ class Coverage:
self._debug.write(disposition_debug_msg(disp))
return disp
- def _check_include_omit_etc(self, filename, frame):
+ def _check_include_omit_etc(self, filename: str, frame: FrameType) -> bool:
"""Check a file name against the include/omit/etc, rules, verbosely.
Returns a boolean: True if the file should be traced, False if not.
@@ -384,7 +392,7 @@ class Coverage:
return not reason
- def _warn(self, msg, slug=None, once=False):
+ def _warn(self, msg: str, slug: Optional[str]=None, once: bool=False) -> None:
"""Use `msg` as a warning.
For warning suppression, use `slug` as the shorthand.
@@ -393,31 +401,32 @@ class Coverage:
slug.)
"""
- if self._no_warn_slugs is None:
- if self.config is not None:
+ if not self._no_warn_slugs:
+ # _warn() can be called before self.config is set in __init__...
+ if hasattr(self, "config"):
self._no_warn_slugs = list(self.config.disable_warnings)
- if self._no_warn_slugs is not None:
- if slug in self._no_warn_slugs:
- # Don't issue the warning
- return
+ if slug in self._no_warn_slugs:
+ # Don't issue the warning
+ return
self._warnings.append(msg)
if slug:
msg = f"{msg} ({slug})"
- if self._debug is not None and self._debug.should('pid'):
+ if hasattr(self, "_debug") and self._debug.should('pid'):
msg = f"[{os.getpid()}] {msg}"
warnings.warn(msg, category=CoverageWarning, stacklevel=2)
if once:
+ assert slug is not None
self._no_warn_slugs.append(slug)
- def _message(self, msg):
+ def _message(self, msg: str) -> None:
"""Write a message to the user, if configured to do so."""
if self._messages:
print(msg)
- def get_option(self, option_name):
+ def get_option(self, option_name: str) -> Optional[TConfigValue]:
"""Get an option from the configuration.
`option_name` is a colon-separated string indicating the section and
@@ -428,14 +437,14 @@ class Coverage:
selected.
As a special case, an `option_name` of ``"paths"`` will return an
- OrderedDict with the entire ``[paths]`` section value.
+ dictionary with the entire ``[paths]`` section value.
.. versionadded:: 4.0
"""
return self.config.get_option(option_name)
- def set_option(self, option_name, value):
+ def set_option(self, option_name: str, value: Union[TConfigValue, TConfigSection]) -> None:
"""Set an option in the configuration.
`option_name` is a colon-separated string indicating the section and
@@ -460,17 +469,17 @@ class Coverage:
branch = True
As a special case, an `option_name` of ``"paths"`` will replace the
- entire ``[paths]`` section. The value should be an OrderedDict.
+ entire ``[paths]`` section. The value should be a dictionary.
.. versionadded:: 4.0
"""
self.config.set_option(option_name, value)
- def load(self):
+ def load(self) -> None:
"""Load previously-collected coverage data from the data file."""
self._init()
- if self._collector:
+ if hasattr(self, "_collector"):
self._collector.reset()
should_skip = self.config.parallel and not os.path.exists(self.config.data_file)
if not should_skip:
@@ -479,12 +488,12 @@ class Coverage:
if not should_skip:
self._data.read()
- def _init_for_start(self):
+ def _init_for_start(self) -> None:
"""Initialization for start()"""
# Construct the collector.
- concurrency = self.config.concurrency or []
+ concurrency: List[str] = self.config.concurrency or []
if "multiprocessing" in concurrency:
- if not patch_multiprocessing:
+ if not has_patch_multiprocessing:
raise ConfigError( # pragma: only jython
"multiprocessing is not supported on this Python"
)
@@ -550,11 +559,11 @@ class Coverage:
# Create the file classifying substructure.
self._inorout = InOrOut(
+ config=self.config,
warn=self._warn,
debug=(self._debug if self._debug.should('trace') else None),
include_namespace_packages=self.config.include_namespace_packages,
)
- self._inorout.configure(self.config)
self._inorout.plugins = self._plugins
self._inorout.disp_class = self._collector.file_disposition_class
@@ -573,7 +582,7 @@ class Coverage:
def _init_data(self, suffix):
"""Create a data file if we don't have one yet."""
- if self._data is None:
+ if not hasattr(self, "_data"):
# Create the data file. We do this at construction time so that the
# data file will be written into the directory where the process
# started rather than wherever the process eventually chdir'd to.
@@ -586,7 +595,7 @@ class Coverage:
no_disk=self._no_disk,
)
- def start(self):
+ def start(self) -> None:
"""Start measuring code coverage.
Coverage measurement only occurs in functions called after
@@ -618,7 +627,7 @@ class Coverage:
self._started = True
self._instances.append(self)
- def stop(self):
+ def stop(self) -> None:
"""Stop measuring code coverage."""
if self._instances:
if self._instances[-1] is self:
@@ -627,7 +636,7 @@ class Coverage:
self._collector.stop()
self._started = False
- def _atexit(self, event="atexit"):
+ def _atexit(self, event="atexit") -> None:
"""Clean up on process shutdown."""
if self._debug.should("process"):
self._debug.write(f"{event}: pid: {os.getpid()}, instance: {self!r}")
@@ -636,7 +645,7 @@ class Coverage:
if self._auto_save:
self.save()
- def _on_sigterm(self, signum_unused, frame_unused):
+ def _on_sigterm(self, signum_unused, frame_unused) -> None:
"""A handler for signal.SIGTERM."""
self._atexit("sigterm")
# Statements after here won't be seen by metacov because we just wrote
@@ -644,7 +653,7 @@ class Coverage:
signal.signal(signal.SIGTERM, self._old_sigterm) # pragma: not covered
os.kill(os.getpid(), signal.SIGTERM) # pragma: not covered
- def erase(self):
+ def erase(self) -> None:
"""Erase previously collected coverage data.
This removes the in-memory data collected in this session as well as
@@ -653,14 +662,14 @@ class Coverage:
"""
self._init()
self._post_init()
- if self._collector:
+ if hasattr(self, "_collector"):
self._collector.reset()
self._init_data(suffix=None)
self._data.erase(parallel=self.config.parallel)
- self._data = None
+ del self._data
self._inited_for_start = False
- def switch_context(self, new_context):
+ def switch_context(self, new_context) -> None:
"""Switch to a new dynamic context.
`new_context` is a string to use as the :ref:`dynamic context
@@ -681,13 +690,13 @@ class Coverage:
self._collector.switch_context(new_context)
- def clear_exclude(self, which='exclude'):
+ def clear_exclude(self, which='exclude') -> None:
"""Clear the exclude list."""
self._init()
setattr(self.config, which + "_list", [])
self._exclude_regex_stale()
- def exclude(self, regex, which='exclude'):
+ def exclude(self, regex, which='exclude') -> None:
"""Exclude source lines from execution consideration.
A number of lists of regular expressions are maintained. Each list
@@ -707,7 +716,7 @@ class Coverage:
excl_list.append(regex)
self._exclude_regex_stale()
- def _exclude_regex_stale(self):
+ def _exclude_regex_stale(self) -> None:
"""Drop all the compiled exclusion regexes, a list was modified."""
self._exclude_re.clear()
@@ -728,7 +737,7 @@ class Coverage:
self._init()
return getattr(self.config, which + "_list")
- def save(self):
+ def save(self) -> None:
"""Save the collected coverage data to the data file."""
data = self.get_data()
data.write()
@@ -745,7 +754,7 @@ class Coverage:
aliases.add(pattern, result)
return aliases
- def combine(self, data_paths=None, strict=False, keep=False):
+ def combine(self, data_paths=None, strict=False, keep=False) -> None:
"""Combine together a number of similarly-named coverage data files.
All coverage data files whose name starts with `data_file` (from the
@@ -803,12 +812,12 @@ class Coverage:
if not plugin._coverage_enabled:
self._collector.plugin_was_disabled(plugin)
- if self._collector and self._collector.flush_data():
+ if hasattr(self, "_collector") and self._collector.flush_data():
self._post_save_work()
return self._data
- def _post_save_work(self):
+ def _post_save_work(self) -> None:
"""After saving data, look for warnings, post-work, etc.
Warn about things that should have happened but didn't.
@@ -928,7 +937,7 @@ class Coverage:
file_reporters = [self._get_file_reporter(morf) for morf in morfs]
return file_reporters
- def _prepare_data_for_reporting(self):
+ def _prepare_data_for_reporting(self) -> None:
"""Re-map data before reporting, to get implicit 'combine' behavior."""
if self.config.paths:
mapped_data = CoverageData(warn=self._warn, debug=self._debug, no_disk=True)
@@ -1213,7 +1222,7 @@ class Coverage:
):
return render_report(self.config.lcov_output, LcovReporter(self), morfs, self._message)
- def sys_info(self):
+ def sys_info(self) -> TSysInfo:
"""Return a list of (key, value) pairs showing internal information."""
import coverage as covmod
@@ -1234,7 +1243,7 @@ class Coverage:
info = [
('coverage_version', covmod.__version__),
('coverage_module', covmod.__file__),
- ('tracer', self._collector.tracer_name() if self._collector else "-none-"),
+ ('tracer', self._collector.tracer_name() if hasattr(self, "_collector") else "-none-"),
('CTracer', 'available' if CTracer else "unavailable"),
('plugins.file_tracers', plugin_info(self._plugins.file_tracers)),
('plugins.configurers', plugin_info(self._plugins.configurers)),
@@ -1245,7 +1254,7 @@ class Coverage:
('config_contents',
repr(self.config._config_contents) if self.config._config_contents else '-none-'
),
- ('data_file', self._data.data_filename() if self._data is not None else "-none-"),
+ ('data_file', self._data.data_filename() if hasattr(self, "_data") else "-none-"),
('python', sys.version.replace('\n', '')),
('platform', platform.platform()),
('implementation', platform.python_implementation()),
@@ -1266,7 +1275,7 @@ class Coverage:
('command_line', " ".join(getattr(sys, 'argv', ['-none-']))),
]
- if self._inorout:
+ if hasattr(self, "_inorout"):
info.extend(self._inorout.sys_info())
info.extend(CoverageData.sys_info())
@@ -1282,7 +1291,7 @@ if int(os.environ.get("COVERAGE_DEBUG_CALLS", 0)): # pragma: debugg
Coverage = decorate_methods(show_calls(show_args=True), butnot=['get_data'])(Coverage)
-def process_startup():
+def process_startup() -> None:
"""Call this at Python start-up to perhaps measure coverage.
If the environment variable COVERAGE_PROCESS_START is defined, coverage
@@ -1335,7 +1344,7 @@ def process_startup():
return cov
-def _prevent_sub_process_measurement():
+def _prevent_sub_process_measurement() -> None:
"""Stop any subprocess auto-measurement from writing data."""
auto_created_coverage = getattr(process_startup, "coverage", None)
if auto_created_coverage is not None: