summaryrefslogtreecommitdiff
path: root/coverage/parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'coverage/parser.py')
-rw-r--r--  coverage/parser.py  495
1 files changed, 490 insertions, 5 deletions
diff --git a/coverage/parser.py b/coverage/parser.py
index 884d40cb..c11bc222 100644
--- a/coverage/parser.py
+++ b/coverage/parser.py
@@ -3,19 +3,21 @@
"""Code parsing for coverage.py."""
+import ast
import collections
import dis
+import os
import re
import token
import tokenize
from coverage import env
from coverage.backward import range # pylint: disable=redefined-builtin
-from coverage.backward import bytes_to_ints
+from coverage.backward import bytes_to_ints, string_class
from coverage.bytecode import ByteCodes, CodeObjects
from coverage.misc import contract, nice_pair, join_regex
from coverage.misc import CoverageException, NoSource, NotPython
-from coverage.phystokens import compile_unicode, generate_tokens
+from coverage.phystokens import compile_unicode, generate_tokens, neuter_encoding_declaration
class PythonParser(object):
@@ -248,7 +250,7 @@ class PythonParser(object):
starts = self.raw_statements - ignore
self.statements = self.first_lines(starts) - ignore
- def arcs(self):
+ def old_arcs(self):
"""Get information about the arcs available in the code.
Returns a set of line number pairs. Line numbers have been normalized
@@ -264,6 +266,19 @@ class PythonParser(object):
self._all_arcs.add((fl1, fl2))
return self._all_arcs
+ def arcs(self):
+ if self._all_arcs is None:
+ aaa = AstArcAnalyzer(self.text)
+ arcs = aaa.collect_arcs()
+
+ self._all_arcs = set()
+ for l1, l2 in arcs:
+ fl1 = self.first_line(l1)
+ fl2 = self.first_line(l2)
+ if fl1 != fl2:
+ self._all_arcs.add((fl1, fl2))
+ return self._all_arcs
+
def exit_counts(self):
"""Get a count of exits from that each line.
@@ -292,6 +307,409 @@ class PythonParser(object):
return exit_counts
+class LoopBlock(object):
+ def __init__(self, start):
+ self.start = start
+ self.break_exits = set()
+
+class FunctionBlock(object):
+ def __init__(self, start):
+ self.start = start
+
+class TryBlock(object):
+ def __init__(self, handler_start=None, final_start=None):
+ self.handler_start = handler_start # TODO: is this used?
+ self.final_start = final_start # TODO: is this used?
+ self.break_from = set()
+ self.continue_from = set()
+ self.return_from = set()
+ self.raise_from = set()
+
+
+class AstArcAnalyzer(object):
+ @contract(text='unicode')
+ def __init__(self, text):
+ self.root_node = ast.parse(neuter_encoding_declaration(text))
+ if int(os.environ.get("COVERAGE_ASTDUMP", 0)):
+ # Dump the AST so that failing tests have helpful output.
+ ast_dump(self.root_node)
+
+ self.arcs = None
+ self.block_stack = []
+
+ def collect_arcs(self):
+ self.arcs = set()
+ self.add_arcs_for_code_objects(self.root_node)
+ return self.arcs
+
+ def blocks(self):
+ """Yield the blocks in nearest-to-farthest order."""
+ return reversed(self.block_stack)
+
+ def line_for_node(self, node):
+ """What is the right line number to use for this node?"""
+ node_name = node.__class__.__name__
+ handler = getattr(self, "line_" + node_name, self.line_default)
+ return handler(node)
+
+ def line_Assign(self, node):
+ return self.line_for_node(node.value)
+
+ def line_Dict(self, node):
+ # Python 3.5 changed how dict literals are made.
+ if env.PYVERSION >= (3, 5) and node.keys:
+ return node.keys[0].lineno
+ else:
+ return node.lineno
+
+ def line_List(self, node):
+ if node.elts:
+ return self.line_for_node(node.elts[0])
+ else:
+ # TODO: test case for this branch: x = []
+ return node.lineno
+
+ def line_Module(self, node):
+ if node.body:
+ return self.line_for_node(node.body[0])
+ else:
+ # Modules have no line number, they always start at 1.
+ # TODO: test case for empty module.
+ return 1
+
+ def line_comprehension(self, node):
+ # TODO: is this how to get the line number for a comprehension?
+ return node.target.lineno
+
+ def line_default(self, node):
+ return node.lineno
+
+ def add_arcs(self, node):
+ """Add the arcs for `node`.
+
+ Return a set of line numbers, exits from this node to the next.
+ """
+ # Yield-froms and awaits can appear anywhere.
+ # TODO: this is probably over-doing it, and too expensive. Can we
+ # instrument the ast walking to see how many nodes we are revisiting?
+ if isinstance(node, ast.stmt):
+ for name, value in ast.iter_fields(node):
+ if isinstance(value, ast.expr) and self.contains_return_expression(value):
+ self.process_return_exits([self.line_for_node(node)])
+ break
+ node_name = node.__class__.__name__
+ handler = getattr(self, "handle_" + node_name, self.handle_default)
+ return handler(node)
+
+ def add_body_arcs(self, body, from_line=None, prev_lines=None):
+ if prev_lines is None:
+ prev_lines = set([from_line])
+ for body_node in body:
+ lineno = self.line_for_node(body_node)
+ for prev_lineno in prev_lines:
+ self.arcs.add((prev_lineno, lineno))
+ prev_lines = self.add_arcs(body_node)
+ return prev_lines
+
+ def is_constant_expr(self, node):
+ """Is this a compile-time constant?"""
+ node_name = node.__class__.__name__
+ if node_name in ["NameConstant", "Num"]:
+ return True
+ elif node_name == "Name":
+ if env.PY3 and node.id in ["True", "False", "None"]:
+ return True
+ return False
+
+ # tests to write:
+ # TODO: while EXPR:
+ # TODO: while False:
+ # TODO: multi-target assignment with computed targets
+ # TODO: listcomps hidden deep in other expressions
+ # TODO: listcomps hidden in lists: x = [[i for i in range(10)]]
+ # TODO: multi-line listcomps
+ # TODO: nested function definitions
+
+ def process_break_exits(self, exits):
+ for block in self.blocks():
+ if isinstance(block, LoopBlock):
+ # TODO: what if there is no loop?
+ block.break_exits.update(exits)
+ break
+ elif isinstance(block, TryBlock) and block.final_start:
+ block.break_from.update(exits)
+ break
+
+ def process_continue_exits(self, exits):
+ for block in self.blocks():
+ if isinstance(block, LoopBlock):
+ # TODO: what if there is no loop?
+ for exit in exits:
+ self.arcs.add((exit, block.start))
+ break
+ elif isinstance(block, TryBlock) and block.final_start:
+ block.continue_from.update(exits)
+ break
+
+ def process_raise_exits(self, exits):
+ for block in self.blocks():
+ if isinstance(block, TryBlock):
+ if block.handler_start:
+ for exit in exits:
+ self.arcs.add((exit, block.handler_start))
+ break
+ elif block.final_start:
+ block.raise_from.update(exits)
+ break
+ elif isinstance(block, FunctionBlock):
+ for exit in exits:
+ self.arcs.add((exit, -block.start))
+ break
+
+ def process_return_exits(self, exits):
+ for block in self.blocks():
+ if isinstance(block, TryBlock) and block.final_start:
+ block.return_from.update(exits)
+ break
+ elif isinstance(block, FunctionBlock):
+ # TODO: what if there is no enclosing function?
+ for exit in exits:
+ self.arcs.add((exit, -block.start))
+ break
+
+ ## Handlers
+
+ def handle_Break(self, node):
+ here = self.line_for_node(node)
+ self.process_break_exits([here])
+ return set()
+
+ def handle_ClassDef(self, node):
+ start = self.line_for_node(node)
+ # the body is handled in add_arcs_for_code_objects.
+ return set([start])
+
+ def handle_Continue(self, node):
+ here = self.line_for_node(node)
+ self.process_continue_exits([here])
+ return set()
+
+ def handle_For(self, node):
+ start = self.line_for_node(node.iter)
+ self.block_stack.append(LoopBlock(start=start))
+ exits = self.add_body_arcs(node.body, from_line=start)
+ for exit in exits:
+ self.arcs.add((exit, start))
+ my_block = self.block_stack.pop()
+ exits = my_block.break_exits
+ if node.orelse:
+ else_start = self.line_for_node(node.orelse[0])
+ self.arcs.add((start, else_start))
+ else_exits = self.add_body_arcs(node.orelse, from_line=start)
+ exits |= else_exits
+ else:
+ # no else clause: exit from the for line.
+ exits.add(start)
+ return exits
+
+ handle_AsyncFor = handle_For
+
+ def handle_FunctionDef(self, node):
+ start = self.line_for_node(node)
+ # the body is handled in add_arcs_for_code_objects.
+ return set([start])
+
+ handle_AsyncFunctionDef = handle_FunctionDef
+
+ def handle_If(self, node):
+ start = self.line_for_node(node.test)
+ exits = self.add_body_arcs(node.body, from_line=start)
+ exits |= self.add_body_arcs(node.orelse, from_line=start)
+ return exits
+
+ def handle_Module(self, node):
+ raise Exception("TODO: this shouldn't happen")
+
+ def handle_Raise(self, node):
+ # `raise` statement jumps away, no exits from here.
+ here = self.line_for_node(node)
+ self.process_raise_exits([here])
+ return set()
+
+ def handle_Return(self, node):
+ # TODO: deal with returning through a finally.
+ here = self.line_for_node(node)
+ self.process_return_exits([here])
+ return set()
+
+ def handle_Try(self, node):
+ # try/finally is tricky. If there's a finally clause, then we need a
+ # TryBlock (with final_start set) to track what flows might go through
+ # the finally instead of their normal flow.
+ if node.handlers:
+ handler_start = self.line_for_node(node.handlers[0])
+ else:
+ handler_start = None
+
+ if node.finalbody:
+ final_start = self.line_for_node(node.finalbody[0])
+ else:
+ final_start = None
+
+ self.block_stack.append(TryBlock(handler_start=handler_start, final_start=final_start))
+
+ start = self.line_for_node(node)
+ exits = self.add_body_arcs(node.body, from_line=start)
+
+ try_block = self.block_stack.pop()
+ handler_exits = set()
+ last_handler_start = None
+ if node.handlers:
+ for handler_node in node.handlers:
+ handler_start = self.line_for_node(handler_node)
+ if last_handler_start is not None:
+ self.arcs.add((last_handler_start, handler_start))
+ last_handler_start = handler_start
+ handler_exits |= self.add_body_arcs(handler_node.body, from_line=handler_start)
+ if handler_node.type is None:
+ # "except:" doesn't jump to subsequent handlers, or
+ # "finally:".
+ last_handler_start = None
+ # TODO: should we break here? Handlers after "except:"
+ # won't be run. Should coverage know that code can't be
+ # run, or should it flag it as not run?
+
+ if node.orelse:
+ exits = self.add_body_arcs(node.orelse, prev_lines=exits)
+
+ exits |= handler_exits
+ if node.finalbody:
+ final_from = exits | try_block.break_from | try_block.continue_from | try_block.return_from
+ if node.handlers and last_handler_start is not None:
+ # If there was an "except X:" clause, then a "raise" in the
+ # body goes to the "except X:" before the "finally", but the
+ # "except" go to the finally.
+ final_from.add(last_handler_start)
+ else:
+ final_from |= try_block.raise_from
+ exits = self.add_body_arcs(node.finalbody, prev_lines=final_from)
+ if try_block.break_from:
+ self.process_break_exits(exits)
+ if try_block.continue_from:
+ self.process_continue_exits(exits)
+ if try_block.raise_from:
+ self.process_raise_exits(exits)
+ if try_block.return_from:
+ self.process_return_exits(exits)
+ return exits
+
+ def handle_TryExcept(self, node):
+ # Python 2.7 uses separate TryExcept and TryFinally nodes. If we get
+ # TryExcept, it means there was no finally, so fake it, and treat as
+ # a general Try node.
+ node.finalbody = []
+ return self.handle_Try(node)
+
+ def handle_TryFinally(self, node):
+ # Python 2.7 uses separate TryExcept and TryFinally nodes. If we get
+ # TryFinally, see if there's a TryExcept nested inside. If so, merge
+ # them. Otherwise, fake fields to complete a Try node.
+ node.handlers = []
+ node.orelse = []
+
+ if node.body:
+ first = node.body[0]
+ if first.__class__.__name__ == "TryExcept" and node.lineno == first.lineno:
+ assert len(node.body) == 1
+ node.body = first.body
+ node.handlers = first.handlers
+ node.orelse = first.orelse
+
+ return self.handle_Try(node)
+
+ def handle_While(self, node):
+ constant_test = self.is_constant_expr(node.test)
+ start = to_top = self.line_for_node(node.test)
+ if constant_test:
+ to_top = self.line_for_node(node.body[0])
+ self.block_stack.append(LoopBlock(start=start))
+ exits = self.add_body_arcs(node.body, from_line=start)
+ for exit in exits:
+ self.arcs.add((exit, to_top))
+ exits = set()
+ if not constant_test:
+ exits.add(start)
+ my_block = self.block_stack.pop()
+ exits.update(my_block.break_exits)
+ # TODO: orelse
+ return exits
+
+ def handle_With(self, node):
+ start = self.line_for_node(node)
+ exits = self.add_body_arcs(node.body, from_line=start)
+ return exits
+
+ handle_AsyncWith = handle_With
+
+ OK_TO_DEFAULT = set([
+ "Assign", "Assert", "AugAssign", "Delete", "Exec", "Expr", "Global",
+ "Import", "ImportFrom", "Pass", "Print",
+ ])
+
+ def handle_default(self, node):
+ node_name = node.__class__.__name__
+ if node_name not in self.OK_TO_DEFAULT:
+ # TODO: put 1/0 here to find unhandled nodes.
+ print("*** Unhandled: {0}".format(node))
+ return set([self.line_for_node(node)])
+
+ CODE_COMPREHENSIONS = set(["GeneratorExp", "DictComp", "SetComp"])
+ if env.PY3:
+ CODE_COMPREHENSIONS.add("ListComp")
+
+ def add_arcs_for_code_objects(self, root_node):
+ for node in ast.walk(root_node):
+ node_name = node.__class__.__name__
+ if node_name == "Module":
+ start = self.line_for_node(node)
+ exits = self.add_body_arcs(node.body, from_line=-1)
+ for exit in exits:
+ self.arcs.add((exit, -start))
+ elif node_name in ["FunctionDef", "AsyncFunctionDef"]:
+ start = self.line_for_node(node)
+ self.block_stack.append(FunctionBlock(start=start))
+ exits = self.add_body_arcs(node.body, from_line=-1)
+ self.block_stack.pop()
+ for exit in exits:
+ self.arcs.add((exit, -start))
+ elif node_name == "ClassDef":
+ start = self.line_for_node(node)
+ self.arcs.add((-1, start))
+ exits = self.add_body_arcs(node.body, from_line=start)
+ for exit in exits:
+ self.arcs.add((exit, -start))
+ elif node_name in self.CODE_COMPREHENSIONS:
+ # TODO: tests for when generators is more than one?
+ for gen in node.generators:
+ start = self.line_for_node(gen)
+ self.arcs.add((-1, start))
+ self.arcs.add((start, -start))
+ # TODO: guaranteed this won't work for multi-line comps.
+ elif node_name == "Lambda":
+ start = self.line_for_node(node)
+ self.arcs.add((-1, start))
+ self.arcs.add((start, -start))
+ # TODO: test multi-line lambdas
+
+ def contains_return_expression(self, node):
+ """Is there a yield-from or await in `node` someplace?"""
+ for child in ast.walk(node):
+ if child.__class__.__name__ in ["YieldFrom", "Await"]:
+ return True
+
+ return False
+
+
## Opcodes that guide the ByteParser.
def _opcode(name):
@@ -325,7 +743,7 @@ OPS_CHUNK_BEGIN = _opcode_set('JUMP_ABSOLUTE', 'JUMP_FORWARD')
# Opcodes that push a block on the block stack.
OPS_PUSH_BLOCK = _opcode_set(
- 'SETUP_LOOP', 'SETUP_EXCEPT', 'SETUP_FINALLY', 'SETUP_WITH'
+ 'SETUP_LOOP', 'SETUP_EXCEPT', 'SETUP_FINALLY', 'SETUP_WITH', 'SETUP_ASYNC_WITH',
)
# Block types for exception handling.
@@ -334,6 +752,8 @@ OPS_EXCEPT_BLOCKS = _opcode_set('SETUP_EXCEPT', 'SETUP_FINALLY')
# Opcodes that pop a block from the block stack.
OPS_POP_BLOCK = _opcode_set('POP_BLOCK')
+OPS_GET_AITER = _opcode_set('GET_AITER')
+
# Opcodes that have a jump destination, but aren't really a jump.
OPS_NO_JUMP = OPS_PUSH_BLOCK
@@ -453,6 +873,8 @@ class ByteParser(object):
# is a count of how many ignores are left.
ignore_branch = 0
+ ignore_pop_block = 0
+
# We have to handle the last two bytecodes specially.
ult = penult = None
@@ -511,7 +933,10 @@ class ByteParser(object):
block_stack.append((bc.op, bc.jump_to))
if bc.op in OPS_POP_BLOCK:
# The opcode pops a block from the block stack.
- block_stack.pop()
+ if ignore_pop_block:
+ ignore_pop_block -= 1
+ else:
+ block_stack.pop()
if bc.op in OPS_CHUNK_END:
# This opcode forces the end of the chunk.
if bc.op == OP_BREAK_LOOP:
@@ -531,6 +956,15 @@ class ByteParser(object):
# branch, so that except's don't count as branches.
ignore_branch += 1
+ if bc.op in OPS_GET_AITER:
+ # GET_AITER is weird: First, it seems to generate one more
+ # POP_BLOCK than SETUP_*, so we have to prepare to ignore one
+ # of the POP_BLOCKS. Second, we don't have a clear branch to
+ # the exit of the loop, so we peek into the block stack to find
+ # it.
+ ignore_pop_block += 1
+ chunk.exits.add(block_stack[-1][1])
+
penult = ult
ult = bc
@@ -690,3 +1124,54 @@ class Chunk(object):
"v" if self.entrance else "",
list(self.exits),
)
+
+
+SKIP_DUMP_FIELDS = ["ctx"]
+
+def is_simple_value(value):
+ return (
+ value in [None, [], (), {}, set()] or
+ isinstance(value, (string_class, int, float))
+ )
+
+def ast_dump(node, depth=0):
+ indent = " " * depth
+ if not isinstance(node, ast.AST):
+ print("{0}<{1} {2!r}>".format(indent, node.__class__.__name__, node))
+ return
+
+ lineno = getattr(node, "lineno", None)
+ if lineno is not None:
+ linemark = " @ {0}".format(node.lineno)
+ else:
+ linemark = ""
+ head = "{0}<{1}{2}".format(indent, node.__class__.__name__, linemark)
+
+ named_fields = [
+ (name, value)
+ for name, value in ast.iter_fields(node)
+ if name not in SKIP_DUMP_FIELDS
+ ]
+ if not named_fields:
+ print("{0}>".format(head))
+ elif len(named_fields) == 1 and is_simple_value(named_fields[0][1]):
+ field_name, value = named_fields[0]
+ print("{0} {1}: {2!r}>".format(head, field_name, value))
+ else:
+ print(head)
+ print("{0}# mro: {1}".format(indent, ", ".join(c.__name__ for c in node.__class__.__mro__[1:])))
+ next_indent = indent + " "
+ for field_name, value in named_fields:
+ prefix = "{0}{1}:".format(next_indent, field_name)
+ if is_simple_value(value):
+ print("{0} {1!r}".format(prefix, value))
+ elif isinstance(value, list):
+ print("{0} [".format(prefix))
+ for n in value:
+ ast_dump(n, depth + 8)
+ print("{0}]".format(next_indent))
+ else:
+ print(prefix)
+ ast_dump(value, depth + 8)
+
+ print("{0}>".format(indent))