author     Ned Batchelder <ned@nedbatchelder.com>  2009-03-05 21:21:43 -0500
committer  Ned Batchelder <ned@nedbatchelder.com>  2009-03-05 21:21:43 -0500
commit     0cc8fd18714cb214327a0a58b704401a9efdf8dc (patch)
tree       0a6ac59eaf1b7acaf26053a5f3c69304b9ddbda3 /coverage
download   python-coveragepy-git-0cc8fd18714cb214327a0a58b704401a9efdf8dc.tar.gz
Initial coverage.py 3.0 beta 1
Diffstat (limited to 'coverage')
-rw-r--r--  coverage/__init__.py    76
-rw-r--r--  coverage/analyzer.py   232
-rw-r--r--  coverage/cmdline.py    149
-rw-r--r--  coverage/collector.py  110
-rw-r--r--  coverage/control.py    410
-rw-r--r--  coverage/data.py       122
-rw-r--r--  coverage/misc.py        18
-rw-r--r--  coverage/tracer.c      211
8 files changed, 1328 insertions, 0 deletions
diff --git a/coverage/__init__.py b/coverage/__init__.py
new file mode 100644
index 00000000..8086877c
--- /dev/null
+++ b/coverage/__init__.py
@@ -0,0 +1,76 @@
+"""Code coverage measurement for Python.
+
+Ned Batchelder
+http://nedbatchelder.com/code/modules/coverage.html
+
+"""
+
+__version__ = "3.0b1" # see detailed history in CHANGES
+
+import sys
+
+from coverage.control import coverage
+from coverage.data import CoverageData
+from coverage.cmdline import main, CoverageScript
+from coverage.misc import CoverageException
+
+
+# Module-level functions. The original API to this module was based on
+# functions defined directly in the module, with a singleton of the coverage()
+# class. This design hampered programmability. Here we define the top-level
+# functions to create the singleton when they are first called.
+
+# Singleton object for use with module-level functions. The singleton is
+# created as needed when one of the module-level functions is called.
+the_coverage = None
+
+def call_singleton_method(name, args, kwargs):
+ global the_coverage
+ if not the_coverage:
+ the_coverage = coverage()
+ return getattr(the_coverage, name)(*args, **kwargs)
+
+mod_funcs = """
+ use_cache start stop erase begin_recursive end_recursive exclude
+ analysis analysis2 report annotate annotate_file
+ """
+
+coverage_module = sys.modules[__name__]
+
+for func_name in mod_funcs.split():
+ # Have to define a function here to make a closure so the function name
+ # is locked in.
+ def func(name):
+ return lambda *a, **kw: call_singleton_method(name, a, kw)
+ setattr(coverage_module, func_name, func(func_name))
+
+
+# COPYRIGHT AND LICENSE
+#
+# Copyright 2001 Gareth Rees. All rights reserved.
+# Copyright 2004-2009 Ned Batchelder. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the
+# distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# HOLDERS AND CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+# DAMAGE.
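
To make the singleton machinery above concrete, here is a minimal sketch of how the module-level API could be driven. The module `mymod` and its `do_work` function are hypothetical stand-ins for code under measurement:

    import coverage

    coverage.erase()              # the first call creates the singleton
    coverage.start()              # begin collecting trace data
    import mymod                  # hypothetical module under measurement
    mymod.do_work()
    coverage.stop()
    coverage.report([mymod], show_missing=True)

Each call is forwarded by call_singleton_method to the shared coverage() instance, so state accumulates across calls.
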
diff --git a/coverage/analyzer.py b/coverage/analyzer.py
new file mode 100644
index 00000000..55dae7f7
--- /dev/null
+++ b/coverage/analyzer.py
@@ -0,0 +1,232 @@
+"""Code analysis for coverage.py"""
+
+import re, token, tokenize, types
+import cStringIO as StringIO
+
+from coverage.misc import nice_pair, CoverageException
+
+
+# Python version compatibility
+try:
+ set() # new in 2.4
+except NameError:
+ import sets
+ set = sets.Set # pylint: disable-msg=W0622
+
+
+class CodeAnalyzer:
+ """Analyze code to find executable lines, excluded lines, etc."""
+
+ def __init__(self, show_tokens=False):
+ self.show_tokens = show_tokens
+
+ # The text lines of the analyzed code.
+ self.lines = None
+
+ # The line numbers of excluded lines of code.
+ self.excluded = set()
+
+ # The line numbers of docstring lines.
+ self.docstrings = set()
+
+ # A dict mapping line numbers to (lo,hi) for multi-line statements.
+ self.multiline = {}
+
+ # The line numbers that start statements.
+ self.statement_starts = set()
+
+ def find_statement_starts(self, code):
+ """Find the starts of statements in compiled code.
+
+ Uses co_lnotab described in Python/compile.c to find line numbers that
+ start statements, adding them to `self.statement_starts`.
+
+ """
+ # Adapted from dis.py in the standard library.
+ byte_increments = [ord(c) for c in code.co_lnotab[0::2]]
+ line_increments = [ord(c) for c in code.co_lnotab[1::2]]
+
+ last_line_num = None
+ line_num = code.co_firstlineno
+ for byte_incr, line_incr in zip(byte_increments, line_increments):
+ if byte_incr:
+ if line_num != last_line_num:
+ self.statement_starts.add(line_num)
+ last_line_num = line_num
+ line_num += line_incr
+ if line_num != last_line_num:
+ self.statement_starts.add(line_num)
+
+ def find_statements(self, code):
+ """Find the statements in `code`.
+
+ Update `self.statement_starts`, a set of line numbers that start
+ statements. Recurses into all code objects reachable from `code`.
+
+ """
+ # Adapted from trace.py in the standard library.
+
+ # Get all of the lineno information from this code.
+ self.find_statement_starts(code)
+
+ # Check the constants for references to other code objects.
+ for c in code.co_consts:
+ if isinstance(c, types.CodeType):
+ # Found another code object, so recurse into it.
+ self.find_statements(c)
+
+ def raw_analyze(self, text=None, filename=None, exclude=None):
+ """Analyze `text` to find the interesting facts about its lines.
+
+ A handful of member fields are updated.
+
+ """
+ if not text:
+ sourcef = open(filename, 'rU')
+ text = sourcef.read()
+ sourcef.close()
+ text = text.replace('\r\n', '\n')
+ self.lines = text.split('\n')
+
+ # Find lines which match an exclusion pattern.
+ if exclude:
+ re_exclude = re.compile(exclude)
+ for i, ltext in enumerate(self.lines):
+ if re_exclude.search(ltext):
+ self.excluded.add(i+1)
+
+ # Tokenize, to find excluded suites, to find docstrings, and to find
+ # multi-line statements.
+ indent = 0
+ exclude_indent = 0
+ excluding = False
+ prev_toktype = token.INDENT
+ first_line = None
+
+ tokgen = tokenize.generate_tokens(StringIO.StringIO(text).readline)
+ for toktype, ttext, (slineno, _), (elineno, _), ltext in tokgen:
+ if self.show_tokens:
+ print "%10s %5s %-20r %r" % (
+ tokenize.tok_name.get(toktype, toktype),
+ nice_pair((slineno, elineno)), ttext, ltext
+ )
+ if toktype == token.INDENT:
+ indent += 1
+ elif toktype == token.DEDENT:
+ indent -= 1
+ elif toktype == token.OP and ttext == ':':
+ if not excluding and elineno in self.excluded:
+ # Start excluding a suite. We trigger off of the colon
+ # token so that the #pragma comment will be recognized on
+ # the same line as the colon.
+ exclude_indent = indent
+ excluding = True
+ elif toktype == token.STRING and prev_toktype == token.INDENT:
+ # Strings that are first on an indented line are docstrings.
+ # (a trick from trace.py in the stdlib.)
+ for i in xrange(slineno, elineno+1):
+ self.docstrings.add(i)
+ elif toktype == token.NEWLINE:
+ if first_line is not None and elineno != first_line:
+ # We're at the end of a line, and we've ended on a
+ # different line than the first line of the statement,
+ # so record a multi-line range.
+ rng = (first_line, elineno)
+ for l in xrange(first_line, elineno+1):
+ self.multiline[l] = rng
+ first_line = None
+
+ if ttext.strip() and toktype != tokenize.COMMENT:
+ # A non-whitespace token.
+ if first_line is None:
+ # The token is not whitespace, and is the first in a
+ # statement.
+ first_line = slineno
+ # Check whether to end an excluded suite.
+ if excluding and indent <= exclude_indent:
+ excluding = False
+ if excluding:
+ self.excluded.add(elineno)
+
+ prev_toktype = toktype
+
+ # Find the starts of the executable statements.
+ filename = filename or "<code>"
+ try:
+ # Python 2.3 and 2.4 don't like partial last lines, so be sure the
+ # text ends nicely for them.
+ text += '\n'
+ code = compile(text, filename, "exec")
+ except SyntaxError, synerr:
+ raise CoverageException(
+ "Couldn't parse '%s' as Python source: '%s' at line %d" %
+ (filename, synerr.msg, synerr.lineno)
+ )
+
+ self.find_statements(code)
+
+ def map_to_first_line(self, lines, ignore=None):
+ """Map the line numbers in `lines` to the correct first line of the
+ statement.
+
+ Skip any line mentioned in `ignore`.
+
+ Returns a sorted list of the first lines.
+
+ """
+ ignore = ignore or []
+ lset = set()
+ for l in lines:
+ if l in ignore:
+ continue
+ rng = self.multiline.get(l)
+ if rng:
+ new_l = rng[0]
+ else:
+ new_l = l
+ if new_l not in ignore:
+ lset.add(new_l)
+ lines = list(lset)
+ lines.sort()
+ return lines
+
+ def analyze_source(self, text=None, filename=None, exclude=None):
+ """Analyze source text to find executable lines, excluded lines, etc.
+
+ Source can be provided as `text`, the text itself, or `filename`, from
+ which text will be read. Excluded lines are those that match `exclude`,
+ a regex.
+
+ Return values are 1) a sorted list of executable line numbers,
+ 2) a sorted list of excluded line numbers, and 3) a dict mapping line
+ numbers to pairs (lo,hi) for multi-line statements.
+
+ """
+ self.raw_analyze(text, filename, exclude)
+
+ excluded_lines = self.map_to_first_line(self.excluded)
+ ignore = excluded_lines + list(self.docstrings)
+ lines = self.map_to_first_line(self.statement_starts, ignore)
+
+ return lines, excluded_lines, self.multiline
+
+ def print_analysis(self):
+ """Print the results of the analysis."""
+ for i, ltext in enumerate(self.lines):
+ lineno = i+1
+ m0 = m1 = m2 = ' '
+ if lineno in self.statement_starts:
+ m0 = '-'
+ if lineno in self.docstrings:
+ m1 = '"'
+ if lineno in self.excluded:
+ m2 = 'x'
+ print "%4d %s%s%s %s" % (lineno, m0, m1, m2, ltext)
+
+
+if __name__ == '__main__':
+ import sys
+
+ analyzer = CodeAnalyzer(show_tokens=True)
+ analyzer.raw_analyze(filename=sys.argv[1], exclude=r"no\s*cover")
+ analyzer.print_analysis()
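
To see the analyzer's three results together, here is a small sketch run against an in-memory snippet; the values in the comments are what the logic above should produce, so treat them as illustrative:

    from coverage.analyzer import CodeAnalyzer

    src = (
        "a = 1\n"
        "b = (2 +\n"
        "     3)\n"
        "c = 4\n"
    )
    analyzer = CodeAnalyzer()
    lines, excluded, line_map = analyzer.analyze_source(
        text=src, exclude=r"no\s*cover"
    )
    print lines      # executable statement starts: [1, 2, 4]
    print excluded   # nothing matches the exclude regex: []
    print line_map   # the parenthesized statement: {2: (2, 3), 3: (2, 3)}
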
diff --git a/coverage/cmdline.py b/coverage/cmdline.py
new file mode 100644
index 00000000..469338e7
--- /dev/null
+++ b/coverage/cmdline.py
@@ -0,0 +1,149 @@
+"""Command-line support for coverage.py"""
+
+import getopt, os, sys
+
+USAGE = r"""
+Coverage version %(__version__)s
+
+Usage:
+
+coverage -x [-p] MODULE.py [ARG1 ARG2 ...]
+ Execute module, passing the given command-line arguments, collecting
+    coverage data. With the -p option, write the data to a file whose name
+    includes the machine name and process ID.
+
+coverage -e
+ Erase collected coverage data.
+
+coverage -c
+ Combine data from multiple coverage files (as created by -p option above)
+ and store it into a single file representing the union of the coverage.
+
+coverage -r [-m] [-i] [-o DIR,...] [FILE1 FILE2 ...]
+ Report on the statement coverage for the given files. With the -m
+ option, show line numbers of the statements that weren't executed.
+
+coverage -a [-d DIR] [-i] [-o DIR,...] [FILE1 FILE2 ...]
+ Make annotated copies of the given files, marking statements that
+ are executed with > and statements that are missed with !. With
+ the -d option, make the copies in that directory. Without the -d
+ option, make each copy in the same directory as the original.
+
+-h Print this help.
+
+-i Ignore errors while reporting or annotating.
+
+-o DIR,...
+ Omit reporting or annotating files when their filename path starts with
+ a directory listed in the omit list.
+ e.g. coverage -i -r -o c:\python25,lib\enthought\traits
+
+Coverage data is saved in the file .coverage by default. Set the
+COVERAGE_FILE environment variable to save it somewhere else.
+""".strip()
+
+class CoverageScript:
+ def __init__(self):
+ import coverage
+ self.covpkg = coverage
+ self.coverage = coverage.coverage()
+
+ def help(self, error=None): #pragma: no cover
+ if error:
+ print error
+ print
+ print USAGE % self.covpkg.__dict__
+ sys.exit(1)
+
+ def command_line(self, argv, help_fn=None):
+ # Collect the command-line options.
+ help_fn = help_fn or self.help
+ settings = {}
+ optmap = {
+ '-a': 'annotate',
+ '-c': 'combine',
+ '-d:': 'directory=',
+ '-e': 'erase',
+ '-h': 'help',
+ '-i': 'ignore-errors',
+ '-m': 'show-missing',
+ '-p': 'parallel-mode',
+ '-r': 'report',
+ '-x': 'execute',
+ '-o:': 'omit=',
+ }
+ short_opts = ''.join(map(lambda o: o[1:], optmap.keys()))
+ long_opts = optmap.values()
+ options, args = getopt.getopt(argv, short_opts, long_opts)
+ for o, a in options:
+ if optmap.has_key(o):
+ settings[optmap[o]] = True
+ elif optmap.has_key(o + ':'):
+ settings[optmap[o + ':']] = a
+ elif o[2:] in long_opts:
+ settings[o[2:]] = True
+ elif o[2:] + '=' in long_opts:
+ settings[o[2:]+'='] = a
+
+ if settings.get('help'):
+ help_fn()
+
+ # Check for conflicts and problems in the options.
+ for i in ['erase', 'execute']:
+ for j in ['annotate', 'report', 'combine']:
+ if settings.get(i) and settings.get(j):
+ help_fn("You can't specify the '%s' and '%s' "
+ "options at the same time." % (i, j))
+
+ args_needed = (settings.get('execute')
+ or settings.get('annotate')
+ or settings.get('report'))
+ action = (settings.get('erase')
+ or settings.get('combine')
+ or args_needed)
+ if not action:
+ help_fn("You must specify at least one of -e, -x, -c, -r, or -a.")
+ if not args_needed and args:
+ help_fn("Unexpected arguments: %s" % " ".join(args))
+
+ # Do something.
+ self.coverage.parallel_mode = settings.get('parallel-mode')
+ self.coverage.get_ready()
+
+ if settings.get('erase'):
+ self.coverage.erase()
+ if settings.get('execute'):
+ if not args:
+ help_fn("Nothing to do.")
+ sys.argv = args
+ self.coverage.start()
+ import __main__
+ sys.path[0] = os.path.dirname(sys.argv[0])
+ execfile(sys.argv[0], __main__.__dict__)
+ if settings.get('combine'):
+ self.coverage.combine()
+ if not args:
+ # For report and annotate, if no files are given on the command
+ # line, then report or annotate everything that was executed.
+ args = self.coverage.data.executed.keys() # TODO: Yikes!
+
+ ignore_errors = settings.get('ignore-errors')
+ show_missing = settings.get('show-missing')
+ directory = settings.get('directory=')
+
+ omit = settings.get('omit=')
+ if omit is not None:
+ omit = [self.coverage.abs_file(p) for p in omit.split(',')]
+ else:
+ omit = []
+
+ if settings.get('report'):
+ self.coverage.report(args, show_missing, ignore_errors, omit_prefixes=omit)
+ if settings.get('annotate'):
+ self.coverage.annotate(args, directory, ignore_errors, omit_prefixes=omit)
+
+
+# Main entrypoint. This is installed as the script entrypoint, so don't
+# refactor it away...
+def main():
+ CoverageScript().command_line(sys.argv[1:])
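
Because command_line takes the argument list and an optional help_fn, the option handling can be exercised without running a program. A sketch, using a replacement help_fn that raises instead of calling sys.exit:

    from coverage.cmdline import CoverageScript

    def fake_help(error=None):
        raise RuntimeError(error or "help requested")

    # '-e' (erase) and '-r' (report) conflict, so help_fn is invoked.
    try:
        CoverageScript().command_line(['-e', '-r'], help_fn=fake_help)
    except RuntimeError, err:
        print err   # You can't specify the 'erase' and 'report' options ...
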
diff --git a/coverage/collector.py b/coverage/collector.py
new file mode 100644
index 00000000..dfcfeb6d
--- /dev/null
+++ b/coverage/collector.py
@@ -0,0 +1,110 @@
+"""Raw data collector for coverage.py."""
+
+import sys, threading
+
+try:
+ # Use the C extension code when we can, for speed.
+ from coverage.tracer import Tracer
+except ImportError:
+ # If we don't have the C tracer, use this Python one.
+ class Tracer:
+ """Python implementation of the raw data tracer."""
+ def __init__(self):
+ self.cur_filename = None
+ self.filename_stack = []
+
+ def _global_trace(self, frame, event, arg_unused):
+ """The trace function passed to sys.settrace."""
+ if event == 'call':
+ filename = frame.f_code.co_filename
+ tracename = self.should_trace_cache.get(filename)
+ if tracename is None:
+ tracename = self.should_trace(filename)
+ self.should_trace_cache[filename] = tracename
+ if tracename:
+ self.filename_stack.append(self.cur_filename)
+ self.cur_filename = tracename
+ return self._local_trace
+ else:
+ return None
+        return self._global_trace
+
+ def _local_trace(self, frame, event, arg_unused):
+ if event == 'line':
+ self.data[(self.cur_filename, frame.f_lineno)] = True
+ elif event == 'return':
+ self.cur_filename = self.filename_stack.pop()
+ return self._local_trace
+
+ def start(self):
+ sys.settrace(self._global_trace)
+
+ def stop(self):
+ sys.settrace(None)
+
+
+class Collector:
+ """Collects trace data.
+
+    Creates a Tracer object for each thread, since each tracks its own stack.
+    All the Tracers point to the same shared data, contributing data points.
+
+ """
+
+ def __init__(self, should_trace):
+ """Create a collector.
+
+ `should_trace` is a function, taking a filename, and returns a
+ canonicalized filename, or False depending on whether the file should be
+ traced or not.
+
+ """
+ self.should_trace = should_trace
+ self.reset()
+
+ def reset(self):
+ # A dictionary with an entry for (Python source file name, line number
+ # in that file) if that line has been executed.
+ self.data = {}
+
+ # A cache of the decision about whether to trace execution in a file.
+ # A dict of filename to boolean.
+ self.should_trace_cache = {}
+
+ def _start_tracer(self):
+ tracer = Tracer()
+ tracer.data = self.data
+ tracer.should_trace = self.should_trace
+ tracer.should_trace_cache = self.should_trace_cache
+ tracer.start()
+ return tracer
+
+ # The trace function has to be set individually on each thread before
+ # execution begins. Ironically, the only support the threading module has
+ # for running code before the thread main is the tracing function. So we
+ # install this as a trace function, and the first time it's called, it does
+ # the real trace installation.
+
+ def _installation_trace(self, frame_unused, event_unused, arg_unused):
+ """Called on new threads, installs the real tracer."""
+ # Remove ourselves as the trace function
+ sys.settrace(None)
+ # Install the real tracer
+ self._start_tracer()
+ # Return None to reiterate that we shouldn't be used for tracing.
+ return None
+
+ def start(self):
+ # Install the tracer on this thread.
+ self.tracer = self._start_tracer()
+ # Install our installation tracer in threading, to jump start other
+ # threads.
+ threading.settrace(self._installation_trace)
+
+ def stop(self):
+ self.tracer.stop()
+ threading.settrace(None)
+
+ def data_points(self):
+ """Return the (filename, lineno) pairs collected."""
+ return self.data.keys()
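
The two-level trace scheme used by the Python Tracer can be demonstrated on its own with sys.settrace. A minimal sketch, independent of coverage.py, in which the filename test is a toy predicate:

    import sys

    executed = {}

    def global_trace(frame, event, arg):
        # Called at each function call; decide whether to watch the frame.
        if event == 'call':
            if frame.f_code.co_filename.endswith('demo.py'):  # toy predicate
                return local_trace    # watch line events in this frame
            return None               # ignore this frame entirely
        return global_trace

    def local_trace(frame, event, arg):
        # Called for events inside a watched frame.
        if event == 'line':
            executed[(frame.f_code.co_filename, frame.f_lineno)] = True
        return local_trace

    sys.settrace(global_trace)
    # ... run the code being measured here ...
    sys.settrace(None)
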
diff --git a/coverage/control.py b/coverage/control.py
new file mode 100644
index 00000000..78a65a2e
--- /dev/null
+++ b/coverage/control.py
@@ -0,0 +1,410 @@
+"""Core control stuff for coverage.py"""
+
+import glob, os, re, sys, types
+
+from coverage.data import CoverageData
+from coverage.misc import nice_pair, CoverageException
+
+
+class coverage:
+ def __init__(self):
+ from coverage.collector import Collector
+
+ self.parallel_mode = False
+ self.exclude_re = ''
+ self.nesting = 0
+ self.cstack = []
+ self.xstack = []
+ self.relative_dir = self.abs_file(os.curdir)+os.sep
+
+ self.collector = Collector(self.should_trace)
+
+ self.data = CoverageData()
+
+ # Cache of results of calling the analysis2() method, so that you can
+ # specify both -r and -a without doing double work.
+ self.analysis_cache = {}
+
+ # Cache of results of calling the canonical_filename() method, to
+ # avoid duplicating work.
+ self.canonical_filename_cache = {}
+
+ # The default exclude pattern.
+ self.exclude('# *pragma[: ]*[nN][oO] *[cC][oO][vV][eE][rR]')
+
+ # Save coverage data when Python exits.
+ import atexit
+ atexit.register(self.save)
+
+ def should_trace(self, filename):
+ """Decide whether to trace execution in `filename`
+
+ Returns a canonicalized filename if it should be traced, False if it
+ should not.
+ """
+ if filename == '<string>':
+ # There's no point in ever tracing string executions, we can't do
+ # anything with the data later anyway.
+ return False
+ # TODO: flag: ignore std lib?
+ # TODO: ignore by module as well as file?
+ return self.canonical_filename(filename)
+
+ def use_cache(self, usecache, cache_file=None):
+ self.data.usefile(usecache, cache_file)
+
+ def get_ready(self):
+ self.collector.reset()
+ self.data.read(parallel=self.parallel_mode)
+ self.analysis_cache = {}
+
+ def start(self):
+ self.get_ready()
+ if self.nesting == 0: #pragma: no cover
+ self.collector.start()
+ self.nesting += 1
+
+ def stop(self):
+ self.nesting -= 1
+ if self.nesting == 0: #pragma: no cover
+ self.collector.stop()
+
+ def erase(self):
+ self.get_ready()
+ self.collector.reset()
+ self.analysis_cache = {}
+ self.data.erase()
+
+ def exclude(self, regex):
+ if self.exclude_re:
+ self.exclude_re += "|"
+ self.exclude_re += "(" + regex + ")"
+
+ def begin_recursive(self):
+ #self.cstack.append(self.c)
+ self.xstack.append(self.exclude_re)
+
+ def end_recursive(self):
+ #self.c = self.cstack.pop()
+ self.exclude_re = self.xstack.pop()
+
+ def save(self):
+ self.group_collected_data()
+ self.data.write()
+
+ def combine(self):
+ """Entry point for combining together parallel-mode coverage data."""
+ self.data.combine_parallel_data()
+
+ def get_zip_data(self, filename):
+ """ Get data from `filename` if it is a zip file path, or return None
+ if it is not.
+ """
+ import zipimport
+ markers = ['.zip'+os.sep, '.egg'+os.sep]
+ for marker in markers:
+ if marker in filename:
+ parts = filename.split(marker)
+ try:
+ zi = zipimport.zipimporter(parts[0]+marker[:-1])
+ except zipimport.ZipImportError:
+ continue
+ try:
+ data = zi.get_data(parts[1])
+ except IOError:
+ continue
+ return data
+ return None
+
+ def abs_file(self, filename):
+ """ Helper function to turn a filename into an absolute normalized
+ filename.
+ """
+ return os.path.normcase(os.path.abspath(os.path.realpath(filename)))
+
+ def relative_filename(self, filename):
+ """ Convert filename to relative filename from self.relative_dir.
+ """
+ return filename.replace(self.relative_dir, "")
+
+ def canonical_filename(self, filename):
+ """Return a canonical filename for `filename`.
+
+ An absolute path with no redundant components and normalized case.
+
+ """
+ if not self.canonical_filename_cache.has_key(filename):
+ f = filename
+ if os.path.isabs(f) and not os.path.exists(f):
+ if not self.get_zip_data(f):
+ f = os.path.basename(f)
+ if not os.path.isabs(f):
+ for path in [os.curdir] + sys.path:
+ g = os.path.join(path, f)
+ if os.path.exists(g):
+ f = g
+ break
+ cf = self.abs_file(f)
+ self.canonical_filename_cache[filename] = cf
+ return self.canonical_filename_cache[filename]
+
+ def group_collected_data(self):
+ """Group the collected data by filename and reset the collector."""
+ self.data.add_raw_data(self.collector.data_points())
+ self.collector.reset()
+
+ # analyze_morf(morf). Analyze the module or filename passed as
+ # the argument. If the source code can't be found, raise an error.
+ # Otherwise, return a tuple of (1) the canonical filename of the
+ # source code for the module, (2) a list of lines of statements
+ # in the source code, (3) a list of lines of excluded statements,
+ # and (4), a map of line numbers to multi-line line number ranges, for
+ # statements that cross lines.
+
+ # The word "morf" means a module object (from which the source file can
+ # be deduced by suitable manipulation of the __file__ attribute) or a
+ # filename.
+
+ def analyze_morf(self, morf):
+ from coverage.analyzer import CodeAnalyzer
+
+ if self.analysis_cache.has_key(morf):
+ return self.analysis_cache[morf]
+ orig_filename = filename = self.morf_filename(morf)
+ ext = os.path.splitext(filename)[1]
+ source = None
+ if ext == '.pyc':
+ filename = filename[:-1]
+ ext = '.py'
+ if ext == '.py':
+ if not os.path.exists(filename):
+ source = self.get_zip_data(filename)
+ if not source:
+ raise CoverageException(
+ "No source for code '%s'." % orig_filename
+ )
+
+ analyzer = CodeAnalyzer()
+ lines, excluded_lines, line_map = analyzer.analyze_source(
+ text=source, filename=filename, exclude=self.exclude_re
+ )
+
+ result = filename, lines, excluded_lines, line_map
+ self.analysis_cache[morf] = result
+ return result
+
+ # format_lines(statements, lines). Format a list of line numbers
+ # for printing by coalescing groups of lines as long as the lines
+ # represent consecutive statements. This will coalesce even if
+ # there are gaps between statements, so if statements =
+ # [1,2,3,4,5,10,11,12,13,14] and lines = [1,2,5,10,11,13,14] then
+ # format_lines will return "1-2, 5-11, 13-14".
+
+ def format_lines(self, statements, lines):
+ pairs = []
+ i = 0
+ j = 0
+ start = None
+ pairs = []
+ while i < len(statements) and j < len(lines):
+ if statements[i] == lines[j]:
+                if start is None:
+ start = lines[j]
+ end = lines[j]
+ j = j + 1
+ elif start:
+ pairs.append((start, end))
+ start = None
+ i = i + 1
+ if start:
+ pairs.append((start, end))
+ ret = ', '.join(map(nice_pair, pairs))
+ return ret
+
+ # Backward compatibility with version 1.
+ def analysis(self, morf):
+ f, s, _, m, mf = self.analysis2(morf)
+ return f, s, m, mf
+
+ def analysis2(self, morf):
+ filename, statements, excluded, line_map = self.analyze_morf(morf)
+ self.group_collected_data()
+
+ # Identify missing statements.
+ missing = []
+ execed = self.data.executed_lines(filename)
+ for line in statements:
+ lines = line_map.get(line)
+ if lines:
+ for l in range(lines[0], lines[1]+1):
+ if l in execed:
+ break
+ else:
+ missing.append(line)
+ else:
+ if line not in execed:
+ missing.append(line)
+
+ return (filename, statements, excluded, missing,
+ self.format_lines(statements, missing))
+
+ # morf_filename(morf). Return the filename for a module or file.
+
+ def morf_filename(self, morf):
+ if hasattr(morf, '__file__'):
+ f = morf.__file__
+ else:
+ f = morf
+ return self.canonical_filename(f)
+
+ def morf_name(self, morf):
+ """ Return the name of morf as used in report.
+ """
+ if hasattr(morf, '__name__'):
+ return morf.__name__
+ else:
+ return self.relative_filename(os.path.splitext(morf)[0])
+
+ def filter_by_prefix(self, morfs, omit_prefixes):
+ """ Return list of morfs where the morf name does not begin
+ with any one of the omit_prefixes.
+ """
+ filtered_morfs = []
+ for morf in morfs:
+ for prefix in omit_prefixes:
+ if self.morf_name(morf).startswith(prefix):
+ break
+ else:
+ filtered_morfs.append(morf)
+
+ return filtered_morfs
+
+ def morf_name_compare(self, x, y):
+ return cmp(self.morf_name(x), self.morf_name(y))
+
+ def report(self, morfs, show_missing=True, ignore_errors=False, file=None, omit_prefixes=None):
+ if not isinstance(morfs, types.ListType):
+ morfs = [morfs]
+ # On windows, the shell doesn't expand wildcards. Do it here.
+ globbed = []
+ for morf in morfs:
+ if isinstance(morf, basestring) and ('?' in morf or '*' in morf):
+ globbed.extend(glob.glob(morf))
+ else:
+ globbed.append(morf)
+ morfs = globbed
+
+ if omit_prefixes:
+ morfs = self.filter_by_prefix(morfs, omit_prefixes)
+ morfs.sort(self.morf_name_compare)
+
+ max_name = max(5, max(map(len, map(self.morf_name, morfs))))
+ fmt_name = "%%- %ds " % max_name
+ fmt_err = fmt_name + "%s: %s"
+ header = fmt_name % "Name" + " Stmts Exec Cover"
+ fmt_coverage = fmt_name + "% 6d % 6d % 5d%%"
+ if show_missing:
+ header = header + " Missing"
+ fmt_coverage = fmt_coverage + " %s"
+ if not file:
+ file = sys.stdout
+ print >>file, header
+ print >>file, "-" * len(header)
+ total_statements = 0
+ total_executed = 0
+ for morf in morfs:
+ name = self.morf_name(morf)
+ try:
+ _, statements, _, missing, readable = self.analysis2(morf)
+ n = len(statements)
+ m = n - len(missing)
+ if n > 0:
+ pc = 100.0 * m / n
+ else:
+ pc = 100.0
+ args = (name, n, m, pc)
+ if show_missing:
+ args = args + (readable,)
+ print >>file, fmt_coverage % args
+ total_statements = total_statements + n
+ total_executed = total_executed + m
+ except KeyboardInterrupt: #pragma: no cover
+ raise
+ except:
+ if not ignore_errors:
+ typ, msg = sys.exc_info()[:2]
+ print >>file, fmt_err % (name, typ, msg)
+ if len(morfs) > 1:
+ print >>file, "-" * len(header)
+ if total_statements > 0:
+ pc = 100.0 * total_executed / total_statements
+ else:
+ pc = 100.0
+ args = ("TOTAL", total_statements, total_executed, pc)
+ if show_missing:
+ args = args + ("",)
+ print >>file, fmt_coverage % args
+
+ # annotate(morfs, ignore_errors).
+
+ blank_re = re.compile(r"\s*(#|$)")
+ else_re = re.compile(r"\s*else\s*:\s*(#|$)")
+
+ def annotate(self, morfs, directory=None, ignore_errors=False, omit_prefixes=None):
+ if omit_prefixes:
+ morfs = self.filter_by_prefix(morfs, omit_prefixes)
+ for morf in morfs:
+ try:
+ filename, statements, excluded, missing, _ = self.analysis2(morf)
+ self.annotate_file(filename, statements, excluded, missing, directory)
+ except KeyboardInterrupt:
+ raise
+ except:
+ if not ignore_errors:
+ raise
+
+ def annotate_file(self, filename, statements, excluded, missing, directory=None):
+ source = open(filename, 'r')
+ if directory:
+ dest_file = os.path.join(directory,
+ os.path.basename(filename)
+ + ',cover')
+ else:
+ dest_file = filename + ',cover'
+ dest = open(dest_file, 'w')
+ lineno = 0
+ i = 0
+ j = 0
+ covered = True
+ while True:
+ line = source.readline()
+ if line == '':
+ break
+ lineno = lineno + 1
+ while i < len(statements) and statements[i] < lineno:
+ i = i + 1
+ while j < len(missing) and missing[j] < lineno:
+ j = j + 1
+ if i < len(statements) and statements[i] == lineno:
+ covered = j >= len(missing) or missing[j] > lineno
+ if self.blank_re.match(line):
+ dest.write(' ')
+ elif self.else_re.match(line):
+ # Special logic for lines containing only 'else:'.
+ if i >= len(statements) and j >= len(missing):
+ dest.write('! ')
+ elif i >= len(statements) or j >= len(missing):
+ dest.write('> ')
+ elif statements[i] == missing[j]:
+ dest.write('! ')
+ else:
+ dest.write('> ')
+ elif lineno in excluded:
+ dest.write('- ')
+ elif covered:
+ dest.write('> ')
+ else:
+ dest.write('! ')
+ dest.write(line)
+ source.close()
+ dest.close()
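
The comment in __init__.py about programmability points at using the coverage class directly instead of the module-level singleton. A sketch (mymod is again a hypothetical module under measurement):

    from coverage.control import coverage

    cov = coverage()
    cov.start()
    import mymod
    mymod.do_work()
    cov.stop()

    filename, statements, excluded, missing, readable = cov.analysis2(mymod)
    print "%s: missing lines %s" % (filename, readable)

Separate instances keep their own exclude patterns and analysis caches, which is exactly what the singleton design made awkward.
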
diff --git a/coverage/data.py b/coverage/data.py
new file mode 100644
index 00000000..5d14a337
--- /dev/null
+++ b/coverage/data.py
@@ -0,0 +1,122 @@
+"""Coverage data for coverage.py"""
+
+import os, marshal, socket, types
+
+class CoverageData:
+ """Manages collected coverage data."""
+ # Name of the data file (unless environment variable is set).
+ filename_default = ".coverage"
+
+ # Environment variable naming the data file.
+ filename_env = "COVERAGE_FILE"
+
+ def __init__(self):
+ self.filename = None
+ self.use_file = True
+
+ # A map from canonical Python source file name to a dictionary in
+ # which there's an entry for each line number that has been
+ # executed:
+ #
+ # {
+ # 'filename1.py': { 12: True, 47: True, ... },
+ # ...
+ # }
+ #
+ self.executed = {}
+
+ def usefile(self, use_file=True, filename_default=None):
+ self.use_file = use_file
+ if filename_default and not self.filename:
+ self.filename_default = filename_default
+
+ def read(self, parallel=False):
+ """Read coverage data from the coverage data file (if it exists)."""
+ data = {}
+ if self.use_file and not self.filename:
+ self.filename = os.environ.get(
+ self.filename_env, self.filename_default)
+ if parallel:
+ self.filename += "." + socket.gethostname()
+ self.filename += "." + str(os.getpid())
+ if os.path.exists(self.filename):
+ data = self._read_file(self.filename)
+ self.executed = data
+
+ def write(self):
+ """Write the collected coverage data to a file."""
+ if self.use_file and self.filename:
+ self.write_file(self.filename)
+
+ def erase(self):
+ if self.filename and os.path.exists(self.filename):
+ os.remove(self.filename)
+
+ def write_file(self, filename):
+ """Write the coverage data to `filename`."""
+ f = open(filename, 'wb')
+ try:
+ marshal.dump(self.executed, f)
+ finally:
+ f.close()
+
+ def read_file(self, filename):
+ self.executed = self._read_file(filename)
+
+ def _read_file(self, filename):
+ """ Return the stored coverage data from the given file.
+ """
+ try:
+ fdata = open(filename, 'rb')
+ executed = marshal.load(fdata)
+ fdata.close()
+ if isinstance(executed, types.DictType):
+ return executed
+ else:
+ return {}
+ except:
+ return {}
+
+ def combine_parallel_data(self):
+ """ Treat self.filename as a file prefix, and combine the data from all
+ of the files starting with that prefix.
+ """
+ data_dir, local = os.path.split(self.filename)
+ for f in os.listdir(data_dir or '.'):
+ if f.startswith(local):
+ full_path = os.path.join(data_dir, f)
+ file_data = self._read_file(full_path)
+ self._combine_data(file_data)
+
+ def _combine_data(self, new_data):
+ """Combine the `new_data` into `executed`."""
+ for filename, file_data in new_data.items():
+ self.executed.setdefault(filename, {}).update(file_data)
+
+ def add_raw_data(self, data_points):
+ """Add raw data.
+
+ `data_points` is (filename, lineno) pairs.
+
+ """
+ for filename, lineno in data_points:
+ self.executed.setdefault(filename, {})[lineno] = True
+
+ def executed_lines(self, filename):
+ """Return a mapping object such that "lineno in obj" is true if that
+        line number has been executed in `filename`.
+ """
+ # TODO: Write a better description.
+ return self.executed[filename]
+
+ def summary(self):
+ """Return a dict summarizing the coverage data.
+
+ Keys are the basename of the filenames, and values are the number of
+ executed lines. This is useful in the unit tests.
+
+ """
+ summ = {}
+ for filename, lines in self.executed.items():
+ summ[os.path.basename(filename)] = len(lines)
+ return summ
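
The shape of the executed structure and the convenience methods can be seen with in-memory data only; usefile(False) keeps CoverageData away from the .coverage file. Key order in the printed dicts is illustrative:

    from coverage.data import CoverageData

    data = CoverageData()
    data.usefile(False)      # in-memory only: write() becomes a no-op
    data.add_raw_data([('a.py', 1), ('a.py', 2), ('b.py', 10)])

    print data.executed      # {'a.py': {1: True, 2: True}, 'b.py': {10: True}}
    print data.summary()     # {'a.py': 2, 'b.py': 1}
    print 2 in data.executed_lines('a.py')    # True
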
diff --git a/coverage/misc.py b/coverage/misc.py
new file mode 100644
index 00000000..15ddad08
--- /dev/null
+++ b/coverage/misc.py
@@ -0,0 +1,18 @@
+"""Miscellaneous stuff for coverage.py"""
+
+def nice_pair(pair):
+ """Make a nice string representation of a pair of numbers.
+
+ If the numbers are equal, just return the number, otherwise return the pair
+ with a dash between them, indicating the range.
+
+ """
+ start, end = pair
+ if start == end:
+ return "%d" % start
+ else:
+ return "%d-%d" % (start, end)
+
+
+class CoverageException(Exception):
+ pass
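
For instance, nice_pair collapses a single-line range and hyphenates a real one:

    from coverage.misc import nice_pair

    print nice_pair((5, 5))     # "5"
    print nice_pair((17, 23))   # "17-23"
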
diff --git a/coverage/tracer.c b/coverage/tracer.c
new file mode 100644
index 00000000..cd07ded2
--- /dev/null
+++ b/coverage/tracer.c
@@ -0,0 +1,211 @@
+// C-based Tracer for coverage.py
+
+#include "Python.h"
+#include "compile.h" // in 2.3, this wasn't part of Python.h
+#include "eval.h" // or this.
+#include "structmember.h"
+#include "frameobject.h"
+
+// The Tracer type.
+
+typedef struct {
+ PyObject_HEAD
+ PyObject * should_trace;
+ PyObject * data;
+ PyObject * should_trace_cache;
+ int started;
+ // The index of the last-used entry in tracenames.
+ int depth;
+ // Filenames to record at each level, or NULL if not recording.
+ PyObject * tracenames[300];
+} Tracer;
+
+static int
+Tracer_init(Tracer *self, PyObject *args, PyObject *kwds)
+{
+ self->should_trace = NULL;
+ self->data = NULL;
+ self->should_trace_cache = NULL;
+ self->started = 0;
+ self->depth = -1;
+ return 0;
+}
+
+static void
+Tracer_dealloc(Tracer *self)
+{
+ if (self->started) {
+ PyEval_SetTrace(NULL, NULL);
+ }
+
+ Py_XDECREF(self->should_trace);
+ Py_XDECREF(self->data);
+ Py_XDECREF(self->should_trace_cache);
+
+ while (self->depth >= 0) {
+ Py_XDECREF(self->tracenames[self->depth]);
+ self->depth--;
+ }
+
+ self->ob_type->tp_free((PyObject*)self);
+}
+
+static int
+Tracer_trace(Tracer *self, PyFrameObject *frame, int what, PyObject *arg)
+{
+ PyObject * filename = NULL;
+ PyObject * tracename = NULL;
+
+ // printf("trace: %d @ %d\n", what, frame->f_lineno);
+
+ switch (what) {
+ case PyTrace_CALL: // 0
+ self->depth++;
+        if (self->depth >= sizeof(self->tracenames)/sizeof(self->tracenames[0])) {
+ PyErr_SetString(PyExc_RuntimeError, "Tracer stack overflow");
+ return -1;
+ }
+ // Check if we should trace this line.
+ filename = frame->f_code->co_filename;
+ tracename = PyDict_GetItem(self->should_trace_cache, filename);
+ if (tracename == NULL) {
+ // We've never considered this file before. Ask should_trace about it.
+ PyObject * args = Py_BuildValue("(O)", filename);
+ tracename = PyObject_Call(self->should_trace, args, NULL);
+ Py_DECREF(args);
+ if (tracename == NULL) {
+ // An error occurred inside should_trace.
+ return -1;
+ }
+ PyDict_SetItem(self->should_trace_cache, filename, tracename);
+ }
+ else {
+ Py_INCREF(tracename);
+ }
+
+ // If tracename is a string, then we're supposed to trace.
+ self->tracenames[self->depth] = PyString_Check(tracename) ? tracename : NULL;
+ break;
+
+ case PyTrace_RETURN: // 3
+ if (self->depth >= 0) {
+ Py_XDECREF(self->tracenames[self->depth]);
+ self->depth--;
+ }
+ break;
+
+ case PyTrace_LINE: // 2
+ if (self->depth >= 0) {
+ if (self->tracenames[self->depth]) {
+ PyObject * t = PyTuple_New(2);
+ tracename = self->tracenames[self->depth];
+ Py_INCREF(tracename);
+ PyTuple_SetItem(t, 0, tracename);
+ PyTuple_SetItem(t, 1, PyInt_FromLong(frame->f_lineno));
+ Py_INCREF(Py_None);
+ PyDict_SetItem(self->data, t, Py_None);
+ Py_DECREF(t);
+ }
+ }
+ break;
+ }
+
+ return 0;
+}
+
+static PyObject *
+Tracer_start(Tracer *self, PyObject *args)
+{
+ PyEval_SetTrace((Py_tracefunc)Tracer_trace, (PyObject*)self);
+ self->started = 1;
+ return Py_BuildValue("");
+}
+
+static PyObject *
+Tracer_stop(Tracer *self, PyObject *args)
+{
+ if (self->started) {
+ PyEval_SetTrace(NULL, NULL);
+ self->started = 0;
+ }
+ return Py_BuildValue("");
+}
+
+static PyMemberDef
+Tracer_members[] = {
+ { "should_trace", T_OBJECT, offsetof(Tracer, should_trace), 0, "Function indicating whether to trace a file." },
+ { "data", T_OBJECT, offsetof(Tracer, data), 0, "The raw dictionary of trace data." },
+ { "should_trace_cache", T_OBJECT, offsetof(Tracer, should_trace_cache), 0, "Dictionary caching should_trace results." },
+ { NULL }
+};
+
+static PyMethodDef
+Tracer_methods[] = {
+ { "start", (PyCFunction) Tracer_start, METH_VARARGS, "Start the tracer" },
+ { "stop", (PyCFunction) Tracer_stop, METH_VARARGS, "Stop the tracer" },
+ { NULL }
+};
+
+static PyTypeObject
+TracerType = {
+ PyObject_HEAD_INIT(NULL)
+ 0, /*ob_size*/
+ "coverage.Tracer", /*tp_name*/
+ sizeof(Tracer), /*tp_basicsize*/
+ 0, /*tp_itemsize*/
+ (destructor)Tracer_dealloc, /*tp_dealloc*/
+ 0, /*tp_print*/
+ 0, /*tp_getattr*/
+ 0, /*tp_setattr*/
+ 0, /*tp_compare*/
+ 0, /*tp_repr*/
+ 0, /*tp_as_number*/
+ 0, /*tp_as_sequence*/
+ 0, /*tp_as_mapping*/
+ 0, /*tp_hash */
+ 0, /*tp_call*/
+ 0, /*tp_str*/
+ 0, /*tp_getattro*/
+ 0, /*tp_setattro*/
+ 0, /*tp_as_buffer*/
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /*tp_flags*/
+ "Tracer objects", /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ Tracer_methods, /* tp_methods */
+ Tracer_members, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ (initproc)Tracer_init, /* tp_init */
+ 0, /* tp_alloc */
+ 0, /* tp_new */
+};
+
+// Module definition
+
+void
+inittracer(void)
+{
+ PyObject* mod;
+
+ mod = Py_InitModule3("coverage.tracer", NULL, "Fast coverage tracer.");
+ if (mod == NULL) {
+ return;
+ }
+
+ TracerType.tp_new = PyType_GenericNew;
+ if (PyType_Ready(&TracerType) < 0) {
+ return;
+ }
+
+ Py_INCREF(&TracerType);
+ PyModule_AddObject(mod, "Tracer", (PyObject *)&TracerType);
+}
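
From Python, the C Tracer is a drop-in for the pure-Python fallback in collector.py: the Collector assigns the same three attributes and calls start and stop. A hand-wired sketch (the everything-matches predicate is a toy; real use goes through Collector):

    from coverage.tracer import Tracer   # requires the extension to be built

    t = Tracer()
    t.data = {}                      # (filename, lineno) -> True
    t.should_trace_cache = {}
    t.should_trace = lambda f: f     # toy predicate: trace every file

    t.start()
    # ... code executed here is recorded into t.data ...
    t.stop()
    print len(t.data), "data points"
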