diff options
Diffstat (limited to 'coverage/collector.py')
-rw-r--r-- | coverage/collector.py | 136 |
1 files changed, 80 insertions, 56 deletions
diff --git a/coverage/collector.py b/coverage/collector.py index ef1d9b41..a3c537d6 100644 --- a/coverage/collector.py +++ b/coverage/collector.py @@ -3,16 +3,29 @@ """Raw data collector for coverage.py.""" +from __future__ import annotations + +import functools import os import sys +from types import FrameType +from typing import ( + cast, Any, Callable, Dict, List, Mapping, Optional, Set, Tuple, Type, TypeVar, +) + from coverage import env from coverage.config import CoverageConfig +from coverage.data import CoverageData from coverage.debug import short_stack from coverage.disposition import FileDisposition from coverage.exceptions import ConfigError from coverage.misc import human_sorted_items, isolate_module +from coverage.plugin import CoveragePlugin from coverage.pytracer import PyTracer +from coverage.types import ( + TArc, TFileDisposition, TLineNo, TTraceData, TTraceFn, TTracer, TWarnFn, +) os = isolate_module(os) @@ -20,6 +33,7 @@ os = isolate_module(os) try: # Use the C extension code when we can, for speed. from coverage.tracer import CTracer, CFileDisposition + HAS_CTRACER = True except ImportError: # Couldn't import the C extension, maybe it isn't built. if os.getenv('COVERAGE_TEST_TRACER') == 'c': # pragma: part covered @@ -31,8 +45,9 @@ except ImportError: # exception here causes all sorts of other noise in unittest. sys.stderr.write("*** COVERAGE_TEST_TRACER is 'c' but can't import CTracer!\n") sys.exit(1) - CTracer = None + HAS_CTRACER = False +T = TypeVar("T") class Collector: """Collects trace data. @@ -53,15 +68,22 @@ class Collector: # The stack of active Collectors. Collectors are added here when started, # and popped when stopped. Collectors on the stack are paused when not # the top, and resumed when they become the top again. - _collectors = [] + _collectors: List[Collector] = [] # The concurrency settings we support here. LIGHT_THREADS = {"greenlet", "eventlet", "gevent"} def __init__( - self, should_trace, check_include, should_start_context, file_mapper, - timid, branch, warn, concurrency, - ): + self, + should_trace: Callable[[str, FrameType], TFileDisposition], + check_include: Callable[[str, FrameType], bool], + should_start_context: Optional[Callable[[FrameType], Optional[str]]], + file_mapper: Callable[[str], str], + timid: bool, + branch: bool, + warn: TWarnFn, + concurrency: List[str], + ) -> None: """Create a collector. `should_trace` is a function, taking a file name and a frame, and @@ -107,28 +129,29 @@ class Collector: self.concurrency = concurrency assert isinstance(self.concurrency, list), f"Expected a list: {self.concurrency!r}" + self.covdata: CoverageData self.threading = None - self.covdata = None - self.static_context = None + self.static_context: Optional[str] = None self.origin = short_stack() self.concur_id_func = None - self.mapped_file_cache = {} - if timid: - # Being timid: use the simple Python trace function. - self._trace_class = PyTracer - else: - # Being fast: use the C Tracer if it is available, else the Python - # trace function. - self._trace_class = CTracer or PyTracer + self._trace_class: Type[TTracer] + self.file_disposition_class: Type[TFileDisposition] + + use_ctracer = False + if HAS_CTRACER and not timid: + use_ctracer = True - if self._trace_class is CTracer: + #if HAS_CTRACER and self._trace_class is CTracer: + if use_ctracer: + self._trace_class = CTracer self.file_disposition_class = CFileDisposition self.supports_plugins = True self.packed_arcs = True else: + self._trace_class = PyTracer self.file_disposition_class = FileDisposition self.supports_plugins = False self.packed_arcs = False @@ -182,22 +205,22 @@ class Collector: self.reset() - def __repr__(self): + def __repr__(self) -> str: return f"<Collector at 0x{id(self):x}: {self.tracer_name()}>" - def use_data(self, covdata, context): + def use_data(self, covdata: CoverageData, context: Optional[str]) -> None: """Use `covdata` for recording data.""" self.covdata = covdata self.static_context = context self.covdata.set_context(self.static_context) - def tracer_name(self): + def tracer_name(self) -> str: """Return the class name of the tracer we're using.""" return self._trace_class.__name__ - def _clear_data(self): + def _clear_data(self) -> None: """Clear out existing data, but stay ready for more collection.""" - # We used to used self.data.clear(), but that would remove filename + # We used to use self.data.clear(), but that would remove filename # keys and data values that were still in use higher up the stack # when we are called as part of switch_context. for d in self.data.values(): @@ -206,18 +229,16 @@ class Collector: for tracer in self.tracers: tracer.reset_activity() - def reset(self): + def reset(self) -> None: """Clear collected data, and prepare to collect more.""" - # A dictionary mapping file names to dicts with line number keys (if not - # branch coverage), or mapping file names to dicts with line number - # pairs as keys (if branch coverage). - self.data = {} + # The trace data we are collecting. + self.data: TTraceData = {} # type: ignore[assignment] # A dictionary mapping file names to file tracer plugin names that will # handle them. - self.file_tracers = {} + self.file_tracers: Dict[str, str] = {} - self.disabled_plugins = set() + self.disabled_plugins: Set[str] = set() # The .should_trace_cache attribute is a cache from file names to # coverage.FileDisposition objects, or None. When a file is first @@ -248,11 +269,11 @@ class Collector: self.should_trace_cache = {} # Our active Tracers. - self.tracers = [] + self.tracers: List[TTracer] = [] self._clear_data() - def _start_tracer(self): + def _start_tracer(self) -> TTraceFn: """Start a new Tracer object, and store it in self.tracers.""" tracer = self._trace_class() tracer.data = self.data @@ -271,6 +292,7 @@ class Collector: tracer.check_include = self.check_include if hasattr(tracer, 'should_start_context'): tracer.should_start_context = self.should_start_context + if hasattr(tracer, 'switch_context'): tracer.switch_context = self.switch_context if hasattr(tracer, 'disable_plugin'): tracer.disable_plugin = self.disable_plugin @@ -288,7 +310,7 @@ class Collector: # # New in 3.12: threading.settrace_all_threads: https://github.com/python/cpython/pull/96681 - def _installation_trace(self, frame, event, arg): + def _installation_trace(self, frame: FrameType, event: str, arg: Any) -> TTraceFn: """Called on new threads, installs the real tracer.""" # Remove ourselves as the trace function. sys.settrace(None) @@ -301,7 +323,7 @@ class Collector: # Return the new trace function to continue tracing in this scope. return fn - def start(self): + def start(self) -> None: """Start collecting trace information.""" if self._collectors: self._collectors[-1].pause() @@ -310,7 +332,7 @@ class Collector: # Check to see whether we had a fullcoverage tracer installed. If so, # get the stack frames it stashed away for us. - traces0 = [] + traces0: List[Tuple[Tuple[FrameType, str, Any], TLineNo]] = [] fn0 = sys.gettrace() if fn0: tracer0 = getattr(fn0, '__self__', None) @@ -341,7 +363,7 @@ class Collector: if self.threading: self.threading.settrace(self._installation_trace) - def stop(self): + def stop(self) -> None: """Stop collecting trace information.""" assert self._collectors if self._collectors[-1] is not self: @@ -360,7 +382,7 @@ class Collector: if self._collectors: self._collectors[-1].resume() - def pause(self): + def pause(self) -> None: """Pause tracing, but be prepared to `resume`.""" for tracer in self.tracers: tracer.stop() @@ -372,7 +394,7 @@ class Collector: if self.threading: self.threading.settrace(None) - def resume(self): + def resume(self) -> None: """Resume tracing after a `pause`.""" for tracer in self.tracers: tracer.start() @@ -381,7 +403,7 @@ class Collector: else: self._start_tracer() - def _activity(self): + def _activity(self) -> bool: """Has any activity been traced? Returns a boolean, True if any trace function was invoked. @@ -389,8 +411,9 @@ class Collector: """ return any(tracer.activity() for tracer in self.tracers) - def switch_context(self, new_context): + def switch_context(self, new_context: Optional[str]) -> None: """Switch to a new dynamic context.""" + context: Optional[str] self.flush_data() if self.static_context: context = self.static_context @@ -400,24 +423,22 @@ class Collector: context = new_context self.covdata.set_context(context) - def disable_plugin(self, disposition): + def disable_plugin(self, disposition: TFileDisposition) -> None: """Disable the plugin mentioned in `disposition`.""" file_tracer = disposition.file_tracer + assert file_tracer is not None plugin = file_tracer._coverage_plugin plugin_name = plugin._coverage_plugin_name self.warn(f"Disabling plug-in {plugin_name!r} due to previous exception") plugin._coverage_enabled = False disposition.trace = False - def cached_mapped_file(self, filename): + @functools.lru_cache(maxsize=0) + def cached_mapped_file(self, filename: str) -> str: """A locally cached version of file names mapped through file_mapper.""" - key = (type(filename), filename) - try: - return self.mapped_file_cache[key] - except KeyError: - return self.mapped_file_cache.setdefault(key, self.file_mapper(filename)) + return self.file_mapper(filename) - def mapped_file_dict(self, d): + def mapped_file_dict(self, d: Mapping[str, T]) -> Dict[str, T]: """Return a dict like d, but with keys modified by file_mapper.""" # The call to list(items()) ensures that the GIL protects the dictionary # iterator against concurrent modifications by tracers running @@ -431,16 +452,17 @@ class Collector: runtime_err = ex else: break - else: - raise runtime_err # pragma: cant happen + else: # pragma: cant happen + assert isinstance(runtime_err, Exception) + raise runtime_err return {self.cached_mapped_file(k): v for k, v in items} - def plugin_was_disabled(self, plugin): + def plugin_was_disabled(self, plugin: CoveragePlugin) -> None: """Record that `plugin` was disabled during the run.""" self.disabled_plugins.add(plugin._coverage_plugin_name) - def flush_data(self): + def flush_data(self) -> bool: """Save the collected data to our associated `CoverageData`. Data may have also been saved along the way. This forces the @@ -456,8 +478,9 @@ class Collector: # Unpack the line number pairs packed into integers. See # tracer.c:CTracer_record_pair for the C code that creates # these packed ints. - data = {} - for fname, packeds in self.data.items(): + arc_data: Dict[str, List[TArc]] = {} + packed_data = cast(Dict[str, Set[int]], self.data) + for fname, packeds in packed_data.items(): tuples = [] for packed in packeds: l1 = packed & 0xFFFFF @@ -467,12 +490,13 @@ class Collector: if packed & (1 << 41): l2 *= -1 tuples.append((l1, l2)) - data[fname] = tuples + arc_data[fname] = tuples else: - data = self.data - self.covdata.add_arcs(self.mapped_file_dict(data)) + arc_data = cast(Dict[str, List[TArc]], self.data) + self.covdata.add_arcs(self.mapped_file_dict(arc_data)) else: - self.covdata.add_lines(self.mapped_file_dict(self.data)) + line_data = cast(Dict[str, Set[int]], self.data) + self.covdata.add_lines(self.mapped_file_dict(line_data)) file_tracers = { k: v for k, v in self.file_tracers.items() |