summaryrefslogtreecommitdiff
path: root/coverage/parser.py
diff options
context:
space:
mode:
authorNed Batchelder <ned@nedbatchelder.com>2015-12-24 08:49:50 -0500
committerNed Batchelder <ned@nedbatchelder.com>2015-12-24 08:49:50 -0500
commitfbe278c8bc9f3355a23dd68ad53a8a0201004f0b (patch)
tree4852c6ad40be53a26dcd7d14e9ab6aae96120914 /coverage/parser.py
parent795a3679f89ed98bb278daf075123d0b55fd091e (diff)
downloadpython-coveragepy-fbe278c8bc9f3355a23dd68ad53a8a0201004f0b.tar.gz
WIP: measure branches with ast instead of bytecode
Diffstat (limited to 'coverage/parser.py')
-rw-r--r--coverage/parser.py230
1 files changed, 228 insertions, 2 deletions
diff --git a/coverage/parser.py b/coverage/parser.py
index 7b8a60f..fb2cf95 100644
--- a/coverage/parser.py
+++ b/coverage/parser.py
@@ -3,6 +3,7 @@
"""Code parsing for coverage.py."""
+import ast
import collections
import dis
import re
@@ -260,6 +261,18 @@ class PythonParser(object):
self._all_arcs.add((fl1, fl2))
return self._all_arcs
+ def ast_arcs(self):
+ aaa = AstArcAnalyzer(self.text)
+ arcs = aaa.collect_arcs()
+
+ arcs_ = set()
+ for l1, l2 in arcs:
+ fl1 = self.first_line(l1)
+ fl2 = self.first_line(l2)
+ if fl1 != fl2:
+ arcs_.add((fl1, fl2))
+ return arcs_
+
def exit_counts(self):
"""Get a count of exits from that each line.
@@ -288,6 +301,168 @@ class PythonParser(object):
return exit_counts
+class AstArcAnalyzer(object):
+ def __init__(self, text):
+ self.root_node = ast.parse(text)
+ ast_dump(self.root_node)
+
+ self.arcs = None
+ # References to the nearest enclosing thing of its kind.
+ self.function_start = None
+ self.loop_start = None
+
+ # Break-exits from a loop
+ self.break_exits = None
+
+ def line_for_node(self, node):
+ """What is the right line number to use for this node?"""
+ node_name = node.__class__.__name__
+ if node_name == "Assign":
+ return node.value.lineno
+ elif node_name == "comprehension":
+ # TODO: is this how to get the line number for a comprehension?
+ return node.target.lineno
+ else:
+ return node.lineno
+
+ def collect_arcs(self):
+ self.arcs = set()
+ self.add_arcs_for_code_objects(self.root_node)
+ return self.arcs
+
+ def add_arcs(self, node):
+ """add the arcs for `node`.
+
+ Return a set of line numbers, exits from this node to the next.
+ """
+ node_name = node.__class__.__name__
+ #print("Adding arcs for {}".format(node_name))
+
+ handler = getattr(self, "handle_" + node_name, self.handle_default)
+ return handler(node)
+
+ def add_body_arcs(self, body, from_line):
+ prev_lines = set([from_line])
+ for body_node in body:
+ lineno = self.line_for_node(body_node)
+ for prev_lineno in prev_lines:
+ self.arcs.add((prev_lineno, lineno))
+ prev_lines = self.add_arcs(body_node)
+ return prev_lines
+
+ def is_constant_expr(self, node):
+ """Is this a compile-time constant?"""
+ node_name = node.__class__.__name__
+ return node_name in ["NameConstant", "Num"]
+
+ # tests to write:
+ # TODO: while EXPR:
+ # TODO: while False:
+ # TODO: multi-target assignment with computed targets
+ # TODO: listcomps hidden deep in other expressions
+ # TODO: listcomps hidden in lists: x = [[i for i in range(10)]]
+ # TODO: multi-line listcomps
+ # TODO: nested function definitions
+
+ def handle_Break(self, node):
+ here = self.line_for_node(node)
+ # TODO: what if self.break_exits is None?
+ self.break_exits.add(here)
+ return set([])
+
+ def handle_Continue(self, node):
+ here = self.line_for_node(node)
+ # TODO: what if self.loop_start is None?
+ self.arcs.add((here, self.loop_start))
+ return set([])
+
+ def handle_For(self, node):
+ start = self.line_for_node(node.iter)
+ loop_state = self.loop_start, self.break_exits
+ self.loop_start = start
+ self.break_exits = set()
+ exits = self.add_body_arcs(node.body, from_line=start)
+ for exit in exits:
+ self.arcs.add((exit, start))
+ exits = self.break_exits
+ self.loop_start, self.break_exits = loop_state
+ if node.orelse:
+ else_start = self.line_for_node(node.orelse[0])
+ self.arcs.add((start, else_start))
+ else_exits = self.add_body_arcs(node.orelse, from_line=start)
+ exits |= else_exits
+ else:
+ # no else clause: exit from the for line.
+ exits.add(start)
+ return exits
+
+ def handle_FunctionDef(self, node):
+ start = self.line_for_node(node)
+ # the body is handled in add_arcs_for_code_objects.
+ exits = set([start])
+ return exits
+
+ def handle_If(self, node):
+ start = self.line_for_node(node.test)
+ exits = self.add_body_arcs(node.body, from_line=start)
+ exits |= self.add_body_arcs(node.orelse, from_line=start)
+ return exits
+
+ def handle_Module(self, node):
+ raise Exception("TODO: this shouldn't happen")
+
+ def handle_Return(self, node):
+ here = self.line_for_node(node)
+ # TODO: what if self.function_start is None?
+ self.arcs.add((here, -self.function_start))
+ return set([])
+
+ def handle_While(self, node):
+ constant_test = self.is_constant_expr(node.test)
+ start = to_top = self.line_for_node(node.test)
+ if constant_test:
+ to_top = self.line_for_node(node.body[0])
+ loop_state = self.loop_start, self.break_exits
+ self.loop_start = start
+ self.break_exits = set()
+ exits = self.add_body_arcs(node.body, from_line=start)
+ for exit in exits:
+ self.arcs.add((exit, to_top))
+ exits = self.break_exits
+ self.loop_start, self.break_exits = loop_state
+ # TODO: orelse
+ return exits
+
+ def handle_default(self, node):
+ node_name = node.__class__.__name__
+ if node_name not in ["Assign", "Assert", "AugAssign", "Expr"]:
+ print("*** Unhandled: {}".format(node))
+ return set([self.line_for_node(node)])
+
+ def add_arcs_for_code_objects(self, root_node):
+ for node in ast.walk(root_node):
+ node_name = node.__class__.__name__
+ if node_name == "Module":
+ start = self.line_for_node(node.body[0])
+ exits = self.add_body_arcs(node.body, from_line=-1)
+ for exit in exits:
+ self.arcs.add((exit, -start))
+ elif node_name == "FunctionDef":
+ start = self.line_for_node(node)
+ self.function_start = start
+ func_exits = self.add_body_arcs(node.body, from_line=-1)
+ for exit in func_exits:
+ self.arcs.add((exit, -start))
+ self.function_start = None
+ elif node_name == "comprehension":
+ start = self.line_for_node(node)
+ self.arcs.add((-1, start))
+ self.arcs.add((start, -start))
+ # TODO: guaranteed this won't work for multi-line comps.
+
+
+
+
## Opcodes that guide the ByteParser.
def _opcode(name):
@@ -321,7 +496,7 @@ OPS_CHUNK_BEGIN = _opcode_set('JUMP_ABSOLUTE', 'JUMP_FORWARD')
# Opcodes that push a block on the block stack.
OPS_PUSH_BLOCK = _opcode_set(
- 'SETUP_LOOP', 'SETUP_EXCEPT', 'SETUP_FINALLY', 'SETUP_WITH'
+ 'SETUP_LOOP', 'SETUP_EXCEPT', 'SETUP_FINALLY', 'SETUP_WITH', 'SETUP_ASYNC_WITH',
)
# Block types for exception handling.
@@ -330,6 +505,8 @@ OPS_EXCEPT_BLOCKS = _opcode_set('SETUP_EXCEPT', 'SETUP_FINALLY')
# Opcodes that pop a block from the block stack.
OPS_POP_BLOCK = _opcode_set('POP_BLOCK')
+OPS_GET_AITER = _opcode_set('GET_AITER')
+
# Opcodes that have a jump destination, but aren't really a jump.
OPS_NO_JUMP = OPS_PUSH_BLOCK
@@ -449,6 +626,8 @@ class ByteParser(object):
# is a count of how many ignores are left.
ignore_branch = 0
+ ignore_pop_block = 0
+
# We have to handle the last two bytecodes specially.
ult = penult = None
@@ -507,7 +686,10 @@ class ByteParser(object):
block_stack.append((bc.op, bc.jump_to))
if bc.op in OPS_POP_BLOCK:
# The opcode pops a block from the block stack.
- block_stack.pop()
+ if ignore_pop_block:
+ ignore_pop_block -= 1
+ else:
+ block_stack.pop()
if bc.op in OPS_CHUNK_END:
# This opcode forces the end of the chunk.
if bc.op == OP_BREAK_LOOP:
@@ -527,6 +709,15 @@ class ByteParser(object):
# branch, so that except's don't count as branches.
ignore_branch += 1
+ if bc.op in OPS_GET_AITER:
+ # GET_AITER is weird: First, it seems to generate one more
+ # POP_BLOCK than SETUP_*, so we have to prepare to ignore one
+ # of the POP_BLOCKS. Second, we don't have a clear branch to
+ # the exit of the loop, so we peek into the block stack to find
+ # it.
+ ignore_pop_block += 1
+ chunk.exits.add(block_stack[-1][1])
+
penult = ult
ult = bc
@@ -686,3 +877,38 @@ class Chunk(object):
"v" if self.entrance else "",
list(self.exits),
)
+
+
+SKIP_FIELDS = ["ctx"]
+
+def ast_dump(node, depth=0):
+ indent = " " * depth
+ lineno = getattr(node, "lineno", None)
+ if lineno is not None:
+ linemark = " @ {0}".format(lineno)
+ else:
+ linemark = ""
+ print("{0}<{1}{2}".format(indent, node.__class__.__name__, linemark))
+
+ indent += " "
+ for field_name, value in ast.iter_fields(node):
+ if field_name in SKIP_FIELDS:
+ continue
+ prefix = "{0}{1}:".format(indent, field_name)
+ if value is None:
+ print("{0} None".format(prefix))
+ elif isinstance(value, (str, int)):
+ print("{0} {1!r}".format(prefix, value))
+ elif isinstance(value, list):
+ if value == []:
+ print("{0} []".format(prefix))
+ else:
+ print("{0} [".format(prefix))
+ for n in value:
+ ast_dump(n, depth + 8)
+ print("{0}]".format(indent))
+ else:
+ print(prefix)
+ ast_dump(value, depth + 8)
+
+ print("{0}>".format(" " * depth))