diff options
Diffstat (limited to 'coverage/control.py')
-rw-r--r-- | coverage/control.py | 137 |
1 files changed, 73 insertions, 64 deletions
diff --git a/coverage/control.py b/coverage/control.py index 37e61cfb..69db200b 100644 --- a/coverage/control.py +++ b/coverage/control.py @@ -3,18 +3,24 @@ """Core control stuff for coverage.py.""" +from __future__ import annotations + import atexit import collections import contextlib import os import os.path import platform +import re import signal import sys import threading import time import warnings +from types import FrameType +from typing import Any, Callable, Dict, Generator, List, Optional, Union + from coverage import env from coverage.annotate import AnnotateReporter from coverage.collector import Collector, CTracer @@ -22,7 +28,7 @@ from coverage.config import read_coverage_config from coverage.context import should_start_context_test_function, combine_context_switchers from coverage.data import CoverageData, combine_parallel_data from coverage.debug import DebugControl, short_stack, write_formatted_info -from coverage.disposition import disposition_debug_msg +from coverage.disposition import FileDisposition, disposition_debug_msg from coverage.exceptions import ConfigError, CoverageException, CoverageWarning, PluginError from coverage.files import PathAliases, abs_file, relative_filename, set_relative_directory from coverage.html import HtmlReporter @@ -37,18 +43,20 @@ from coverage.python import PythonFileReporter from coverage.report import render_report from coverage.results import Analysis from coverage.summary import SummaryReporter +from coverage.types import TConfigurable, TConfigSection, TConfigValue, TSysInfo from coverage.xmlreport import XmlReporter try: from coverage.multiproc import patch_multiprocessing + has_patch_multiprocessing = True except ImportError: # pragma: only jython # Jython has no multiprocessing module. - patch_multiprocessing = None + has_patch_multiprocessing = False os = isolate_module(os) @contextlib.contextmanager -def override_config(cov, **kwargs): +def override_config(cov: Coverage, **kwargs: Any) -> Generator[None, None, None]: """Temporarily tweak the configuration of `cov`. The arguments are applied to `cov.config` with the `from_args` method. @@ -66,7 +74,7 @@ def override_config(cov, **kwargs): DEFAULT_DATAFILE = DefaultValue("MISSING") _DEFAULT_DATAFILE = DEFAULT_DATAFILE # Just in case, for backwards compatibility -class Coverage: +class Coverage(TConfigurable): """Programmatic access to coverage.py. To use:: @@ -88,10 +96,10 @@ class Coverage: """ # The stack of started Coverage instances. - _instances = [] + _instances: List[Coverage] = [] @classmethod - def current(cls): + def current(cls) -> Optional[Coverage]: """Get the latest started `Coverage` instance, if any. Returns: a `Coverage` instance, or None. @@ -122,7 +130,7 @@ class Coverage: check_preimported=False, context=None, messages=False, - ): # pylint: disable=too-many-arguments + ) -> None: # pylint: disable=too-many-arguments """ Many of these arguments duplicate and override values that can be provided in a configuration file. Parameters that are missing here @@ -217,8 +225,6 @@ class Coverage: if data_file is DEFAULT_DATAFILE: data_file = None - self.config = None - # This is injectable by tests. self._debug_file = None @@ -229,20 +235,22 @@ class Coverage: self._warn_no_data = True self._warn_unimported_source = True self._warn_preimported_source = check_preimported - self._no_warn_slugs = None + self._no_warn_slugs: List[str] = [] self._messages = messages # A record of all the warnings that have been issued. - self._warnings = [] + self._warnings: List[str] = [] # Other instance attributes, set later. - self._data = self._collector = None - self._plugins = None - self._inorout = None + self._debug: DebugControl + self._plugins: Plugins + self._inorout: InOrOut + self._data: CoverageData + self._collector: Collector + self._file_mapper: Callable[[str], str] + self._data_suffix = self._run_suffix = None - self._exclude_re = None - self._debug = None - self._file_mapper = None + self._exclude_re: Dict[str, re.Pattern[str]] = {} self._old_sigterm = None # State machine variables: @@ -282,7 +290,7 @@ class Coverage: if not env.METACOV: _prevent_sub_process_measurement() - def _init(self): + def _init(self) -> None: """Set all the initial state. This is called by the public methods to initialize state. This lets us @@ -322,7 +330,7 @@ class Coverage: # this is a bit childish. :) plugin.configure([self, self.config][int(time.time()) % 2]) - def _post_init(self): + def _post_init(self) -> None: """Stuff to do after everything is initialized.""" if self._should_write_debug: self._should_write_debug = False @@ -333,7 +341,7 @@ class Coverage: if self.config._crash and self.config._crash in short_stack(limit=4): raise Exception(f"Crashing because called by {self.config._crash}") - def _write_startup_debug(self): + def _write_startup_debug(self) -> None: """Write out debug info at startup if needed.""" wrote_any = False with self._debug.without_callers(): @@ -357,7 +365,7 @@ class Coverage: if wrote_any: write_formatted_info(self._debug.write, "end", ()) - def _should_trace(self, filename, frame): + def _should_trace(self, filename: str, frame: FrameType) -> FileDisposition: """Decide whether to trace execution in `filename`. Calls `_should_trace_internal`, and returns the FileDisposition. @@ -368,7 +376,7 @@ class Coverage: self._debug.write(disposition_debug_msg(disp)) return disp - def _check_include_omit_etc(self, filename, frame): + def _check_include_omit_etc(self, filename: str, frame: FrameType) -> bool: """Check a file name against the include/omit/etc, rules, verbosely. Returns a boolean: True if the file should be traced, False if not. @@ -384,7 +392,7 @@ class Coverage: return not reason - def _warn(self, msg, slug=None, once=False): + def _warn(self, msg: str, slug: Optional[str]=None, once: bool=False) -> None: """Use `msg` as a warning. For warning suppression, use `slug` as the shorthand. @@ -393,31 +401,32 @@ class Coverage: slug.) """ - if self._no_warn_slugs is None: - if self.config is not None: + if not self._no_warn_slugs: + # _warn() can be called before self.config is set in __init__... + if hasattr(self, "config"): self._no_warn_slugs = list(self.config.disable_warnings) - if self._no_warn_slugs is not None: - if slug in self._no_warn_slugs: - # Don't issue the warning - return + if slug in self._no_warn_slugs: + # Don't issue the warning + return self._warnings.append(msg) if slug: msg = f"{msg} ({slug})" - if self._debug is not None and self._debug.should('pid'): + if hasattr(self, "_debug") and self._debug.should('pid'): msg = f"[{os.getpid()}] {msg}" warnings.warn(msg, category=CoverageWarning, stacklevel=2) if once: + assert slug is not None self._no_warn_slugs.append(slug) - def _message(self, msg): + def _message(self, msg: str) -> None: """Write a message to the user, if configured to do so.""" if self._messages: print(msg) - def get_option(self, option_name): + def get_option(self, option_name: str) -> Optional[TConfigValue]: """Get an option from the configuration. `option_name` is a colon-separated string indicating the section and @@ -428,14 +437,14 @@ class Coverage: selected. As a special case, an `option_name` of ``"paths"`` will return an - OrderedDict with the entire ``[paths]`` section value. + dictionary with the entire ``[paths]`` section value. .. versionadded:: 4.0 """ return self.config.get_option(option_name) - def set_option(self, option_name, value): + def set_option(self, option_name: str, value: Union[TConfigValue, TConfigSection]) -> None: """Set an option in the configuration. `option_name` is a colon-separated string indicating the section and @@ -460,17 +469,17 @@ class Coverage: branch = True As a special case, an `option_name` of ``"paths"`` will replace the - entire ``[paths]`` section. The value should be an OrderedDict. + entire ``[paths]`` section. The value should be a dictionary. .. versionadded:: 4.0 """ self.config.set_option(option_name, value) - def load(self): + def load(self) -> None: """Load previously-collected coverage data from the data file.""" self._init() - if self._collector: + if hasattr(self, "_collector"): self._collector.reset() should_skip = self.config.parallel and not os.path.exists(self.config.data_file) if not should_skip: @@ -479,12 +488,12 @@ class Coverage: if not should_skip: self._data.read() - def _init_for_start(self): + def _init_for_start(self) -> None: """Initialization for start()""" # Construct the collector. - concurrency = self.config.concurrency or [] + concurrency: List[str] = self.config.concurrency or [] if "multiprocessing" in concurrency: - if not patch_multiprocessing: + if not has_patch_multiprocessing: raise ConfigError( # pragma: only jython "multiprocessing is not supported on this Python" ) @@ -550,11 +559,11 @@ class Coverage: # Create the file classifying substructure. self._inorout = InOrOut( + config=self.config, warn=self._warn, debug=(self._debug if self._debug.should('trace') else None), include_namespace_packages=self.config.include_namespace_packages, ) - self._inorout.configure(self.config) self._inorout.plugins = self._plugins self._inorout.disp_class = self._collector.file_disposition_class @@ -573,7 +582,7 @@ class Coverage: def _init_data(self, suffix): """Create a data file if we don't have one yet.""" - if self._data is None: + if not hasattr(self, "_data"): # Create the data file. We do this at construction time so that the # data file will be written into the directory where the process # started rather than wherever the process eventually chdir'd to. @@ -586,7 +595,7 @@ class Coverage: no_disk=self._no_disk, ) - def start(self): + def start(self) -> None: """Start measuring code coverage. Coverage measurement only occurs in functions called after @@ -618,7 +627,7 @@ class Coverage: self._started = True self._instances.append(self) - def stop(self): + def stop(self) -> None: """Stop measuring code coverage.""" if self._instances: if self._instances[-1] is self: @@ -627,7 +636,7 @@ class Coverage: self._collector.stop() self._started = False - def _atexit(self, event="atexit"): + def _atexit(self, event="atexit") -> None: """Clean up on process shutdown.""" if self._debug.should("process"): self._debug.write(f"{event}: pid: {os.getpid()}, instance: {self!r}") @@ -636,7 +645,7 @@ class Coverage: if self._auto_save: self.save() - def _on_sigterm(self, signum_unused, frame_unused): + def _on_sigterm(self, signum_unused, frame_unused) -> None: """A handler for signal.SIGTERM.""" self._atexit("sigterm") # Statements after here won't be seen by metacov because we just wrote @@ -644,7 +653,7 @@ class Coverage: signal.signal(signal.SIGTERM, self._old_sigterm) # pragma: not covered os.kill(os.getpid(), signal.SIGTERM) # pragma: not covered - def erase(self): + def erase(self) -> None: """Erase previously collected coverage data. This removes the in-memory data collected in this session as well as @@ -653,14 +662,14 @@ class Coverage: """ self._init() self._post_init() - if self._collector: + if hasattr(self, "_collector"): self._collector.reset() self._init_data(suffix=None) self._data.erase(parallel=self.config.parallel) - self._data = None + del self._data self._inited_for_start = False - def switch_context(self, new_context): + def switch_context(self, new_context) -> None: """Switch to a new dynamic context. `new_context` is a string to use as the :ref:`dynamic context @@ -681,13 +690,13 @@ class Coverage: self._collector.switch_context(new_context) - def clear_exclude(self, which='exclude'): + def clear_exclude(self, which='exclude') -> None: """Clear the exclude list.""" self._init() setattr(self.config, which + "_list", []) self._exclude_regex_stale() - def exclude(self, regex, which='exclude'): + def exclude(self, regex, which='exclude') -> None: """Exclude source lines from execution consideration. A number of lists of regular expressions are maintained. Each list @@ -707,7 +716,7 @@ class Coverage: excl_list.append(regex) self._exclude_regex_stale() - def _exclude_regex_stale(self): + def _exclude_regex_stale(self) -> None: """Drop all the compiled exclusion regexes, a list was modified.""" self._exclude_re.clear() @@ -728,7 +737,7 @@ class Coverage: self._init() return getattr(self.config, which + "_list") - def save(self): + def save(self) -> None: """Save the collected coverage data to the data file.""" data = self.get_data() data.write() @@ -745,7 +754,7 @@ class Coverage: aliases.add(pattern, result) return aliases - def combine(self, data_paths=None, strict=False, keep=False): + def combine(self, data_paths=None, strict=False, keep=False) -> None: """Combine together a number of similarly-named coverage data files. All coverage data files whose name starts with `data_file` (from the @@ -803,12 +812,12 @@ class Coverage: if not plugin._coverage_enabled: self._collector.plugin_was_disabled(plugin) - if self._collector and self._collector.flush_data(): + if hasattr(self, "_collector") and self._collector.flush_data(): self._post_save_work() return self._data - def _post_save_work(self): + def _post_save_work(self) -> None: """After saving data, look for warnings, post-work, etc. Warn about things that should have happened but didn't. @@ -928,7 +937,7 @@ class Coverage: file_reporters = [self._get_file_reporter(morf) for morf in morfs] return file_reporters - def _prepare_data_for_reporting(self): + def _prepare_data_for_reporting(self) -> None: """Re-map data before reporting, to get implicit 'combine' behavior.""" if self.config.paths: mapped_data = CoverageData(warn=self._warn, debug=self._debug, no_disk=True) @@ -1213,7 +1222,7 @@ class Coverage: ): return render_report(self.config.lcov_output, LcovReporter(self), morfs, self._message) - def sys_info(self): + def sys_info(self) -> TSysInfo: """Return a list of (key, value) pairs showing internal information.""" import coverage as covmod @@ -1234,7 +1243,7 @@ class Coverage: info = [ ('coverage_version', covmod.__version__), ('coverage_module', covmod.__file__), - ('tracer', self._collector.tracer_name() if self._collector else "-none-"), + ('tracer', self._collector.tracer_name() if hasattr(self, "_collector") else "-none-"), ('CTracer', 'available' if CTracer else "unavailable"), ('plugins.file_tracers', plugin_info(self._plugins.file_tracers)), ('plugins.configurers', plugin_info(self._plugins.configurers)), @@ -1245,7 +1254,7 @@ class Coverage: ('config_contents', repr(self.config._config_contents) if self.config._config_contents else '-none-' ), - ('data_file', self._data.data_filename() if self._data is not None else "-none-"), + ('data_file', self._data.data_filename() if hasattr(self, "_data") else "-none-"), ('python', sys.version.replace('\n', '')), ('platform', platform.platform()), ('implementation', platform.python_implementation()), @@ -1266,7 +1275,7 @@ class Coverage: ('command_line', " ".join(getattr(sys, 'argv', ['-none-']))), ] - if self._inorout: + if hasattr(self, "_inorout"): info.extend(self._inorout.sys_info()) info.extend(CoverageData.sys_info()) @@ -1282,7 +1291,7 @@ if int(os.environ.get("COVERAGE_DEBUG_CALLS", 0)): # pragma: debugg Coverage = decorate_methods(show_calls(show_args=True), butnot=['get_data'])(Coverage) -def process_startup(): +def process_startup() -> None: """Call this at Python start-up to perhaps measure coverage. If the environment variable COVERAGE_PROCESS_START is defined, coverage @@ -1335,7 +1344,7 @@ def process_startup(): return cov -def _prevent_sub_process_measurement(): +def _prevent_sub_process_measurement() -> None: """Stop any subprocess auto-measurement from writing data.""" auto_created_coverage = getattr(process_startup, "coverage", None) if auto_created_coverage is not None: |