summaryrefslogtreecommitdiff
path: root/coverage/parser.py
diff options
context:
space:
mode:
authorNed Batchelder <ned@nedbatchelder.com>2016-01-07 20:07:04 -0500
committerNed Batchelder <ned@nedbatchelder.com>2016-01-07 20:07:04 -0500
commitb7035114aa515d9d1fe171a9bf678868e76d8f74 (patch)
tree60609f38079b9de18cdbee1d255e96e9c666dd31 /coverage/parser.py
parentd1c92d8e6b066a7b16d625b566853821afe8b46c (diff)
parentd93ddb9524a3e3535541812bbeade8e8ff822409 (diff)
downloadpython-coveragepy-git-b7035114aa515d9d1fe171a9bf678868e76d8f74.tar.gz
Branch analysis is now done with AST instead of bytecode
Diffstat (limited to 'coverage/parser.py')
-rw-r--r--coverage/parser.py799
1 files changed, 482 insertions, 317 deletions
diff --git a/coverage/parser.py b/coverage/parser.py
index 884d40cb..9f7400e5 100644
--- a/coverage/parser.py
+++ b/coverage/parser.py
@@ -3,19 +3,20 @@
"""Code parsing for coverage.py."""
+import ast
import collections
-import dis
+import os
import re
import token
import tokenize
from coverage import env
from coverage.backward import range # pylint: disable=redefined-builtin
-from coverage.backward import bytes_to_ints
-from coverage.bytecode import ByteCodes, CodeObjects
+from coverage.backward import bytes_to_ints, string_class
+from coverage.bytecode import CodeObjects
from coverage.misc import contract, nice_pair, join_regex
from coverage.misc import CoverageException, NoSource, NotPython
-from coverage.phystokens import compile_unicode, generate_tokens
+from coverage.phystokens import compile_unicode, generate_tokens, neuter_encoding_declaration
class PythonParser(object):
@@ -65,8 +66,9 @@ class PythonParser(object):
# The raw line numbers of excluded lines of code, as marked by pragmas.
self.raw_excluded = set()
- # The line numbers of class definitions.
+ # The line numbers of class and function definitions.
self.raw_classdefs = set()
+ self.raw_funcdefs = set()
# The line numbers of docstring lines.
self.raw_docstrings = set()
@@ -140,10 +142,12 @@ class PythonParser(object):
indent -= 1
elif toktype == token.NAME:
if ttext == 'class':
- # Class definitions look like branches in the byte code, so
+ # Class definitions look like branches in the bytecode, so
# we need to exclude them. The simplest way is to note the
# lines with the 'class' keyword.
self.raw_classdefs.add(slineno)
+ elif ttext == 'def':
+ self.raw_funcdefs.add(slineno)
elif toktype == token.OP:
if ttext == ':':
should_exclude = (elineno in self.raw_excluded) or excluding_decorators
@@ -256,8 +260,11 @@ class PythonParser(object):
"""
if self._all_arcs is None:
+ aaa = AstArcAnalyzer(self.text, self.raw_funcdefs, self.raw_classdefs)
+ arcs = aaa.collect_arcs()
+
self._all_arcs = set()
- for l1, l2 in self.byte_parser._all_arcs():
+ for l1, l2 in arcs:
fl1 = self.first_line(l1)
fl2 = self.first_line(l2)
if fl1 != fl2:
@@ -292,62 +299,435 @@ class PythonParser(object):
return exit_counts
-## Opcodes that guide the ByteParser.
+class LoopBlock(object):
+ def __init__(self, start):
+ self.start = start
+ self.break_exits = set()
-def _opcode(name):
- """Return the opcode by name from the dis module."""
- return dis.opmap[name]
+class FunctionBlock(object):
+ def __init__(self, start):
+ self.start = start
-def _opcode_set(*names):
- """Return a set of opcodes by the names in `names`."""
- s = set()
- for name in names:
- try:
- s.add(_opcode(name))
- except KeyError:
- pass
- return s
-# Opcodes that leave the code object.
-OPS_CODE_END = _opcode_set('RETURN_VALUE')
+class TryBlock(object):
+ def __init__(self, handler_start=None, final_start=None):
+ self.handler_start = handler_start
+ self.final_start = final_start
+ self.break_from = set()
+ self.continue_from = set()
+ self.return_from = set()
+ self.raise_from = set()
+
+
+class AstArcAnalyzer(object):
+ @contract(text='unicode', funcdefs=set, classdefs=set)
+ def __init__(self, text, funcdefs, classdefs):
+ self.root_node = ast.parse(neuter_encoding_declaration(text))
+ self.funcdefs = funcdefs
+ self.classdefs = classdefs
+
+ if int(os.environ.get("COVERAGE_ASTDUMP", 0)): # pragma: debugging
+ # Dump the AST so that failing tests have helpful output.
+ ast_dump(self.root_node)
+
+ self.arcs = None
+ self.block_stack = []
+
+ def collect_arcs(self):
+ self.arcs = set()
+ self.add_arcs_for_code_objects(self.root_node)
+ return self.arcs
-# Opcodes that unconditionally end the code chunk.
-OPS_CHUNK_END = _opcode_set(
- 'JUMP_ABSOLUTE', 'JUMP_FORWARD', 'RETURN_VALUE', 'RAISE_VARARGS',
- 'BREAK_LOOP', 'CONTINUE_LOOP',
-)
+ def blocks(self):
+ """Yield the blocks in nearest-to-farthest order."""
+ return reversed(self.block_stack)
-# Opcodes that unconditionally begin a new code chunk. By starting new chunks
-# with unconditional jump instructions, we neatly deal with jumps to jumps
-# properly.
-OPS_CHUNK_BEGIN = _opcode_set('JUMP_ABSOLUTE', 'JUMP_FORWARD')
+ def line_for_node(self, node):
+ """What is the right line number to use for this node?"""
+ node_name = node.__class__.__name__
+ handler = getattr(self, "_line__" + node_name, None)
+ if handler is not None:
+ return handler(node)
+ else:
+ return node.lineno
+
+ def _line__Assign(self, node):
+ return self.line_for_node(node.value)
+
+ def _line__Dict(self, node):
+ # Python 3.5 changed how dict literals are made.
+ if env.PYVERSION >= (3, 5) and node.keys:
+ return node.keys[0].lineno
+ else:
+ return node.lineno
-# Opcodes that push a block on the block stack.
-OPS_PUSH_BLOCK = _opcode_set(
- 'SETUP_LOOP', 'SETUP_EXCEPT', 'SETUP_FINALLY', 'SETUP_WITH'
-)
+ def _line__List(self, node):
+ if node.elts:
+ return self.line_for_node(node.elts[0])
+ else:
+ return node.lineno
+
+ def _line__Module(self, node):
+ if node.body:
+ return self.line_for_node(node.body[0])
+ else:
+ # Modules have no line number, they always start at 1.
+ return 1
-# Block types for exception handling.
-OPS_EXCEPT_BLOCKS = _opcode_set('SETUP_EXCEPT', 'SETUP_FINALLY')
+ OK_TO_DEFAULT = set([
+ "Assign", "Assert", "AugAssign", "Delete", "Exec", "Expr", "Global",
+ "Import", "ImportFrom", "Pass", "Print",
+ ])
-# Opcodes that pop a block from the block stack.
-OPS_POP_BLOCK = _opcode_set('POP_BLOCK')
+ def add_arcs(self, node):
+ """Add the arcs for `node`.
-# Opcodes that have a jump destination, but aren't really a jump.
-OPS_NO_JUMP = OPS_PUSH_BLOCK
+ Return a set of line numbers, exits from this node to the next.
+ """
+ # Yield-froms and awaits can appear anywhere.
+ # TODO: this is probably over-doing it, and too expensive. Can we
+ # instrument the ast walking to see how many nodes we are revisiting?
+ if isinstance(node, ast.stmt):
+ for _, value in ast.iter_fields(node):
+ if isinstance(value, ast.expr) and self.contains_return_expression(value):
+ self.process_return_exits([self.line_for_node(node)])
+ break
+
+ node_name = node.__class__.__name__
+ handler = getattr(self, "_handle__" + node_name, None)
+ if handler is not None:
+ return handler(node)
+
+ if 0:
+ node_name = node.__class__.__name__
+ if node_name not in self.OK_TO_DEFAULT:
+ print("*** Unhandled: {0}".format(node))
+ return set([self.line_for_node(node)])
+
+ def add_body_arcs(self, body, from_line=None, prev_lines=None):
+ if prev_lines is None:
+ prev_lines = set([from_line])
+ for body_node in body:
+ lineno = self.line_for_node(body_node)
+ for prev_lineno in prev_lines:
+ self.arcs.add((prev_lineno, lineno))
+ prev_lines = self.add_arcs(body_node)
+ return prev_lines
+
+ def is_constant_expr(self, node):
+ """Is this a compile-time constant?"""
+ node_name = node.__class__.__name__
+ if node_name in ["NameConstant", "Num"]:
+ return True
+ elif node_name == "Name":
+ if env.PY3 and node.id in ["True", "False", "None"]:
+ return True
+ return False
+
+ # tests to write:
+ # TODO: while EXPR:
+ # TODO: while False:
+ # TODO: listcomps hidden deep in other expressions
+ # TODO: listcomps hidden in lists: x = [[i for i in range(10)]]
+ # TODO: nested function definitions
+
+ def process_break_exits(self, exits):
+ for block in self.blocks():
+ if isinstance(block, LoopBlock):
+ block.break_exits.update(exits)
+ break
+ elif isinstance(block, TryBlock) and block.final_start:
+ block.break_from.update(exits)
+ break
+
+ def process_continue_exits(self, exits):
+ for block in self.blocks():
+ if isinstance(block, LoopBlock):
+ for xit in exits:
+ self.arcs.add((xit, block.start))
+ break
+ elif isinstance(block, TryBlock) and block.final_start:
+ block.continue_from.update(exits)
+ break
+
+ def process_raise_exits(self, exits):
+ for block in self.blocks():
+ if isinstance(block, TryBlock):
+ if block.handler_start:
+ for xit in exits:
+ self.arcs.add((xit, block.handler_start))
+ break
+ elif block.final_start:
+ block.raise_from.update(exits)
+ break
+ elif isinstance(block, FunctionBlock):
+ for xit in exits:
+ self.arcs.add((xit, -block.start))
+ break
+
+ def process_return_exits(self, exits):
+ for block in self.blocks():
+ if isinstance(block, TryBlock) and block.final_start:
+ block.return_from.update(exits)
+ break
+ elif isinstance(block, FunctionBlock):
+ for xit in exits:
+ self.arcs.add((xit, -block.start))
+ break
+
+ ## Handlers
+
+ def _handle__Break(self, node):
+ here = self.line_for_node(node)
+ self.process_break_exits([here])
+ return set()
+
+ def _handle__ClassDef(self, node):
+ return self.process_decorated(node, self.classdefs)
+
+ def process_decorated(self, node, defs):
+ last = self.line_for_node(node)
+ if node.decorator_list:
+ for dec_node in node.decorator_list:
+ dec_start = self.line_for_node(dec_node)
+ if dec_start != last:
+ self.arcs.add((last, dec_start))
+ last = dec_start
+ # The definition line may have been missed, but we should have it in
+ # `defs`.
+ body_start = self.line_for_node(node.body[0])
+ for lineno in range(last+1, body_start):
+ if lineno in defs:
+ self.arcs.add((last, lineno))
+ last = lineno
+ # the body is handled in add_arcs_for_code_objects.
+ return set([last])
+
+ def _handle__Continue(self, node):
+ here = self.line_for_node(node)
+ self.process_continue_exits([here])
+ return set()
+
+ def _handle__For(self, node):
+ start = self.line_for_node(node.iter)
+ self.block_stack.append(LoopBlock(start=start))
+ exits = self.add_body_arcs(node.body, from_line=start)
+ for xit in exits:
+ self.arcs.add((xit, start))
+ my_block = self.block_stack.pop()
+ exits = my_block.break_exits
+ if node.orelse:
+ else_exits = self.add_body_arcs(node.orelse, from_line=start)
+ exits |= else_exits
+ else:
+ # no else clause: exit from the for line.
+ exits.add(start)
+ return exits
+
+ _handle__AsyncFor = _handle__For
+
+ def _handle__FunctionDef(self, node):
+ return self.process_decorated(node, self.funcdefs)
+
+ _handle__AsyncFunctionDef = _handle__FunctionDef
+
+ def _handle__If(self, node):
+ start = self.line_for_node(node.test)
+ exits = self.add_body_arcs(node.body, from_line=start)
+ exits |= self.add_body_arcs(node.orelse, from_line=start)
+ return exits
+
+ def _handle__Raise(self, node):
+ # `raise` statement jumps away, no exits from here.
+ here = self.line_for_node(node)
+ self.process_raise_exits([here])
+ return set()
+
+ def _handle__Return(self, node):
+ here = self.line_for_node(node)
+ self.process_return_exits([here])
+ return set()
+
+ def _handle__Try(self, node):
+ # try/finally is tricky. If there's a finally clause, then we need a
+        # TryBlock to track what flows might go through the finally instead
+ # of their normal flow.
+ if node.handlers:
+ handler_start = self.line_for_node(node.handlers[0])
+ else:
+ handler_start = None
-# Individual opcodes we need below.
-OP_BREAK_LOOP = _opcode('BREAK_LOOP')
-OP_END_FINALLY = _opcode('END_FINALLY')
-OP_COMPARE_OP = _opcode('COMPARE_OP')
-COMPARE_EXCEPTION = 10 # just have to get this constant from the code.
-OP_LOAD_CONST = _opcode('LOAD_CONST')
-OP_RETURN_VALUE = _opcode('RETURN_VALUE')
+ if node.finalbody:
+ final_start = self.line_for_node(node.finalbody[0])
+ else:
+ final_start = None
+
+ self.block_stack.append(TryBlock(handler_start=handler_start, final_start=final_start))
+
+ start = self.line_for_node(node)
+ exits = self.add_body_arcs(node.body, from_line=start)
+
+ try_block = self.block_stack.pop()
+ handler_exits = set()
+ last_handler_start = None
+ if node.handlers:
+ for handler_node in node.handlers:
+ handler_start = self.line_for_node(handler_node)
+ if last_handler_start is not None:
+ self.arcs.add((last_handler_start, handler_start))
+ last_handler_start = handler_start
+ handler_exits |= self.add_body_arcs(handler_node.body, from_line=handler_start)
+ if handler_node.type is None:
+ # "except:" doesn't jump to subsequent handlers, or
+ # "finally:".
+ last_handler_start = None
+ # TODO: should we break here? Handlers after "except:"
+ # won't be run. Should coverage know that code can't be
+ # run, or should it flag it as not run?
+
+ if node.orelse:
+ exits = self.add_body_arcs(node.orelse, prev_lines=exits)
+
+ exits |= handler_exits
+ if node.finalbody:
+ final_from = ( # You can get to the `finally` clause from:
+ exits | # the exits of the body or `else` clause,
+ try_block.break_from | # or a `break` in the body,
+ try_block.continue_from | # or a `continue` in the body,
+ try_block.return_from # or a `return` in the body.
+ )
+ if node.handlers and last_handler_start is not None:
+ # If there was an "except X:" clause, then a "raise" in the
+ # body goes to the "except X:" before the "finally", but the
+                # "except" clauses go to the "finally".
+ final_from.add(last_handler_start)
+ else:
+ final_from |= try_block.raise_from
+ exits = self.add_body_arcs(node.finalbody, prev_lines=final_from)
+ if try_block.break_from:
+ self.process_break_exits(exits)
+ if try_block.continue_from:
+ self.process_continue_exits(exits)
+ if try_block.raise_from:
+ self.process_raise_exits(exits)
+ if try_block.return_from:
+ self.process_return_exits(exits)
+ return exits
+
+ def _handle__TryExcept(self, node):
+ # Python 2.7 uses separate TryExcept and TryFinally nodes. If we get
+ # TryExcept, it means there was no finally, so fake it, and treat as
+ # a general Try node.
+ node.finalbody = []
+ return self._handle__Try(node)
+
+ def _handle__TryFinally(self, node):
+ # Python 2.7 uses separate TryExcept and TryFinally nodes. If we get
+ # TryFinally, see if there's a TryExcept nested inside. If so, merge
+ # them. Otherwise, fake fields to complete a Try node.
+ node.handlers = []
+ node.orelse = []
+
+ first = node.body[0]
+ if first.__class__.__name__ == "TryExcept" and node.lineno == first.lineno:
+ assert len(node.body) == 1
+ node.body = first.body
+ node.handlers = first.handlers
+ node.orelse = first.orelse
+
+ return self._handle__Try(node)
+
+ def _handle__While(self, node):
+ constant_test = self.is_constant_expr(node.test)
+ start = to_top = self.line_for_node(node.test)
+ if constant_test:
+ to_top = self.line_for_node(node.body[0])
+ self.block_stack.append(LoopBlock(start=start))
+ exits = self.add_body_arcs(node.body, from_line=start)
+ for xit in exits:
+ self.arcs.add((xit, to_top))
+ exits = set()
+ my_block = self.block_stack.pop()
+ exits.update(my_block.break_exits)
+ if node.orelse:
+ else_exits = self.add_body_arcs(node.orelse, from_line=start)
+ exits |= else_exits
+ else:
+ # No `else` clause: you can exit from the start.
+ if not constant_test:
+ exits.add(start)
+ return exits
+
+ def _handle__With(self, node):
+ start = self.line_for_node(node)
+ exits = self.add_body_arcs(node.body, from_line=start)
+ return exits
+
+ _handle__AsyncWith = _handle__With
+
+ def add_arcs_for_code_objects(self, root_node):
+ for node in ast.walk(root_node):
+ node_name = node.__class__.__name__
+ code_object_handler = getattr(self, "_code_object__" + node_name, None)
+ if code_object_handler is not None:
+ code_object_handler(node)
+
+ def _code_object__Module(self, node):
+ start = self.line_for_node(node)
+ if node.body:
+ exits = self.add_body_arcs(node.body, from_line=-1)
+ for xit in exits:
+ self.arcs.add((xit, -start))
+ else:
+ # Empty module.
+ self.arcs.add((-1, start))
+ self.arcs.add((start, -1))
+
+ def _code_object__FunctionDef(self, node):
+ start = self.line_for_node(node)
+ self.block_stack.append(FunctionBlock(start=start))
+ exits = self.add_body_arcs(node.body, from_line=-1)
+ self.block_stack.pop()
+ for xit in exits:
+ self.arcs.add((xit, -start))
+
+ _code_object__AsyncFunctionDef = _code_object__FunctionDef
+
+ def _code_object__ClassDef(self, node):
+ start = self.line_for_node(node)
+ self.arcs.add((-1, start))
+ exits = self.add_body_arcs(node.body, from_line=start)
+ for xit in exits:
+ self.arcs.add((xit, -start))
+
+ def do_code_object_comprehension(self, node):
+ start = self.line_for_node(node)
+ self.arcs.add((-1, start))
+ self.arcs.add((start, -start))
+
+ _code_object__GeneratorExp = do_code_object_comprehension
+ _code_object__DictComp = do_code_object_comprehension
+ _code_object__SetComp = do_code_object_comprehension
+ if env.PY3:
+ _code_object__ListComp = do_code_object_comprehension
+
+ def _code_object__Lambda(self, node):
+ start = self.line_for_node(node)
+ self.arcs.add((-1, start))
+ self.arcs.add((start, -start))
+
+ def contains_return_expression(self, node):
+        """Does `node` contain a yield-from or await someplace? (They act as returns.)"""
+ for child in ast.walk(node):
+ if child.__class__.__name__ in ["YieldFrom", "Await"]:
+ return True
+
+ return False
class ByteParser(object):
- """Parse byte codes to understand the structure of code."""
+ """Parse bytecode to understand the structure of code."""
@contract(text='unicode')
def __init__(self, text, code=None, filename=None):
@@ -366,7 +746,7 @@ class ByteParser(object):
# Alternative Python implementations don't always provide all the
# attributes on code objects that we need to do the analysis.
- for attr in ['co_lnotab', 'co_firstlineno', 'co_consts', 'co_code']:
+ for attr in ['co_lnotab', 'co_firstlineno', 'co_consts']:
if not hasattr(self.code, attr):
raise CoverageException(
"This implementation of Python doesn't support code analysis.\n"
@@ -421,272 +801,57 @@ class ByteParser(object):
for _, l in bp._bytes_lines():
yield l
- def _block_stack_repr(self, block_stack): # pragma: debugging
- """Get a string version of `block_stack`, for debugging."""
- blocks = ", ".join(
- "(%s, %r)" % (dis.opname[b[0]], b[1]) for b in block_stack
- )
- return "[" + blocks + "]"
-
- def _split_into_chunks(self):
- """Split the code object into a list of `Chunk` objects.
-
- Each chunk is only entered at its first instruction, though there can
- be many exits from a chunk.
-
- Returns a list of `Chunk` objects.
- """
- # The list of chunks so far, and the one we're working on.
- chunks = []
- chunk = None
-
- # A dict mapping byte offsets of line starts to the line numbers.
- bytes_lines_map = dict(self._bytes_lines())
-
- # The block stack: loops and try blocks get pushed here for the
- # implicit jumps that can occur.
- # Each entry is a tuple: (block type, destination)
- block_stack = []
-
- # Some op codes are followed by branches that should be ignored. This
- # is a count of how many ignores are left.
- ignore_branch = 0
-
- # We have to handle the last two bytecodes specially.
- ult = penult = None
-
- # Get a set of all of the jump-to points.
- jump_to = set()
- bytecodes = list(ByteCodes(self.code.co_code))
- for bc in bytecodes:
- if bc.jump_to >= 0:
- jump_to.add(bc.jump_to)
-
- chunk_lineno = 0
-
- # Walk the byte codes building chunks.
- for bc in bytecodes:
- # Maybe have to start a new chunk.
- start_new_chunk = False
- first_chunk = False
- if bc.offset in bytes_lines_map:
- # Start a new chunk for each source line number.
- start_new_chunk = True
- chunk_lineno = bytes_lines_map[bc.offset]
- first_chunk = True
- elif bc.offset in jump_to:
- # To make chunks have a single entrance, we have to make a new
- # chunk when we get to a place some bytecode jumps to.
- start_new_chunk = True
- elif bc.op in OPS_CHUNK_BEGIN:
- # Jumps deserve their own unnumbered chunk. This fixes
- # problems with jumps to jumps getting confused.
- start_new_chunk = True
-
- if not chunk or start_new_chunk:
- if chunk:
- chunk.exits.add(bc.offset)
- chunk = Chunk(bc.offset, chunk_lineno, first_chunk)
- if not chunks:
- # The very first chunk of a code object is always an
- # entrance.
- chunk.entrance = True
- chunks.append(chunk)
-
- # Look at the opcode.
- if bc.jump_to >= 0 and bc.op not in OPS_NO_JUMP:
- if ignore_branch:
- # Someone earlier wanted us to ignore this branch.
- ignore_branch -= 1
- else:
- # The opcode has a jump, it's an exit for this chunk.
- chunk.exits.add(bc.jump_to)
-
- if bc.op in OPS_CODE_END:
- # The opcode can exit the code object.
- chunk.exits.add(-self.code.co_firstlineno)
- if bc.op in OPS_PUSH_BLOCK:
- # The opcode adds a block to the block_stack.
- block_stack.append((bc.op, bc.jump_to))
- if bc.op in OPS_POP_BLOCK:
- # The opcode pops a block from the block stack.
- block_stack.pop()
- if bc.op in OPS_CHUNK_END:
- # This opcode forces the end of the chunk.
- if bc.op == OP_BREAK_LOOP:
- # A break is implicit: jump where the top of the
- # block_stack points.
- chunk.exits.add(block_stack[-1][1])
- chunk = None
- if bc.op == OP_END_FINALLY:
- # For the finally clause we need to find the closest exception
- # block, and use its jump target as an exit.
- for block in reversed(block_stack):
- if block[0] in OPS_EXCEPT_BLOCKS:
- chunk.exits.add(block[1])
- break
- if bc.op == OP_COMPARE_OP and bc.arg == COMPARE_EXCEPTION:
- # This is an except clause. We want to overlook the next
- # branch, so that except's don't count as branches.
- ignore_branch += 1
-
- penult = ult
- ult = bc
-
- if chunks:
- # The last two bytecodes could be a dummy "return None" that
- # shouldn't be counted as real code. Every Python code object seems
- # to end with a return, and a "return None" is inserted if there
- # isn't an explicit return in the source.
- if ult and penult:
- if penult.op == OP_LOAD_CONST and ult.op == OP_RETURN_VALUE:
- if self.code.co_consts[penult.arg] is None:
- # This is "return None", but is it dummy? A real line
- # would be a last chunk all by itself.
- if chunks[-1].byte != penult.offset:
- ex = -self.code.co_firstlineno
- # Split the last chunk
- last_chunk = chunks[-1]
- last_chunk.exits.remove(ex)
- last_chunk.exits.add(penult.offset)
- chunk = Chunk(
- penult.offset, last_chunk.line, False
- )
- chunk.exits.add(ex)
- chunks.append(chunk)
-
- # Give all the chunks a length.
- chunks[-1].length = bc.next_offset - chunks[-1].byte
- for i in range(len(chunks)-1):
- chunks[i].length = chunks[i+1].byte - chunks[i].byte
-
- #self.validate_chunks(chunks)
- return chunks
-
- def validate_chunks(self, chunks): # pragma: debugging
- """Validate the rule that chunks have a single entrance."""
- # starts is the entrances to the chunks
- starts = set(ch.byte for ch in chunks)
- for ch in chunks:
- assert all((ex in starts or ex < 0) for ex in ch.exits)
-
- def _arcs(self):
- """Find the executable arcs in the code.
-
- Yields pairs: (from,to). From and to are integer line numbers. If
- from is < 0, then the arc is an entrance into the code object. If to
- is < 0, the arc is an exit from the code object.
-
- """
- chunks = self._split_into_chunks()
-
- # A map from byte offsets to the chunk starting at that offset.
- byte_chunks = dict((c.byte, c) for c in chunks)
-
- # Traverse from the first chunk in each line, and yield arcs where
- # the trace function will be invoked.
- for chunk in chunks:
- if chunk.entrance:
- yield (-1, chunk.line)
-
- if not chunk.first:
- continue
-
- chunks_considered = set()
- chunks_to_consider = [chunk]
- while chunks_to_consider:
- # Get the chunk we're considering, and make sure we don't
- # consider it again.
- this_chunk = chunks_to_consider.pop()
- chunks_considered.add(this_chunk)
-
- # For each exit, add the line number if the trace function
- # would be triggered, or add the chunk to those being
- # considered if not.
- for ex in this_chunk.exits:
- if ex < 0:
- yield (chunk.line, ex)
- else:
- next_chunk = byte_chunks[ex]
- if next_chunk in chunks_considered:
- continue
-
- # The trace function is invoked if visiting the first
- # bytecode in a line, or if the transition is a
- # backward jump.
- backward_jump = next_chunk.byte < this_chunk.byte
- if next_chunk.first or backward_jump:
- if next_chunk.line != chunk.line:
- yield (chunk.line, next_chunk.line)
- else:
- chunks_to_consider.append(next_chunk)
-
- def _all_chunks(self):
- """Returns a list of `Chunk` objects for this code and its children.
-
- See `_split_into_chunks` for details.
-
- """
- chunks = []
- for bp in self.child_parsers():
- chunks.extend(bp._split_into_chunks())
-
- return chunks
-
- def _all_arcs(self):
- """Get the set of all arcs in this code object and its children.
-
- See `_arcs` for details.
-
- """
- arcs = set()
- for bp in self.child_parsers():
- arcs.update(bp._arcs())
-
- return arcs
-
-
-class Chunk(object):
- """A sequence of byte codes with a single entrance.
-
- To analyze byte code, we have to divide it into chunks, sequences of byte
- codes such that each chunk has only one entrance, the first instruction in
- the block.
-
- This is almost the CS concept of `basic block`_, except that we're willing
- to have many exits from a chunk, and "basic block" is a more cumbersome
- term.
-
- .. _basic block: http://en.wikipedia.org/wiki/Basic_block
-
- `byte` is the offset to the bytecode starting this chunk.
-
- `line` is the source line number containing this chunk.
-
- `first` is true if this is the first chunk in the source line.
-
- An exit < 0 means the chunk can leave the code (return). The exit is
- the negative of the starting line number of the code block.
-
- The `entrance` attribute is a boolean indicating whether the code object
- can be entered at this chunk.
-
- """
- def __init__(self, byte, line, first):
- self.byte = byte
- self.line = line
- self.first = first
- self.length = 0
- self.entrance = False
- self.exits = set()
+SKIP_DUMP_FIELDS = ["ctx"]
+
+def is_simple_value(value):
+ return (
+ value in [None, [], (), {}, set()] or
+ isinstance(value, (string_class, int, float))
+ )
+
+# TODO: a test of ast_dump?
+def ast_dump(node, depth=0):
+ indent = " " * depth
+ if not isinstance(node, ast.AST):
+ print("{0}<{1} {2!r}>".format(indent, node.__class__.__name__, node))
+ return
+
+ lineno = getattr(node, "lineno", None)
+ if lineno is not None:
+ linemark = " @ {0}".format(node.lineno)
+ else:
+ linemark = ""
+ head = "{0}<{1}{2}".format(indent, node.__class__.__name__, linemark)
+
+ named_fields = [
+ (name, value)
+ for name, value in ast.iter_fields(node)
+ if name not in SKIP_DUMP_FIELDS
+ ]
+ if not named_fields:
+ print("{0}>".format(head))
+ elif len(named_fields) == 1 and is_simple_value(named_fields[0][1]):
+ field_name, value = named_fields[0]
+ print("{0} {1}: {2!r}>".format(head, field_name, value))
+ else:
+ print(head)
+ if 0:
+ print("{0}# mro: {1}".format(
+ indent, ", ".join(c.__name__ for c in node.__class__.__mro__[1:]),
+ ))
+ next_indent = indent + " "
+ for field_name, value in named_fields:
+ prefix = "{0}{1}:".format(next_indent, field_name)
+ if is_simple_value(value):
+ print("{0} {1!r}".format(prefix, value))
+ elif isinstance(value, list):
+ print("{0} [".format(prefix))
+ for n in value:
+ ast_dump(n, depth + 8)
+ print("{0}]".format(next_indent))
+ else:
+ print(prefix)
+ ast_dump(value, depth + 8)
- def __repr__(self):
- return "<%d+%d @%d%s%s %r>" % (
- self.byte,
- self.length,
- self.line,
- "!" if self.first else "",
- "v" if self.entrance else "",
- list(self.exits),
- )
+ print("{0}>".format(indent))