Diffstat (limited to 'coverage/parser.py')
-rw-r--r--  coverage/parser.py  361
1 file changed, 252 insertions(+), 109 deletions(-)
diff --git a/coverage/parser.py b/coverage/parser.py
index c3dba830..590eacee 100644
--- a/coverage/parser.py
+++ b/coverage/parser.py
@@ -15,8 +15,8 @@ from coverage.backward import range # pylint: disable=redefined-builtin
from coverage.backward import bytes_to_ints, string_class
from coverage.bytecode import CodeObjects
from coverage.debug import short_stack
-from coverage.misc import contract, new_contract, nice_pair, join_regex
-from coverage.misc import CoverageException, NoSource, NotPython
+from coverage.misc import contract, join_regex, new_contract, nice_pair, one_of
+from coverage.misc import NoSource, NotPython, StopEverything
from coverage.phystokens import compile_unicode, generate_tokens, neuter_encoding_declaration
@@ -106,7 +106,6 @@ class PythonParser(object):
"""
combined = join_regex(regexes)
if env.PY2:
- # pylint: disable=redefined-variable-type
combined = combined.decode("utf8")
regex_c = re.compile(combined)
matches = set()
@@ -138,7 +137,7 @@ class PythonParser(object):
tokgen = generate_tokens(self.text)
for toktype, ttext, (slineno, _), (elineno, _), ltext in tokgen:
- if self.show_tokens: # pragma: not covered
+ if self.show_tokens: # pragma: debugging
print("%10s %5s %-20r %r" % (
tokenize.tok_name.get(toktype, toktype),
nice_pair((slineno, elineno)), ttext, ltext
@@ -371,11 +370,11 @@ class ByteParser(object):
# Alternative Python implementations don't always provide all the
# attributes on code objects that we need to do the analysis.
- for attr in ['co_lnotab', 'co_firstlineno', 'co_consts']:
+ for attr in ['co_lnotab', 'co_firstlineno']:
if not hasattr(self.code, attr):
- raise CoverageException(
+ raise StopEverything( # pragma: only jython
"This implementation of Python doesn't support code analysis.\n"
- "Run coverage.py under CPython for this command."
+ "Run coverage.py under another Python for this command."
)
def child_parsers(self):
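
As a rough standalone sketch (not part of the patch itself), the guard above amounts to probing the code object for the attributes the analysis needs; the patch's pragma notes that Jython is the implementation expected to fail here:

    # Quick sketch: probe a CPython code object for the attributes checked above.
    code = compile("x = 1\n", "<example>", "exec")
    missing = [attr for attr in ("co_lnotab", "co_firstlineno") if not hasattr(code, attr)]
    print(missing)   # usually [] on CPython, so no StopEverything is raised there
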
@@ -433,23 +432,35 @@ class ByteParser(object):
class LoopBlock(object):
"""A block on the block stack representing a `for` or `while` loop."""
+ @contract(start=int)
def __init__(self, start):
+ # The line number where the loop starts.
self.start = start
+ # A set of ArcStarts, the arcs from break statements exiting this loop.
self.break_exits = set()
class FunctionBlock(object):
"""A block on the block stack representing a function definition."""
+ @contract(start=int, name=str)
def __init__(self, start, name):
+ # The line number where the function starts.
self.start = start
+ # The name of the function.
self.name = name
class TryBlock(object):
"""A block on the block stack representing a `try` block."""
- def __init__(self, handler_start=None, final_start=None):
+ @contract(handler_start='int|None', final_start='int|None')
+ def __init__(self, handler_start, final_start):
+ # The line number of the first "except" handler, if any.
self.handler_start = handler_start
+ # The line number of the "finally:" clause, if any.
self.final_start = final_start
+
+ # The ArcStarts for breaks/continues/returns/raises inside the "try:"
+ # that need to route through the "finally:" clause.
self.break_from = set()
self.continue_from = set()
self.return_from = set()
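
For orientation, a hedged sketch of what the analyzer's block stack might hold while walking a `break` inside a `try`/`finally` inside a loop (the line numbers and the cause text are invented for illustration; the classes are the ones defined above):

    from coverage.parser import ArcStart, FunctionBlock, LoopBlock, TryBlock

    # Hypothetical source being analyzed:
    #   1: def f(x):
    #   2:     while x:
    #   3:         try:
    #   4:             break
    #   5:         finally:
    #   6:             log()
    block_stack = [
        FunctionBlock(start=1, name="f"),
        LoopBlock(start=2),
        TryBlock(handler_start=None, final_start=6),
    ]
    # The break on line 4 is intercepted by the nearest interested block, here
    # the TryBlock with a finally clause, so it is recorded in break_from and
    # later routed through line 6.
    block_stack[-1].break_from.add(
        ArcStart(4, cause="the break on line {lineno} wasn't executed")
    )
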
@@ -459,8 +470,13 @@ class TryBlock(object):
class ArcStart(collections.namedtuple("Arc", "lineno, cause")):
"""The information needed to start an arc.
- `lineno` is the line number the arc starts from. `cause` is a fragment
- used as the startmsg for AstArcAnalyzer.missing_arc_fragments.
+ `lineno` is the line number the arc starts from.
+
+ `cause` is an English text fragment used as the `startmsg` for
+ AstArcAnalyzer.missing_arc_fragments. It will be used to describe why an
+ arc wasn't executed, so should fit well into a sentence of the form,
+ "Line 17 didn't run because {cause}." The fragment can include "{lineno}"
+ to have `lineno` interpolated into it.
"""
def __new__(cls, lineno, cause=None):
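
A tiny self-contained illustration of how a `cause` fragment is meant to read once formatted (the stand-in class and the fragment text are only examples):

    import collections

    # Stand-in with the same shape as ArcStart, purely for illustration.
    ArcStart = collections.namedtuple("ArcStart", "lineno cause")
    start = ArcStart(3, "the condition on line {lineno} was never true")
    print("Line 17 didn't run because " + start.cause.format(lineno=start.lineno))
    # Line 17 didn't run because the condition on line 3 was never true
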
@@ -472,6 +488,21 @@ class ArcStart(collections.namedtuple("Arc", "lineno, cause")):
new_contract('ArcStarts', lambda seq: all(isinstance(x, ArcStart) for x in seq))
+# Turn on AST dumps with an environment variable.
+AST_DUMP = bool(int(os.environ.get("COVERAGE_AST_DUMP", 0)))
+
+class NodeList(object):
+ """A synthetic fictitious node, containing a sequence of nodes.
+
+ This is used when collapsing optimized if-statements, to represent the
+ unconditional execution of one of the clauses.
+
+ """
+ def __init__(self, body):
+ self.body = body
+ self.lineno = body[0].lineno
+
+
class AstArcAnalyzer(object):
"""Analyze source text with an AST to find executable code paths."""
@@ -482,15 +513,17 @@ class AstArcAnalyzer(object):
self.statements = set(multiline.get(l, l) for l in statements)
self.multiline = multiline
- if int(os.environ.get("COVERAGE_ASTDUMP", 0)): # pragma: debugging
+ if AST_DUMP: # pragma: debugging
# Dump the AST so that failing tests have helpful output.
- print("Statements: {}".format(self.statements))
- print("Multiline map: {}".format(self.multiline))
+ print("Statements: {0}".format(self.statements))
+ print("Multiline map: {0}".format(self.multiline))
ast_dump(self.root_node)
self.arcs = set()
- # A map from arc pairs to a pair of sentence fragments: (startmsg, endmsg).
+ # A map from arc pairs to a list of pairs of sentence fragments:
+ # { (start, end): [(startmsg, endmsg), ...], }
+ #
# For an arc from line 17, they should be usable like:
# "Line 17 {endmsg}, because {startmsg}"
self.missing_arc_fragments = collections.defaultdict(list)
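
The shape described in the comment, spelled out with made-up fragments:

    import collections

    missing_arc_fragments = collections.defaultdict(list)
    # One explanation for arc (17, 23); both fragments here are invented.
    missing_arc_fragments[(17, 23)].append(
        ("the condition on line 17 was never true", "didn't jump to line 23")
    )
    start, end = 17, 23
    startmsg, endmsg = missing_arc_fragments[(start, end)][0]
    print("Line {0} {1}, because {2}".format(start, endmsg, startmsg))
    # Line 17 didn't jump to line 23, because the condition on line 17 was never true
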
@@ -513,7 +546,7 @@ class AstArcAnalyzer(object):
def add_arc(self, start, end, smsg=None, emsg=None):
"""Add an arc, including message fragments to use if it is missing."""
- if self.debug:
+ if self.debug: # pragma: debugging
print("\nAdding arc: ({}, {}): {!r}, {!r}".format(start, end, smsg, emsg))
print(short_stack(limit=6))
self.arcs.add((start, end))
@@ -564,9 +597,10 @@ class AstArcAnalyzer(object):
if node.body:
return self.line_for_node(node.body[0])
else:
- # Modules have no line number, they always start at 1.
+ # Empty modules have no line number, they always start at 1.
return 1
+ # The node types that just flow to the next node with no complications.
OK_TO_DEFAULT = set([
"Assign", "Assert", "AugAssign", "Delete", "Exec", "Expr", "Global",
"Import", "ImportFrom", "Nonlocal", "Pass", "Print",
@@ -576,20 +610,35 @@ class AstArcAnalyzer(object):
def add_arcs(self, node):
"""Add the arcs for `node`.
- Return a set of ArcStarts, exits from this node to the next.
+ Return a set of ArcStarts, exits from this node to the next. Because a
+ node represents an entire sub-tree (including its children), the exits
+ from a node can be arbitrarily complex::
+
+ if something(1):
+ if other(2):
+ doit(3)
+ else:
+ doit(5)
+
+ There are two exits from line 1: they start at line 3 and line 5.
"""
node_name = node.__class__.__name__
handler = getattr(self, "_handle__" + node_name, None)
if handler is not None:
return handler(node)
+ else:
+ # No handler: either it's something that's ok to default (a simple
+ # statement), or it's something we overlooked. Change this 0 to 1
+ # to see if it's overlooked.
+ if 0:
+ if node_name not in self.OK_TO_DEFAULT:
+ print("*** Unhandled: {0}".format(node))
- if 0:
- node_name = node.__class__.__name__
- if node_name not in self.OK_TO_DEFAULT:
- print("*** Unhandled: {0}".format(node))
- return set([ArcStart(self.line_for_node(node), cause=None)])
+ # Default for simple statements: one exit from this node.
+ return set([ArcStart(self.line_for_node(node))])
+ @one_of("from_start, prev_starts")
@contract(returns='ArcStarts')
def add_body_arcs(self, body, from_start=None, prev_starts=None):
"""Add arcs for the body of a compound statement.
@@ -608,28 +657,91 @@ class AstArcAnalyzer(object):
lineno = self.line_for_node(body_node)
first_line = self.multiline.get(lineno, lineno)
if first_line not in self.statements:
- continue
+ body_node = self.find_non_missing_node(body_node)
+ if body_node is None:
+ continue
+ lineno = self.line_for_node(body_node)
for prev_start in prev_starts:
self.add_arc(prev_start.lineno, lineno, prev_start.cause)
prev_starts = self.add_arcs(body_node)
return prev_starts
+ def find_non_missing_node(self, node):
+ """Search `node` looking for a child that has not been optimized away.
+
+ This might return the node you started with, or it will work recursively
+ to find a child node in self.statements.
+
+ Returns a node, or None if none of the node remains.
+
+ """
+ # This repeats work just done in add_body_arcs, but this duplication
+ # means we can avoid a function call in the 99.9999% case of not
+ # optimizing away statements.
+ lineno = self.line_for_node(node)
+ first_line = self.multiline.get(lineno, lineno)
+ if first_line in self.statements:
+ return node
+
+ missing_fn = getattr(self, "_missing__" + node.__class__.__name__, None)
+ if missing_fn:
+ node = missing_fn(node)
+ else:
+ node = None
+ return node
+
+ def _missing__If(self, node):
+ # If the if-node is missing, then one of its children might still be
+ # here, but not both. So return the first of the two that isn't missing.
+ # Use a NodeList to hold the clauses as a single node.
+ non_missing = self.find_non_missing_node(NodeList(node.body))
+ if non_missing:
+ return non_missing
+ if node.orelse:
+ return self.find_non_missing_node(NodeList(node.orelse))
+ return None
+
+ def _missing__NodeList(self, node):
+ # A NodeList might be a mixture of missing and present nodes. Find the
+ # ones that are present.
+ non_missing_children = []
+ for child in node.body:
+ child = self.find_non_missing_node(child)
+ if child is not None:
+ non_missing_children.append(child)
+
+ # Return the simplest representation of the present children.
+ if not non_missing_children:
+ return None
+ if len(non_missing_children) == 1:
+ return non_missing_children[0]
+ return NodeList(non_missing_children)
+
def is_constant_expr(self, node):
"""Is this a compile-time constant?"""
node_name = node.__class__.__name__
if node_name in ["NameConstant", "Num"]:
- return True
+ return "Num"
elif node_name == "Name":
- if env.PY3 and node.id in ["True", "False", "None"]:
- return True
- return False
-
- # tests to write:
- # TODO: while EXPR:
- # TODO: while False:
- # TODO: listcomps hidden deep in other expressions
- # TODO: listcomps hidden in lists: x = [[i for i in range(10)]]
- # TODO: nested function definitions
+ if node.id in ["True", "False", "None", "__debug__"]:
+ return "Name"
+ return None
+
+ # In the fullness of time, these might be good tests to write:
+ # while EXPR:
+ # while False:
+ # listcomps hidden deep in other expressions
+ # listcomps hidden in lists: x = [[i for i in range(10)]]
+ # nested function definitions
+
+
+ # Exit processing: process_*_exits
+ #
+ # These functions process the four kinds of jump exits: break, continue,
+ # raise, and return. To figure out where an exit goes, we have to look at
+ # the block stack context. For example, a break will jump to the nearest
+ # enclosing loop block, or the nearest enclosing finally block, whichever
+ # is nearer.
@contract(exits='ArcStarts')
def process_break_exits(self, exits):
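
Why find_non_missing_node exists, in a nutshell: CPython's optimizer can drop a constant-false branch entirely, so its lines never show up as statements. A quick, hedged demonstration (the exact output varies by CPython version):

    import dis

    source = (
        "if 0:\n"
        "    print('dead')\n"      # typically optimized away
        "else:\n"
        "    print('live')\n"
    )
    code = compile(source, "<example>", "exec")
    print(sorted(set(lineno for _, lineno in dis.findlinestarts(code))))
    # On most CPython versions line 2 is absent, so the analyzer has to fall
    # back to the surviving clause, which is what _missing__If arranges.
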
@@ -689,7 +801,14 @@ class AstArcAnalyzer(object):
)
break
- ## Handlers
+
+ # Handlers: _handle__*
+ #
+ # Each handler deals with a specific AST node type, dispatched from
+ # add_arcs, and returns the set of exits from that node. These functions
+ # mirror the Python semantics of each syntactic construct. See the
+ # docstring for add_arcs to understand the concept of exits from a node.
@contract(returns='ArcStarts')
def _handle__Break(self, node):
@@ -719,7 +838,7 @@ class AstArcAnalyzer(object):
self.add_arc(last, lineno)
last = lineno
# The body is handled in collect_arcs.
- return set([ArcStart(last, cause=None)])
+ return set([ArcStart(last)])
_handle__ClassDef = _handle_decorated
@@ -746,7 +865,7 @@ class AstArcAnalyzer(object):
else_exits = self.add_body_arcs(node.orelse, from_start=from_start)
exits |= else_exits
else:
- # no else clause: exit from the for line.
+ # No else clause: exit from the for line.
exits.add(from_start)
return exits
@@ -765,6 +884,12 @@ class AstArcAnalyzer(object):
return exits
@contract(returns='ArcStarts')
+ def _handle__NodeList(self, node):
+ start = self.line_for_node(node)
+ exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
+ return exits
+
+ @contract(returns='ArcStarts')
def _handle__Raise(self, node):
here = self.line_for_node(node)
raise_start = ArcStart(here, cause="the raise on line {lineno} wasn't executed")
@@ -792,11 +917,11 @@ class AstArcAnalyzer(object):
else:
final_start = None
- try_block = TryBlock(handler_start=handler_start, final_start=final_start)
+ try_block = TryBlock(handler_start, final_start)
self.block_stack.append(try_block)
start = self.line_for_node(node)
- exits = self.add_body_arcs(node.body, from_start=ArcStart(start, cause=None))
+ exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
# We're done with the `try` body, so this block no longer handles
# exceptions. We keep the block so the `finally` clause can pick up
@@ -839,30 +964,46 @@ class AstArcAnalyzer(object):
try_block.return_from # or a `return`.
)
- exits = self.add_body_arcs(node.finalbody, prev_starts=final_from)
+ final_exits = self.add_body_arcs(node.finalbody, prev_starts=final_from)
+
if try_block.break_from:
- break_exits = self._combine_finally_starts(try_block.break_from, exits)
- self.process_break_exits(break_exits)
+ self.process_break_exits(
+ self._combine_finally_starts(try_block.break_from, final_exits)
+ )
if try_block.continue_from:
- continue_exits = self._combine_finally_starts(try_block.continue_from, exits)
- self.process_continue_exits(continue_exits)
+ self.process_continue_exits(
+ self._combine_finally_starts(try_block.continue_from, final_exits)
+ )
if try_block.raise_from:
- raise_exits = self._combine_finally_starts(try_block.raise_from, exits)
- self.process_raise_exits(raise_exits)
+ self.process_raise_exits(
+ self._combine_finally_starts(try_block.raise_from, final_exits)
+ )
if try_block.return_from:
- return_exits = self._combine_finally_starts(try_block.return_from, exits)
- self.process_return_exits(return_exits)
+ self.process_return_exits(
+ self._combine_finally_starts(try_block.return_from, final_exits)
+ )
+
+ if exits:
+ # The finally clause's exits are only exits for the try block
+ # as a whole if the try block had some exits to begin with.
+ exits = final_exits
return exits
+ @contract(starts='ArcStarts', exits='ArcStarts', returns='ArcStarts')
def _combine_finally_starts(self, starts, exits):
- """Helper for building the cause of `finally` branches."""
+ """Helper for building the cause of `finally` branches.
+
+ "finally" clauses might not execute their exits, and the causes could
+ be due to a failure to execute any of the exits in the try block. So
+ we use the causes from `starts` as the causes for `exits`.
+ """
causes = []
- for lineno, cause in sorted(starts):
- if cause is not None:
- causes.append(cause.format(lineno=lineno))
+ for start in sorted(starts):
+ if start.cause is not None:
+ causes.append(start.cause.format(lineno=start.lineno))
cause = " or ".join(causes)
- exits = set(ArcStart(ex.lineno, cause) for ex in exits)
+ exits = set(ArcStart(xit.lineno, cause) for xit in exits)
return exits
@contract(returns='ArcStarts')
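
How the cause strings get merged, in a hedged standalone form (the fragments are illustrative, in the style of the raise message above):

    import collections

    ArcStart = collections.namedtuple("ArcStart", "lineno cause")
    starts = [
        ArcStart(4, "the break on line {lineno} wasn't executed"),
        ArcStart(6, "the return on line {lineno} wasn't executed"),
    ]
    causes = [s.cause.format(lineno=s.lineno) for s in sorted(starts) if s.cause]
    print(" or ".join(causes))
    # the break on line 4 wasn't executed or the return on line 6 wasn't executed
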
@@ -894,9 +1035,9 @@ class AstArcAnalyzer(object):
def _handle__While(self, node):
constant_test = self.is_constant_expr(node.test)
start = to_top = self.line_for_node(node.test)
- if constant_test:
+ if constant_test and (env.PY3 or constant_test == "Num"):
to_top = self.line_for_node(node.body[0])
- self.block_stack.append(LoopBlock(start=start))
+ self.block_stack.append(LoopBlock(start=to_top))
from_start = ArcStart(start, cause="the condition on line {lineno} was never true")
exits = self.add_body_arcs(node.body, from_start=from_start)
for xit in exits:
@@ -971,62 +1112,64 @@ class AstArcAnalyzer(object):
_code_object__ListComp = _make_oneline_code_method("list comprehension")
-SKIP_DUMP_FIELDS = ["ctx"]
+if AST_DUMP: # pragma: debugging
+ # Code only used when dumping the AST for debugging.
-def _is_simple_value(value):
- """Is `value` simple enough to be displayed on a single line?"""
- return (
- value in [None, [], (), {}, set()] or
- isinstance(value, (string_class, int, float))
- )
+ SKIP_DUMP_FIELDS = ["ctx"]
-# TODO: a test of ast_dump?
-def ast_dump(node, depth=0):
- """Dump the AST for `node`.
+ def _is_simple_value(value):
+ """Is `value` simple enough to be displayed on a single line?"""
+ return (
+ value in [None, [], (), {}, set()] or
+ isinstance(value, (string_class, int, float))
+ )
- This recursively walks the AST, printing a readable version.
+ def ast_dump(node, depth=0):
+ """Dump the AST for `node`.
- """
- indent = " " * depth
- if not isinstance(node, ast.AST):
- print("{0}<{1} {2!r}>".format(indent, node.__class__.__name__, node))
- return
-
- lineno = getattr(node, "lineno", None)
- if lineno is not None:
- linemark = " @ {0}".format(node.lineno)
- else:
- linemark = ""
- head = "{0}<{1}{2}".format(indent, node.__class__.__name__, linemark)
-
- named_fields = [
- (name, value)
- for name, value in ast.iter_fields(node)
- if name not in SKIP_DUMP_FIELDS
- ]
- if not named_fields:
- print("{0}>".format(head))
- elif len(named_fields) == 1 and _is_simple_value(named_fields[0][1]):
- field_name, value = named_fields[0]
- print("{0} {1}: {2!r}>".format(head, field_name, value))
- else:
- print(head)
- if 0:
- print("{0}# mro: {1}".format(
- indent, ", ".join(c.__name__ for c in node.__class__.__mro__[1:]),
- ))
- next_indent = indent + " "
- for field_name, value in named_fields:
- prefix = "{0}{1}:".format(next_indent, field_name)
- if _is_simple_value(value):
- print("{0} {1!r}".format(prefix, value))
- elif isinstance(value, list):
- print("{0} [".format(prefix))
- for n in value:
- ast_dump(n, depth + 8)
- print("{0}]".format(next_indent))
- else:
- print(prefix)
- ast_dump(value, depth + 8)
+ This recursively walks the AST, printing a readable version.
+
+ """
+ indent = " " * depth
+ if not isinstance(node, ast.AST):
+ print("{0}<{1} {2!r}>".format(indent, node.__class__.__name__, node))
+ return
+
+ lineno = getattr(node, "lineno", None)
+ if lineno is not None:
+ linemark = " @ {0}".format(node.lineno)
+ else:
+ linemark = ""
+ head = "{0}<{1}{2}".format(indent, node.__class__.__name__, linemark)
+
+ named_fields = [
+ (name, value)
+ for name, value in ast.iter_fields(node)
+ if name not in SKIP_DUMP_FIELDS
+ ]
+ if not named_fields:
+ print("{0}>".format(head))
+ elif len(named_fields) == 1 and _is_simple_value(named_fields[0][1]):
+ field_name, value = named_fields[0]
+ print("{0} {1}: {2!r}>".format(head, field_name, value))
+ else:
+ print(head)
+ if 0:
+ print("{0}# mro: {1}".format(
+ indent, ", ".join(c.__name__ for c in node.__class__.__mro__[1:]),
+ ))
+ next_indent = indent + " "
+ for field_name, value in named_fields:
+ prefix = "{0}{1}:".format(next_indent, field_name)
+ if _is_simple_value(value):
+ print("{0} {1!r}".format(prefix, value))
+ elif isinstance(value, list):
+ print("{0} [".format(prefix))
+ for n in value:
+ ast_dump(n, depth + 8)
+ print("{0}]".format(next_indent))
+ else:
+ print(prefix)
+ ast_dump(value, depth + 8)
- print("{0}>".format(indent))
+ print("{0}>".format(indent))