summaryrefslogtreecommitdiff
path: root/coverage/parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'coverage/parser.py')
-rw-r--r--coverage/parser.py126
1 files changed, 83 insertions, 43 deletions
diff --git a/coverage/parser.py b/coverage/parser.py
index b73ac68..56ef528 100644
--- a/coverage/parser.py
+++ b/coverage/parser.py
@@ -356,6 +356,24 @@ class SetSpy(object): # pragma: debugging
self.the_set.add(arc)
+class ArcStart(collections.namedtuple("Arc", "lineno, cause")):
+ """The information needed to start an arc.
+
+ `lineno` is the line number the arc starts from. `cause` is a phrase used
+ to describe the arc if it is missing.
+
+ """
+ def __new__(cls, lineno, cause=None):
+ self = super(ArcStart, cls).__new__(cls, lineno, cause)
+ return self
+
+if env.TESTING:
+ from contracts import new_contract
+
+ # Define contract words that PyContract doesn't have.
+ new_contract('ArcStarts', lambda seq: all(isinstance(x, ArcStart) for x in seq))
+
+
class AstArcAnalyzer(object):
"""Analyze source text with an AST to find executable code paths."""
@@ -401,6 +419,7 @@ class AstArcAnalyzer(object):
"""Yield the blocks in nearest-to-farthest order."""
return reversed(self.block_stack)
+ @contract(returns=int)
def line_for_node(self, node):
"""What is the right line number to use for this node?
@@ -447,10 +466,12 @@ class AstArcAnalyzer(object):
"Import", "ImportFrom", "Nonlocal", "Pass", "Print",
])
+ @contract(returns='ArcStarts')
def add_arcs(self, node):
"""Add the arcs for `node`.
- Return a set of line numbers, exits from this node to the next.
+ Return a set of ArcStarts, exits from this node to the next.
+
"""
node_name = node.__class__.__name__
handler = getattr(self, "_handle__" + node_name, None)
@@ -461,29 +482,31 @@ class AstArcAnalyzer(object):
node_name = node.__class__.__name__
if node_name not in self.OK_TO_DEFAULT:
print("*** Unhandled: {0}".format(node))
- return set([self.line_for_node(node)])
+ return set([ArcStart(self.line_for_node(node))])
- def add_body_arcs(self, body, from_line=None, prev_lines=None):
+ @contract(returns='ArcStarts')
+ def add_body_arcs(self, body, from_start=None, prev_starts=None):
"""Add arcs for the body of a compound statement.
- `body` is the body node. `from_line` is a single line that can be the
- previous line in flow before this body. `prev_lines` is a set of lines
- that can be the previous line. Only one of them should be given.
+ `body` is the body node. `from_start` is a single `ArcStart` that can
+ be the previous line in flow before this body. `prev_starts` is a set
+ of ArcStarts that can be the previous line. Only one of them should be
+ given.
- Returns a set of lines, the exits from this body.
+ Returns a set of ArcStarts, the exits from this body.
"""
- if prev_lines is None:
- prev_lines = set([from_line])
+ if prev_starts is None:
+ prev_starts = set([from_start])
for body_node in body:
lineno = self.line_for_node(body_node)
first_line = self.multiline.get(lineno, lineno)
if first_line not in self.statements:
continue
- for prev_lineno in prev_lines:
- self.arcs.add((prev_lineno, lineno))
- prev_lines = self.add_arcs(body_node)
- return prev_lines
+ for prev_start in prev_starts:
+ self.arcs.add((prev_start.lineno, lineno))
+ prev_starts = self.add_arcs(body_node)
+ return prev_starts
def is_constant_expr(self, node):
"""Is this a compile-time constant?"""
@@ -502,6 +525,7 @@ class AstArcAnalyzer(object):
# TODO: listcomps hidden in lists: x = [[i for i in range(10)]]
# TODO: nested function definitions
+ @contract(exits='ArcStarts')
def process_break_exits(self, exits):
"""Add arcs due to jumps from `exits` being breaks."""
for block in self.nearest_blocks():
@@ -512,33 +536,36 @@ class AstArcAnalyzer(object):
block.break_from.update(exits)
break
+ @contract(exits='ArcStarts')
def process_continue_exits(self, exits):
"""Add arcs due to jumps from `exits` being continues."""
for block in self.nearest_blocks():
if isinstance(block, LoopBlock):
for xit in exits:
- self.arcs.add((xit, block.start))
+ self.arcs.add((xit.lineno, block.start))
break
elif isinstance(block, TryBlock) and block.final_start is not None:
block.continue_from.update(exits)
break
+ @contract(exits='ArcStarts')
def process_raise_exits(self, exits):
"""Add arcs due to jumps from `exits` being raises."""
for block in self.nearest_blocks():
if isinstance(block, TryBlock):
if block.handler_start is not None:
for xit in exits:
- self.arcs.add((xit, block.handler_start))
+ self.arcs.add((xit.lineno, block.handler_start))
break
elif block.final_start is not None:
block.raise_from.update(exits)
break
elif isinstance(block, FunctionBlock):
for xit in exits:
- self.arcs.add((xit, -block.start))
+ self.arcs.add((xit.lineno, -block.start))
break
+ @contract(exits='ArcStarts')
def process_return_exits(self, exits):
"""Add arcs due to jumps from `exits` being returns."""
for block in self.nearest_blocks():
@@ -547,16 +574,18 @@ class AstArcAnalyzer(object):
break
elif isinstance(block, FunctionBlock):
for xit in exits:
- self.arcs.add((xit, -block.start))
+ self.arcs.add((xit.lineno, -block.start))
break
## Handlers
+ @contract(returns='ArcStarts')
def _handle__Break(self, node):
here = self.line_for_node(node)
- self.process_break_exits([here])
+ self.process_break_exits([ArcStart(here)])
return set()
+ @contract(returns='ArcStarts')
def _handle_decorated(self, node):
"""Add arcs for things that can be decorated (classes and functions)."""
last = self.line_for_node(node)
@@ -577,29 +606,32 @@ class AstArcAnalyzer(object):
self.arcs.add((last, lineno))
last = lineno
# The body is handled in collect_arcs.
- return set([last])
+ return set([ArcStart(last)])
_handle__ClassDef = _handle_decorated
+ @contract(returns='ArcStarts')
def _handle__Continue(self, node):
here = self.line_for_node(node)
- self.process_continue_exits([here])
+ self.process_continue_exits([ArcStart(here)])
return set()
+ @contract(returns='ArcStarts')
def _handle__For(self, node):
start = self.line_for_node(node.iter)
self.block_stack.append(LoopBlock(start=start))
- exits = self.add_body_arcs(node.body, from_line=start)
+ exits = self.add_body_arcs(node.body, from_start=ArcStart(start, "didn't enter the loop"))
for xit in exits:
- self.arcs.add((xit, start))
+ self.arcs.add((xit.lineno, start))
my_block = self.block_stack.pop()
exits = my_block.break_exits
+ from_start = ArcStart(start, "didn't finish naturally")
if node.orelse:
- else_exits = self.add_body_arcs(node.orelse, from_line=start)
+ else_exits = self.add_body_arcs(node.orelse, from_start=from_start)
exits |= else_exits
else:
# no else clause: exit from the for line.
- exits.add(start)
+ exits.add(from_start)
return exits
_handle__AsyncFor = _handle__For
@@ -607,23 +639,27 @@ class AstArcAnalyzer(object):
_handle__FunctionDef = _handle_decorated
_handle__AsyncFunctionDef = _handle_decorated
+ @contract(returns='ArcStarts')
def _handle__If(self, node):
start = self.line_for_node(node.test)
- exits = self.add_body_arcs(node.body, from_line=start)
- exits |= self.add_body_arcs(node.orelse, from_line=start)
+ exits = self.add_body_arcs(node.body, from_start=ArcStart(start, "the condition wasn't true"))
+ exits |= self.add_body_arcs(node.orelse, from_start=ArcStart(start, "the condition wasn't false"))
return exits
+ @contract(returns='ArcStarts')
def _handle__Raise(self, node):
# `raise` statement jumps away, no exits from here.
here = self.line_for_node(node)
- self.process_raise_exits([here])
+ self.process_raise_exits([ArcStart(here)])
return set()
+ @contract(returns='ArcStarts')
def _handle__Return(self, node):
here = self.line_for_node(node)
- self.process_return_exits([here])
+ self.process_return_exits([ArcStart(here)])
return set()
+ @contract(returns='ArcStarts')
def _handle__Try(self, node):
if node.handlers:
handler_start = self.line_for_node(node.handlers[0])
@@ -639,7 +675,7 @@ class AstArcAnalyzer(object):
self.block_stack.append(try_block)
start = self.line_for_node(node)
- exits = self.add_body_arcs(node.body, from_line=start)
+ exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
# We're done with the `try` body, so this block no longer handles
# exceptions. We keep the block so the `finally` clause can pick up
@@ -663,10 +699,10 @@ class AstArcAnalyzer(object):
if last_handler_start is not None:
self.arcs.add((last_handler_start, handler_start))
last_handler_start = handler_start
- handler_exits |= self.add_body_arcs(handler_node.body, from_line=handler_start)
+ handler_exits |= self.add_body_arcs(handler_node.body, from_start=ArcStart(handler_start))
if node.orelse:
- exits = self.add_body_arcs(node.orelse, prev_lines=exits)
+ exits = self.add_body_arcs(node.orelse, prev_starts=exits)
exits |= handler_exits
@@ -680,7 +716,7 @@ class AstArcAnalyzer(object):
try_block.return_from # or a `return`.
)
- exits = self.add_body_arcs(node.finalbody, prev_lines=final_from)
+ exits = self.add_body_arcs(node.finalbody, prev_starts=final_from)
if try_block.break_from:
self.process_break_exits(exits)
if try_block.continue_from:
@@ -692,6 +728,7 @@ class AstArcAnalyzer(object):
return exits
+ @contract(returns='ArcStarts')
def _handle__TryExcept(self, node):
# Python 2.7 uses separate TryExcept and TryFinally nodes. If we get
# TryExcept, it means there was no finally, so fake it, and treat as
@@ -699,6 +736,7 @@ class AstArcAnalyzer(object):
node.finalbody = []
return self._handle__Try(node)
+ @contract(returns='ArcStarts')
def _handle__TryFinally(self, node):
# Python 2.7 uses separate TryExcept and TryFinally nodes. If we get
# TryFinally, see if there's a TryExcept nested inside. If so, merge
@@ -715,30 +753,32 @@ class AstArcAnalyzer(object):
return self._handle__Try(node)
+ @contract(returns='ArcStarts')
def _handle__While(self, node):
constant_test = self.is_constant_expr(node.test)
start = to_top = self.line_for_node(node.test)
if constant_test:
to_top = self.line_for_node(node.body[0])
self.block_stack.append(LoopBlock(start=start))
- exits = self.add_body_arcs(node.body, from_line=start)
+ exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
for xit in exits:
- self.arcs.add((xit, to_top))
+ self.arcs.add((xit.lineno, to_top))
exits = set()
my_block = self.block_stack.pop()
exits.update(my_block.break_exits)
if node.orelse:
- else_exits = self.add_body_arcs(node.orelse, from_line=start)
+ else_exits = self.add_body_arcs(node.orelse, from_start=ArcStart(start))
exits |= else_exits
else:
# No `else` clause: you can exit from the start.
if not constant_test:
- exits.add(start)
+ exits.add(ArcStart(start))
return exits
+ @contract(returns='ArcStarts')
def _handle__With(self, node):
start = self.line_for_node(node)
- exits = self.add_body_arcs(node.body, from_line=start)
+ exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
return exits
_handle__AsyncWith = _handle__With
@@ -746,9 +786,9 @@ class AstArcAnalyzer(object):
def _code_object__Module(self, node):
start = self.line_for_node(node)
if node.body:
- exits = self.add_body_arcs(node.body, from_line=-1)
+ exits = self.add_body_arcs(node.body, from_start=ArcStart(-1))
for xit in exits:
- self.arcs.add((xit, -start))
+ self.arcs.add((xit.lineno, -start))
self.arc_dest_descriptions[-start] = 'exit the module'
else:
# Empty module.
@@ -758,10 +798,10 @@ class AstArcAnalyzer(object):
def _code_object__FunctionDef(self, node):
start = self.line_for_node(node)
self.block_stack.append(FunctionBlock(start=start))
- exits = self.add_body_arcs(node.body, from_line=-1)
+ exits = self.add_body_arcs(node.body, from_start=ArcStart(-1))
self.block_stack.pop()
for xit in exits:
- self.arcs.add((xit, -start))
+ self.arcs.add((xit.lineno, -start))
self.arc_dest_descriptions[-start] = 'exit function "{0}"'.format(node.name)
_code_object__AsyncFunctionDef = _code_object__FunctionDef
@@ -769,9 +809,9 @@ class AstArcAnalyzer(object):
def _code_object__ClassDef(self, node):
start = self.line_for_node(node)
self.arcs.add((-1, start))
- exits = self.add_body_arcs(node.body, from_line=start)
+ exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
for xit in exits:
- self.arcs.add((xit, -start))
+ self.arcs.add((xit.lineno, -start))
self.arc_dest_descriptions[-start] = 'exit the body of class "{0}"'.format(node.name)
def _make_oneline_code_method(noun): # pylint: disable=no-self-argument