summaryrefslogtreecommitdiff
path: root/coverage/control.py
diff options
context:
space:
mode:
Diffstat (limited to 'coverage/control.py')
-rw-r--r--coverage/control.py803
1 files changed, 239 insertions, 564 deletions
diff --git a/coverage/control.py b/coverage/control.py
index b82c8047..4f2afda0 100644
--- a/coverage/control.py
+++ b/coverage/control.py
@@ -1,36 +1,30 @@
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
-# For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt
+# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
"""Core control stuff for coverage.py."""
-
import atexit
-import inspect
-import itertools
import os
import platform
-import re
import sys
import time
-import traceback
from coverage import env
from coverage.annotate import AnnotateReporter
from coverage.backward import string_class, iitems
-from coverage.collector import Collector
+from coverage.collector import Collector, CTracer
from coverage.config import read_coverage_config
-from coverage.data import CoverageData, CoverageDataFiles
+from coverage.data import CoverageData, combine_parallel_data
from coverage.debug import DebugControl, write_formatted_info
-from coverage.files import TreeMatcher, FnmatchMatcher
-from coverage.files import PathAliases, find_python_files, prep_patterns
-from coverage.files import canonical_filename, set_relative_directory
-from coverage.files import ModuleMatcher, abs_file
+from coverage.disposition import disposition_debug_msg
+from coverage.files import PathAliases, set_relative_directory, abs_file
from coverage.html import HtmlReporter
+from coverage.inorout import InOrOut
from coverage.misc import CoverageException, bool_or_none, join_regex
from coverage.misc import file_be_gone, isolate_module
from coverage.plugin import FileReporter
from coverage.plugin_support import Plugins
-from coverage.python import PythonFileReporter, source_for_file
+from coverage.python import PythonFileReporter
from coverage.results import Analysis, Numbers
from coverage.summary import SummaryReporter
from coverage.xmlreport import XmlReporter
@@ -43,22 +37,6 @@ except ImportError: # pragma: only jytho
os = isolate_module(os)
-# Pypy has some unusual stuff in the "stdlib". Consider those locations
-# when deciding where the stdlib is. These modules are not used for anything,
-# they are modules importable from the pypy lib directories, so that we can
-# find those directories.
-_structseq = _pypy_irc_topic = None
-if env.PYPY:
- try:
- import _structseq
- except ImportError:
- pass
-
- try:
- import _pypy_irc_topic
- except ImportError:
- pass
-
class Coverage(object):
"""Programmatic access to coverage.py.
@@ -74,11 +52,12 @@ class Coverage(object):
cov.html_report(directory='covhtml')
"""
+
def __init__(
self, data_file=None, data_suffix=None, cover_pylib=None,
auto_data=False, timid=None, branch=None, config_file=True,
source=None, omit=None, include=None, debug=None,
- concurrency=None,
+ concurrency=None, check_preimported=False, context=None,
):
"""
`data_file` is the base name of the data file to use, defaulting to
@@ -132,58 +111,64 @@ class Coverage(object):
"eventlet", "gevent", "multiprocessing", or "thread" (the default).
This can also be a list of these strings.
+ If `check_preimported` is true, then when coverage is started, the
+ aleady-imported files will be checked to see if they should be measured
+ by coverage. Importing measured files before coverage is started can
+ mean that code is missed.
+
+ `context` is a string to use as the context label for collected data.
+
.. versionadded:: 4.0
The `concurrency` parameter.
.. versionadded:: 4.2
The `concurrency` parameter can now be a list of strings.
+ .. versionadded:: 5.0
+ The `check_preimported` and `context` parameters.
+
"""
# Build our configuration from a number of sources.
- self.config_file, self.config = read_coverage_config(
+ self.config = read_coverage_config(
config_file=config_file,
data_file=data_file, cover_pylib=cover_pylib, timid=timid,
branch=branch, parallel=bool_or_none(data_suffix),
source=source, run_omit=omit, run_include=include, debug=debug,
report_omit=omit, report_include=include,
- concurrency=concurrency,
+ concurrency=concurrency, context=context,
)
# This is injectable by tests.
self._debug_file = None
self._auto_load = self._auto_save = auto_data
- self._data_suffix = data_suffix
-
- # The matchers for _should_trace.
- self.source_match = None
- self.source_pkgs_match = None
- self.pylib_match = self.cover_match = None
- self.include_match = self.omit_match = None
+ self._data_suffix_specified = data_suffix
# Is it ok for no data to be collected?
self._warn_no_data = True
self._warn_unimported_source = True
+ self._warn_preimported_source = check_preimported
# A record of all the warnings that have been issued.
self._warnings = []
# Other instance attributes, set later.
- self.omit = self.include = self.source = None
- self.source_pkgs_unmatched = None
- self.source_pkgs = None
- self.data = self.data_files = self.collector = None
- self.plugins = None
- self.pylib_paths = self.cover_paths = None
- self.data_suffix = self.run_suffix = None
+ self._data = self._collector = None
+ self._plugins = None
+ self._inorout = None
+ self._inorout_class = InOrOut
+ self._data_suffix = self._run_suffix = None
self._exclude_re = None
- self.debug = None
+ self._debug = None
# State machine variables:
# Have we initialized everything?
self._inited = False
+ self._inited_for_start = False
# Have we started collecting and not stopped it?
self._started = False
+ # Have we written --debug output?
+ self._wrote_debug = False
# If we have sub-process measurement happening automatically, then we
# want any explicit creation of a Coverage object to mean, this process
@@ -209,13 +194,7 @@ class Coverage(object):
# Create and configure the debugging controller. COVERAGE_DEBUG_FILE
# is an environment variable, the name of a file to append debug logs
# to.
- if self._debug_file is None:
- debug_file_name = os.environ.get("COVERAGE_DEBUG_FILE")
- if debug_file_name:
- self._debug_file = open(debug_file_name, "a")
- else:
- self._debug_file = sys.stderr
- self.debug = DebugControl(self.config.debug, self._debug_file)
+ self._debug = DebugControl(self.config.debug, self._debug_file)
# _exclude_re is a dict that maps exclusion list names to compiled regexes.
self._exclude_re = {}
@@ -223,364 +202,42 @@ class Coverage(object):
set_relative_directory()
# Load plugins
- self.plugins = Plugins.load_plugins(self.config.plugins, self.config, self.debug)
+ self._plugins = Plugins.load_plugins(self.config.plugins, self.config, self._debug)
# Run configuring plugins.
- for plugin in self.plugins.configurers:
+ for plugin in self._plugins.configurers:
# We need an object with set_option and get_option. Either self or
# self.config will do. Choosing randomly stops people from doing
# other things with those objects, against the public API. Yes,
# this is a bit childish. :)
plugin.configure([self, self.config][int(time.time()) % 2])
- # The source argument can be directories or package names.
- self.source = []
- self.source_pkgs = []
- for src in self.config.source or []:
- if os.path.isdir(src):
- self.source.append(canonical_filename(src))
- else:
- self.source_pkgs.append(src)
- self.source_pkgs_unmatched = self.source_pkgs[:]
-
- self.omit = prep_patterns(self.config.run_omit)
- self.include = prep_patterns(self.config.run_include)
-
- concurrency = self.config.concurrency or []
- if "multiprocessing" in concurrency:
- if not patch_multiprocessing:
- raise CoverageException( # pragma: only jython
- "multiprocessing is not supported on this Python"
- )
- patch_multiprocessing(rcfile=self.config_file)
- # Multi-processing uses parallel for the subprocesses, so also use
- # it for the main process.
- self.config.parallel = True
-
- self.collector = Collector(
- should_trace=self._should_trace,
- check_include=self._check_include_omit_etc,
- timid=self.config.timid,
- branch=self.config.branch,
- warn=self._warn,
- concurrency=concurrency,
- )
-
- # Early warning if we aren't going to be able to support plugins.
- if self.plugins.file_tracers and not self.collector.supports_plugins:
- self._warn(
- "Plugin file tracers (%s) aren't supported with %s" % (
- ", ".join(
- plugin._coverage_plugin_name
- for plugin in self.plugins.file_tracers
- ),
- self.collector.tracer_name(),
- )
- )
- for plugin in self.plugins.file_tracers:
- plugin._coverage_enabled = False
-
- # Suffixes are a bit tricky. We want to use the data suffix only when
- # collecting data, not when combining data. So we save it as
- # `self.run_suffix` now, and promote it to `self.data_suffix` if we
- # find that we are collecting data later.
- if self._data_suffix or self.config.parallel:
- if not isinstance(self._data_suffix, string_class):
- # if data_suffix=True, use .machinename.pid.random
- self._data_suffix = True
- else:
- self._data_suffix = None
- self.data_suffix = None
- self.run_suffix = self._data_suffix
-
- # Create the data file. We do this at construction time so that the
- # data file will be written into the directory where the process
- # started rather than wherever the process eventually chdir'd to.
- self.data = CoverageData(debug=self.debug)
- self.data_files = CoverageDataFiles(
- basename=self.config.data_file, warn=self._warn, debug=self.debug,
- )
-
- # The directories for files considered "installed with the interpreter".
- self.pylib_paths = set()
- if not self.config.cover_pylib:
- # Look at where some standard modules are located. That's the
- # indication for "installed with the interpreter". In some
- # environments (virtualenv, for example), these modules may be
- # spread across a few locations. Look at all the candidate modules
- # we've imported, and take all the different ones.
- for m in (atexit, inspect, os, platform, _pypy_irc_topic, re, _structseq, traceback):
- if m is not None and hasattr(m, "__file__"):
- self.pylib_paths.add(self._canonical_path(m, directory=True))
-
- if _structseq and not hasattr(_structseq, '__file__'):
- # PyPy 2.4 has no __file__ in the builtin modules, but the code
- # objects still have the file names. So dig into one to find
- # the path to exclude.
- structseq_new = _structseq.structseq_new
- try:
- structseq_file = structseq_new.func_code.co_filename
- except AttributeError:
- structseq_file = structseq_new.__code__.co_filename
- self.pylib_paths.add(self._canonical_path(structseq_file))
-
- # To avoid tracing the coverage.py code itself, we skip anything
- # located where we are.
- self.cover_paths = [self._canonical_path(__file__, directory=True)]
- if env.TESTING:
- # Don't include our own test code.
- self.cover_paths.append(os.path.join(self.cover_paths[0], "tests"))
-
- # When testing, we use PyContracts, which should be considered
- # part of coverage.py, and it uses six. Exclude those directories
- # just as we exclude ourselves.
- import contracts
- import six
- for mod in [contracts, six]:
- self.cover_paths.append(self._canonical_path(mod))
-
- # Set the reporting precision.
- Numbers.set_precision(self.config.precision)
-
- atexit.register(self._atexit)
-
- # Create the matchers we need for _should_trace
- if self.source or self.source_pkgs:
- self.source_match = TreeMatcher(self.source)
- self.source_pkgs_match = ModuleMatcher(self.source_pkgs)
- else:
- if self.cover_paths:
- self.cover_match = TreeMatcher(self.cover_paths)
- if self.pylib_paths:
- self.pylib_match = TreeMatcher(self.pylib_paths)
- if self.include:
- self.include_match = FnmatchMatcher(self.include)
- if self.omit:
- self.omit_match = FnmatchMatcher(self.omit)
-
- # The user may want to debug things, show info if desired.
- self._write_startup_debug()
+ def _post_init(self):
+ """Stuff to do after everything is initialized."""
+ if not self._wrote_debug:
+ self._wrote_debug = True
+ self._write_startup_debug()
def _write_startup_debug(self):
"""Write out debug info at startup if needed."""
wrote_any = False
- with self.debug.without_callers():
- if self.debug.should('config'):
+ with self._debug.without_callers():
+ if self._debug.should('config'):
config_info = sorted(self.config.__dict__.items())
- write_formatted_info(self.debug, "config", config_info)
+ config_info = [(k, v) for k, v in config_info if not k.startswith('_')]
+ write_formatted_info(self._debug, "config", config_info)
wrote_any = True
- if self.debug.should('sys'):
- write_formatted_info(self.debug, "sys", self.sys_info())
- for plugin in self.plugins:
+ if self._debug.should('sys'):
+ write_formatted_info(self._debug, "sys", self.sys_info())
+ for plugin in self._plugins:
header = "sys: " + plugin._coverage_plugin_name
info = plugin.sys_info()
- write_formatted_info(self.debug, header, info)
+ write_formatted_info(self._debug, header, info)
wrote_any = True
if wrote_any:
- write_formatted_info(self.debug, "end", ())
-
- def _canonical_path(self, morf, directory=False):
- """Return the canonical path of the module or file `morf`.
-
- If the module is a package, then return its directory. If it is a
- module, then return its file, unless `directory` is True, in which
- case return its enclosing directory.
-
- """
- morf_path = PythonFileReporter(morf, self).filename
- if morf_path.endswith("__init__.py") or directory:
- morf_path = os.path.split(morf_path)[0]
- return morf_path
-
- def _name_for_module(self, module_globals, filename):
- """Get the name of the module for a set of globals and file name.
-
- For configurability's sake, we allow __main__ modules to be matched by
- their importable name.
-
- If loaded via runpy (aka -m), we can usually recover the "original"
- full dotted module name, otherwise, we resort to interpreting the
- file name to get the module's name. In the case that the module name
- can't be determined, None is returned.
-
- """
- if module_globals is None: # pragma: only ironpython
- # IronPython doesn't provide globals: https://github.com/IronLanguages/main/issues/1296
- module_globals = {}
-
- dunder_name = module_globals.get('__name__', None)
-
- if isinstance(dunder_name, str) and dunder_name != '__main__':
- # This is the usual case: an imported module.
- return dunder_name
-
- loader = module_globals.get('__loader__', None)
- for attrname in ('fullname', 'name'): # attribute renamed in py3.2
- if hasattr(loader, attrname):
- fullname = getattr(loader, attrname)
- else:
- continue
-
- if isinstance(fullname, str) and fullname != '__main__':
- # Module loaded via: runpy -m
- return fullname
-
- # Script as first argument to Python command line.
- inspectedname = inspect.getmodulename(filename)
- if inspectedname is not None:
- return inspectedname
- else:
- return dunder_name
-
- def _should_trace_internal(self, filename, frame):
- """Decide whether to trace execution in `filename`, with a reason.
-
- This function is called from the trace function. As each new file name
- is encountered, this function determines whether it is traced or not.
-
- Returns a FileDisposition object.
-
- """
- original_filename = filename
- disp = _disposition_init(self.collector.file_disposition_class, filename)
-
- def nope(disp, reason):
- """Simple helper to make it easy to return NO."""
- disp.trace = False
- disp.reason = reason
- return disp
-
- # Compiled Python files have two file names: frame.f_code.co_filename is
- # the file name at the time the .pyc was compiled. The second name is
- # __file__, which is where the .pyc was actually loaded from. Since
- # .pyc files can be moved after compilation (for example, by being
- # installed), we look for __file__ in the frame and prefer it to the
- # co_filename value.
- dunder_file = frame.f_globals and frame.f_globals.get('__file__')
- if dunder_file:
- filename = source_for_file(dunder_file)
- if original_filename and not original_filename.startswith('<'):
- orig = os.path.basename(original_filename)
- if orig != os.path.basename(filename):
- # Files shouldn't be renamed when moved. This happens when
- # exec'ing code. If it seems like something is wrong with
- # the frame's file name, then just use the original.
- filename = original_filename
-
- if not filename:
- # Empty string is pretty useless.
- return nope(disp, "empty string isn't a file name")
-
- if filename.startswith('memory:'):
- return nope(disp, "memory isn't traceable")
-
- if filename.startswith('<'):
- # Lots of non-file execution is represented with artificial
- # file names like "<string>", "<doctest readme.txt[0]>", or
- # "<exec_function>". Don't ever trace these executions, since we
- # can't do anything with the data later anyway.
- return nope(disp, "not a real file name")
-
- # pyexpat does a dumb thing, calling the trace function explicitly from
- # C code with a C file name.
- if re.search(r"[/\\]Modules[/\\]pyexpat.c", filename):
- return nope(disp, "pyexpat lies about itself")
-
- # Jython reports the .class file to the tracer, use the source file.
- if filename.endswith("$py.class"):
- filename = filename[:-9] + ".py"
-
- canonical = canonical_filename(filename)
- disp.canonical_filename = canonical
-
- # Try the plugins, see if they have an opinion about the file.
- plugin = None
- for plugin in self.plugins.file_tracers:
- if not plugin._coverage_enabled:
- continue
-
- try:
- file_tracer = plugin.file_tracer(canonical)
- if file_tracer is not None:
- file_tracer._coverage_plugin = plugin
- disp.trace = True
- disp.file_tracer = file_tracer
- if file_tracer.has_dynamic_source_filename():
- disp.has_dynamic_filename = True
- else:
- disp.source_filename = canonical_filename(
- file_tracer.source_filename()
- )
- break
- except Exception:
- self._warn(
- "Disabling plug-in %r due to an exception:" % (
- plugin._coverage_plugin_name
- )
- )
- traceback.print_exc()
- plugin._coverage_enabled = False
- continue
- else:
- # No plugin wanted it: it's Python.
- disp.trace = True
- disp.source_filename = canonical
-
- if not disp.has_dynamic_filename:
- if not disp.source_filename:
- raise CoverageException(
- "Plugin %r didn't set source_filename for %r" %
- (plugin, disp.original_filename)
- )
- reason = self._check_include_omit_etc_internal(
- disp.source_filename, frame,
- )
- if reason:
- nope(disp, reason)
-
- return disp
-
- def _check_include_omit_etc_internal(self, filename, frame):
- """Check a file name against the include, omit, etc, rules.
-
- Returns a string or None. String means, don't trace, and is the reason
- why. None means no reason found to not trace.
-
- """
- modulename = self._name_for_module(frame.f_globals, filename)
-
- # If the user specified source or include, then that's authoritative
- # about the outer bound of what to measure and we don't have to apply
- # any canned exclusions. If they didn't, then we have to exclude the
- # stdlib and coverage.py directories.
- if self.source_match:
- if self.source_pkgs_match.match(modulename):
- if modulename in self.source_pkgs_unmatched:
- self.source_pkgs_unmatched.remove(modulename)
- elif not self.source_match.match(filename):
- return "falls outside the --source trees"
- elif self.include_match:
- if not self.include_match.match(filename):
- return "falls outside the --include trees"
- else:
- # If we aren't supposed to trace installed code, then check if this
- # is near the Python standard library and skip it if so.
- if self.pylib_match and self.pylib_match.match(filename):
- return "is in the stdlib"
-
- # We exclude the coverage.py code itself, since a little of it
- # will be measured otherwise.
- if self.cover_match and self.cover_match.match(filename):
- return "is part of coverage.py"
-
- # Check the file against the omit pattern.
- if self.omit_match and self.omit_match.match(filename):
- return "is inside an --omit pattern"
-
- # No reason found to skip this file.
- return None
+ write_formatted_info(self._debug, "end", ())
def _should_trace(self, filename, frame):
"""Decide whether to trace execution in `filename`.
@@ -588,9 +245,9 @@ class Coverage(object):
Calls `_should_trace_internal`, and returns the FileDisposition.
"""
- disp = self._should_trace_internal(filename, frame)
- if self.debug.should('trace'):
- self.debug.write(_disposition_debug_msg(disp))
+ disp = self._inorout.should_trace(filename, frame)
+ if self._debug.should('trace'):
+ self._debug.write(disposition_debug_msg(disp))
return disp
def _check_include_omit_etc(self, filename, frame):
@@ -599,13 +256,13 @@ class Coverage(object):
Returns a boolean: True if the file should be traced, False if not.
"""
- reason = self._check_include_omit_etc_internal(filename, frame)
- if self.debug.should('trace'):
+ reason = self._inorout.check_include_omit_etc(filename, frame)
+ if self._debug.should('trace'):
if not reason:
msg = "Including %r" % (filename,)
else:
msg = "Not including %r: %s" % (filename, reason)
- self.debug.write(msg)
+ self._debug.write(msg)
return not reason
@@ -621,7 +278,7 @@ class Coverage(object):
self._warnings.append(msg)
if slug:
msg = "%s (%s)" % (msg, slug)
- if self.debug.should('pid'):
+ if self._debug.should('pid'):
msg = "[%d] %s" % (os.getpid(), msg)
sys.stderr.write("Coverage.py warning: %s\n" % msg)
@@ -664,17 +321,97 @@ class Coverage(object):
"""
self.config.set_option(option_name, value)
- def use_cache(self, usecache):
- """Obsolete method."""
- self._init()
- if not usecache:
- self._warn("use_cache(False) is no longer supported.")
-
def load(self):
"""Load previously-collected coverage data from the data file."""
self._init()
- self.collector.reset()
- self.data_files.read(self.data)
+ if self._collector:
+ self._collector.reset()
+ should_skip = self.config.parallel and not os.path.exists(self.config.data_file)
+ if not should_skip:
+ self._init_data(suffix=None)
+ self._post_init()
+ if not should_skip:
+ self._data.read()
+
+ def _init_for_start(self):
+ """Initialization for start()"""
+ # Construct the collector.
+ concurrency = self.config.concurrency or []
+ if "multiprocessing" in concurrency:
+ if not patch_multiprocessing:
+ raise CoverageException( # pragma: only jython
+ "multiprocessing is not supported on this Python"
+ )
+ patch_multiprocessing(rcfile=self.config.config_file)
+ # Multi-processing uses parallel for the subprocesses, so also use
+ # it for the main process.
+ self.config.parallel = True
+
+ if self.config.dynamic_context is None:
+ should_start_context = None
+ elif self.config.dynamic_context == "test_function":
+ should_start_context = should_start_context_test_function
+ else:
+ raise CoverageException(
+ "Don't understand dynamic_context setting: {!r}".format(self.config.dynamic_context)
+ )
+
+ self._collector = Collector(
+ should_trace=self._should_trace,
+ check_include=self._check_include_omit_etc,
+ should_start_context=should_start_context,
+ timid=self.config.timid,
+ branch=self.config.branch,
+ warn=self._warn,
+ concurrency=concurrency,
+ )
+
+ suffix = self._data_suffix_specified
+ if suffix or self.config.parallel:
+ if not isinstance(suffix, string_class):
+ # if data_suffix=True, use .machinename.pid.random
+ suffix = True
+ else:
+ suffix = None
+
+ self._init_data(suffix)
+
+ self._collector.use_data(self._data, self.config.context)
+
+ # Early warning if we aren't going to be able to support plugins.
+ if self._plugins.file_tracers and not self._collector.supports_plugins:
+ self._warn(
+ "Plugin file tracers (%s) aren't supported with %s" % (
+ ", ".join(
+ plugin._coverage_plugin_name
+ for plugin in self._plugins.file_tracers
+ ),
+ self._collector.tracer_name(),
+ )
+ )
+ for plugin in self._plugins.file_tracers:
+ plugin._coverage_enabled = False
+
+ # Create the file classifying substructure.
+ self._inorout = self._inorout_class(warn=self._warn)
+ self._inorout.configure(self.config)
+ self._inorout.plugins = self._plugins
+ self._inorout.disp_class = self._collector.file_disposition_class
+
+ atexit.register(self._atexit)
+
+ def _init_data(self, suffix):
+ """Create a data file if we don't have one yet."""
+ if self._data is None:
+ # Create the data file. We do this at construction time so that the
+ # data file will be written into the directory where the process
+ # started rather than wherever the process eventually chdir'd to.
+ self._data = CoverageData(
+ basename=self.config.data_file,
+ suffix=suffix,
+ warn=self._warn,
+ debug=self._debug,
+ )
def start(self):
"""Start measuring code coverage.
@@ -688,29 +425,35 @@ class Coverage(object):
"""
self._init()
- if self.include:
- if self.source or self.source_pkgs:
- self._warn("--include is ignored because --source is set", slug="include-ignored")
- if self.run_suffix:
- # Calling start() means we're running code, so use the run_suffix
- # as the data_suffix when we eventually save the data.
- self.data_suffix = self.run_suffix
+ if not self._inited_for_start:
+ self._inited_for_start = True
+ self._init_for_start()
+ self._post_init()
+
+ # Issue warnings for possible problems.
+ self._inorout.warn_conflicting_settings()
+
+ # See if we think some code that would eventually be measured has
+ # already been imported.
+ if self._warn_preimported_source:
+ self._inorout.warn_already_imported_files()
+
if self._auto_load:
self.load()
- self.collector.start()
+ self._collector.start()
self._started = True
def stop(self):
"""Stop measuring code coverage."""
if self._started:
- self.collector.stop()
+ self._collector.stop()
self._started = False
def _atexit(self):
"""Clean up on process shutdown."""
- if self.debug.should("process"):
- self.debug.write("atexit: {0!r}".format(self))
+ if self._debug.should("process"):
+ self._debug.write("atexit: {0!r}".format(self))
if self._started:
self.stop()
if self._auto_save:
@@ -724,9 +467,12 @@ class Coverage(object):
"""
self._init()
- self.collector.reset()
- self.data.erase()
- self.data_files.erase(parallel=self.config.parallel)
+ self._post_init()
+ if self._collector:
+ self._collector.reset()
+ self._init_data(suffix=None)
+ self._data.erase(parallel=self.config.parallel)
+ self._data = None
def clear_exclude(self, which='exclude'):
"""Clear the exclude list."""
@@ -777,9 +523,8 @@ class Coverage(object):
def save(self):
"""Save the collected coverage data to the data file."""
- self._init()
- self.get_data()
- self.data_files.write(self.data, suffix=self.data_suffix)
+ data = self.get_data()
+ data.write()
def combine(self, data_paths=None, strict=False):
"""Combine together a number of similarly-named coverage data files.
@@ -804,6 +549,8 @@ class Coverage(object):
"""
self._init()
+ self._init_data(suffix=None)
+ self._post_init()
self.get_data()
aliases = None
@@ -814,9 +561,7 @@ class Coverage(object):
for pattern in paths[1:]:
aliases.add(pattern, result)
- self.data_files.combine_parallel_data(
- self.data, aliases=aliases, data_paths=data_paths, strict=strict,
- )
+ combine_parallel_data(self._data, aliases=aliases, data_paths=data_paths, strict=strict)
def get_data(self):
"""Get the collected data.
@@ -829,11 +574,13 @@ class Coverage(object):
"""
self._init()
+ self._init_data(suffix=None)
+ self._post_init()
- if self.collector.save_data(self.data):
+ if self._collector and self._collector.flush_data():
self._post_save_work()
- return self.data
+ return self._data
def _post_save_work(self):
"""After saving data, look for warnings, post-work, etc.
@@ -845,82 +592,18 @@ class Coverage(object):
# If there are still entries in the source_pkgs_unmatched list,
# then we never encountered those packages.
if self._warn_unimported_source:
- for pkg in self.source_pkgs_unmatched:
- self._warn_about_unmeasured_code(pkg)
+ self._inorout.warn_unimported_source()
# Find out if we got any data.
- if not self.data and self._warn_no_data:
+ if not self._data and self._warn_no_data:
self._warn("No data was collected.", slug="no-data-collected")
# Find files that were never executed at all.
- for pkg in self.source_pkgs:
- if (not pkg in sys.modules or
- not hasattr(sys.modules[pkg], '__file__') or
- not os.path.exists(sys.modules[pkg].__file__)):
- continue
- pkg_file = source_for_file(sys.modules[pkg].__file__)
- self._find_unexecuted_files(self._canonical_path(pkg_file))
-
- for src in self.source:
- self._find_unexecuted_files(src)
+ for file_path, plugin_name in self._inorout.find_unexecuted_files():
+ self._data.touch_file(file_path, plugin_name)
if self.config.note:
- self.data.add_run_info(note=self.config.note)
-
- def _warn_about_unmeasured_code(self, pkg):
- """Warn about a package or module that we never traced.
-
- `pkg` is a string, the name of the package or module.
-
- """
- mod = sys.modules.get(pkg)
- if mod is None:
- self._warn("Module %s was never imported." % pkg, slug="module-not-imported")
- return
-
- is_namespace = hasattr(mod, '__path__') and not hasattr(mod, '__file__')
- has_file = hasattr(mod, '__file__') and os.path.exists(mod.__file__)
-
- if is_namespace:
- # A namespace package. It's OK for this not to have been traced,
- # since there is no code directly in it.
- return
-
- if not has_file:
- self._warn("Module %s has no Python source." % pkg, slug="module-not-python")
- return
-
- # The module was in sys.modules, and seems like a module with code, but
- # we never measured it. I guess that means it was imported before
- # coverage even started.
- self._warn(
- "Module %s was previously imported, but not measured" % pkg,
- slug="module-not-measured",
- )
-
- def _find_plugin_files(self, src_dir):
- """Get executable files from the plugins."""
- for plugin in self.plugins.file_tracers:
- for x_file in plugin.find_executable_files(src_dir):
- yield x_file, plugin._coverage_plugin_name
-
- def _find_unexecuted_files(self, src_dir):
- """Find unexecuted files in `src_dir`.
-
- Search for files in `src_dir` that are probably importable,
- and add them as unexecuted files in `self.data`.
-
- """
- py_files = ((py_file, None) for py_file in find_python_files(src_dir))
- plugin_files = self._find_plugin_files(src_dir)
-
- for file_path, plugin_name in itertools.chain(py_files, plugin_files):
- file_path = canonical_filename(file_path)
- if self.omit_match and self.omit_match.match(file_path):
- # Turns out this file was omitted, so don't pull it back
- # in as unexecuted.
- continue
- self.data.touch_file(file_path, plugin_name)
+ self._data.add_run_info(note=self.config.note)
# Backward compatibility with version 1.
def analysis(self, morf):
@@ -945,7 +628,6 @@ class Coverage(object):
coverage data.
"""
- self._init()
analysis = self._analyze(morf)
return (
analysis.filename,
@@ -961,11 +643,16 @@ class Coverage(object):
Returns an `Analysis` object.
"""
- self.get_data()
+ # All reporting comes through here, so do reporting initialization.
+ self._init()
+ Numbers.set_precision(self.config.precision)
+ self._post_init()
+
+ data = self.get_data()
if not isinstance(it, FileReporter):
it = self._get_file_reporter(it)
- return Analysis(self.data, it)
+ return Analysis(data, it)
def _get_file_reporter(self, morf):
"""Get a FileReporter for a module or file name."""
@@ -974,9 +661,9 @@ class Coverage(object):
if isinstance(morf, string_class):
abs_morf = abs_file(morf)
- plugin_name = self.data.file_tracer(abs_morf)
+ plugin_name = self._data.file_tracer(abs_morf)
if plugin_name:
- plugin = self.plugins.get(plugin_name)
+ plugin = self._plugins.get(plugin_name)
if plugin:
file_reporter = plugin.file_reporter(abs_morf)
@@ -1004,17 +691,13 @@ class Coverage(object):
"""
if not morfs:
- morfs = self.data.measured_files()
+ morfs = self._data.measured_files()
- # Be sure we have a list.
- if not isinstance(morfs, (list, tuple)):
+ # Be sure we have a collection.
+ if not isinstance(morfs, (list, tuple, set)):
morfs = [morfs]
- file_reporters = []
- for morf in morfs:
- file_reporter = self._get_file_reporter(morf)
- file_reporters.append(file_reporter)
-
+ file_reporters = [self._get_file_reporter(morf) for morf in morfs]
return file_reporters
def report(
@@ -1022,21 +705,30 @@ class Coverage(object):
file=None, # pylint: disable=redefined-builtin
omit=None, include=None, skip_covered=None,
):
- """Write a summary report to `file`.
+ """Write a textual summary report to `file`.
Each module in `morfs` is listed, with counts of statements, executed
statements, missing statements, and a list of lines missed.
+ If `show_missing` is true, then details of which lines or branches are
+ missing will be included in the report. If `ignore_errors` is true,
+ then a failure while reporting a single file will not stop the entire
+ report.
+
+ `file` is a file-like object, suitable for writing.
+
`include` is a list of file name patterns. Files that match will be
included in the report. Files matching `omit` will not be included in
the report.
- If `skip_covered` is True, don't report on files with 100% coverage.
+ If `skip_covered` is true, don't report on files with 100% coverage.
+
+ All of the arguments default to the settings read from the
+ :ref:`configuration file <config>`.
Returns a float, the total percentage covered.
"""
- self.get_data()
self.config.from_args(
ignore_errors=ignore_errors, report_omit=omit, report_include=include,
show_missing=show_missing, skip_covered=skip_covered,
@@ -1058,7 +750,6 @@ class Coverage(object):
See :meth:`report` for other arguments.
"""
- self.get_data()
self.config.from_args(
ignore_errors=ignore_errors, report_omit=omit, report_include=include
)
@@ -1085,7 +776,6 @@ class Coverage(object):
Returns a float, the total percentage covered.
"""
- self.get_data()
self.config.from_args(
ignore_errors=ignore_errors, report_omit=omit, report_include=include,
html_dir=directory, extra_css=extra_css, html_title=title,
@@ -1110,7 +800,6 @@ class Coverage(object):
Returns a float, the total percentage covered.
"""
- self.get_data()
self.config.from_args(
ignore_errors=ignore_errors, report_omit=omit, report_include=include,
xml_output=outfile,
@@ -1151,6 +840,7 @@ class Coverage(object):
import coverage as covmod
self._init()
+ self._post_init()
def plugin_info(plugins):
"""Make an entry for the sys_info from a list of plug-ins."""
@@ -1165,71 +855,55 @@ class Coverage(object):
info = [
('version', covmod.__version__),
('coverage', covmod.__file__),
- ('cover_paths', self.cover_paths),
- ('pylib_paths', self.pylib_paths),
- ('tracer', self.collector.tracer_name()),
- ('plugins.file_tracers', plugin_info(self.plugins.file_tracers)),
- ('plugins.configurers', plugin_info(self.plugins.configurers)),
- ('config_files', self.config.attempted_config_files),
- ('configs_read', self.config.config_files),
- ('data_path', self.data_files.filename),
+ ('tracer', self._collector.tracer_name() if self._collector else "-none-"),
+ ('CTracer', 'available' if CTracer else "unavailable"),
+ ('plugins.file_tracers', plugin_info(self._plugins.file_tracers)),
+ ('plugins.configurers', plugin_info(self._plugins.configurers)),
+ ('configs_attempted', self.config.attempted_config_files),
+ ('configs_read', self.config.config_files_read),
+ ('config_file', self.config.config_file),
+ ('config_contents',
+ repr(self.config._config_contents)
+ if self.config._config_contents
+ else '-none-'
+ ),
+ ('data_file', self._data.filename if self._data else "-none-"),
('python', sys.version.replace('\n', '')),
('platform', platform.platform()),
('implementation', platform.python_implementation()),
('executable', sys.executable),
+ ('pid', os.getpid()),
('cwd', os.getcwd()),
('path', sys.path),
('environment', sorted(
("%s = %s" % (k, v))
for k, v in iitems(os.environ)
- if k.startswith(("COV", "PY"))
+ if any(slug in k for slug in ("COV", "PY"))
)),
- ('command_line', " ".join(getattr(sys, 'argv', ['???']))),
+ ('command_line', " ".join(getattr(sys, 'argv', ['-none-']))),
]
- matcher_names = [
- 'source_match', 'source_pkgs_match',
- 'include_match', 'omit_match',
- 'cover_match', 'pylib_match',
- ]
-
- for matcher_name in matcher_names:
- matcher = getattr(self, matcher_name)
- if matcher:
- matcher_info = matcher.info()
- else:
- matcher_info = '-none-'
- info.append((matcher_name, matcher_info))
+ if self._inorout:
+ info.extend(self._inorout.sys_info())
return info
-# FileDisposition "methods": FileDisposition is a pure value object, so it can
-# be implemented in either C or Python. Acting on them is done with these
-# functions.
+# Mega debugging...
+if int(os.environ.get("COVERAGE_DEBUG_CALLS", 0)): # pragma: debugging
+ from coverage.debug import decorate_methods, show_calls
-def _disposition_init(cls, original_filename):
- """Construct and initialize a new FileDisposition object."""
- disp = cls()
- disp.original_filename = original_filename
- disp.canonical_filename = original_filename
- disp.source_filename = None
- disp.trace = False
- disp.reason = ""
- disp.file_tracer = None
- disp.has_dynamic_filename = False
- return disp
+ Coverage = decorate_methods(show_calls(show_args=True), butnot=['get_data'])(Coverage)
-def _disposition_debug_msg(disp):
- """Make a nice debug message of what the FileDisposition is doing."""
- if disp.trace:
- msg = "Tracing %r" % (disp.original_filename,)
- if disp.file_tracer:
- msg += ": will be traced by %r" % disp.file_tracer
- else:
- msg = "Not tracing %r: %s" % (disp.original_filename, disp.reason)
- return msg
+def should_start_context_test_function(frame):
+ """Who-Tests-What hack: Determine whether this frame begins a new who-context."""
+ with open("/tmp/ssc.txt", "a") as f:
+ f.write("hello\n")
+ fn_name = frame.f_code.co_name
+ if fn_name.startswith("test"):
+ return fn_name
+ return None
def process_startup():
@@ -1277,10 +951,11 @@ def process_startup():
cov = Coverage(config_file=cps)
process_startup.coverage = cov
- cov.start()
cov._warn_no_data = False
cov._warn_unimported_source = False
+ cov._warn_preimported_source = False
cov._auto_save = True
+ cov.start()
return cov