author     Ned Batchelder <ned@nedbatchelder.com>  2022-12-29 08:32:14 -0500
committer  Ned Batchelder <ned@nedbatchelder.com>  2022-12-29 11:27:55 -0500
commit     0accb68cd9ac353bd5464750987e02012bdb8e0c (patch)
tree       a20b51aada58f12febcd71580b494b9013a977c6 /coverage/parser.py
parent     42508990e08865ba93e8a893d36061351e553a63 (diff)
download   python-coveragepy-git-0accb68cd9ac353bd5464750987e02012bdb8e0c.tar.gz
mypy: add parser.py to mypy
Diffstat (limited to 'coverage/parser.py')
-rw-r--r--  coverage/parser.py  491
1 file changed, 277 insertions, 214 deletions
diff --git a/coverage/parser.py b/coverage/parser.py
index a5ad2f5c..1e2011e2 100644
--- a/coverage/parser.py
+++ b/coverage/parser.py
@@ -3,20 +3,35 @@
"""Code parsing for coverage.py."""
+from __future__ import annotations
+
import ast
import collections
import os
import re
+import sys
import token
import tokenize
+from types import CodeType
+from typing import (
+ cast, TYPE_CHECKING,
+ Any, Callable, Dict, Iterable, List, Optional, Sequence, Set, Tuple,
+)
+
from coverage import env
from coverage.bytecode import code_objects
from coverage.debug import short_stack
from coverage.exceptions import NoSource, NotPython, _StopEverything
-from coverage.misc import contract, join_regex, nice_pair
+from coverage.misc import join_regex, nice_pair
from coverage.phystokens import generate_tokens
+if TYPE_CHECKING:
+ # Protocol is new in 3.8. PYVERSIONS
+ from typing import Protocol
+else:
+ class Protocol: # pylint: disable=missing-class-docstring
+ pass
class PythonParser:
"""Parse code to find executable lines, excluded lines, etc.
@@ -25,8 +40,12 @@ class PythonParser:
involved.
"""
- @contract(text='unicode|None')
- def __init__(self, text=None, filename=None, exclude=None):
+ def __init__(
+ self,
+ text: Optional[str]=None,
+ filename: Optional[str]=None,
+ exclude: Optional[str]=None,
+ ) -> None:
"""
Source can be provided as `text`, the text itself, or `filename`, from
which the text will be read. Excluded lines are those that match
@@ -35,8 +54,9 @@ class PythonParser:
"""
assert text or filename, "PythonParser needs either text or filename"
self.filename = filename or "<code>"
- self.text = text
- if not self.text:
+ if text is not None:
+ self.text: str = text
+ else:
from coverage.python import get_python_source
try:
self.text = get_python_source(self.filename)
@@ -46,45 +66,45 @@ class PythonParser:
self.exclude = exclude
# The text lines of the parsed code.
- self.lines = self.text.split('\n')
+ self.lines: List[str] = self.text.split('\n')
# The normalized line numbers of the statements in the code. Exclusions
# are taken into account, and statements are adjusted to their first
# lines.
- self.statements = set()
+ self.statements: Set[int] = set()
# The normalized line numbers of the excluded lines in the code,
# adjusted to their first lines.
- self.excluded = set()
+ self.excluded: Set[int] = set()
# The raw_* attributes are only used in this class, and in
# lab/parser.py to show how this class is working.
# The line numbers that start statements, as reported by the line
# number table in the bytecode.
- self.raw_statements = set()
+ self.raw_statements: Set[int] = set()
# The raw line numbers of excluded lines of code, as marked by pragmas.
- self.raw_excluded = set()
+ self.raw_excluded: Set[int] = set()
# The line numbers of class definitions.
- self.raw_classdefs = set()
+ self.raw_classdefs: Set[int] = set()
# The line numbers of docstring lines.
- self.raw_docstrings = set()
+ self.raw_docstrings: Set[int] = set()
# Internal detail, used by lab/parser.py.
self.show_tokens = False
# A dict mapping line numbers to lexical statement starts for
# multi-line statements.
- self._multiline = {}
+ self._multiline: Dict[int, int] = {}
# Lazily-created arc data, and missing arc descriptions.
- self._all_arcs = None
- self._missing_arc_fragments = None
+ self._all_arcs: Optional[Set[TArc]] = None
+ self._missing_arc_fragments: Optional[TArcFragments] = None
- def lines_matching(self, *regexes):
+ def lines_matching(self, *regexes: str) -> Set[int]:
"""Find the lines matching one of a list of regexes.
Returns a set of line numbers, the lines that contain a match for one
@@ -100,7 +120,7 @@ class PythonParser:
matches.add(i)
return matches
- def _raw_parse(self):
+ def _raw_parse(self) -> None:
"""Parse the source to find the interesting facts about its lines.
A handful of attributes are updated.
@@ -122,6 +142,7 @@ class PythonParser:
first_on_line = True
nesting = 0
+ assert self.text is not None
tokgen = generate_tokens(self.text)
for toktype, ttext, (slineno, _), (elineno, _), ltext in tokgen:
if self.show_tokens: # pragma: debugging
@@ -167,11 +188,11 @@ class PythonParser:
# http://stackoverflow.com/questions/1769332/x/1769794#1769794
self.raw_docstrings.update(range(slineno, elineno+1))
elif toktype == token.NEWLINE:
- if first_line is not None and elineno != first_line:
+ if first_line is not None and elineno != first_line: # type: ignore[unreachable]
# We're at the end of a line, and we've ended on a
# different line than the first line of the statement,
# so record a multi-line range.
- for l in range(first_line, elineno+1):
+ for l in range(first_line, elineno+1): # type: ignore[unreachable]
self._multiline[l] = first_line
first_line = None
first_on_line = True
@@ -202,32 +223,32 @@ class PythonParser:
if env.PYBEHAVIOR.module_firstline_1 and self._multiline:
self._multiline[1] = min(self.raw_statements)
- def first_line(self, line):
- """Return the first line number of the statement including `line`."""
- if line < 0:
- line = -self._multiline.get(-line, -line)
+ def first_line(self, lineno: int) -> int:
+ """Return the first line number of the statement including `lineno`."""
+ if lineno < 0:
+ lineno = -self._multiline.get(-lineno, -lineno)
else:
- line = self._multiline.get(line, line)
- return line
+ lineno = self._multiline.get(lineno, lineno)
+ return lineno
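
As an aside for readers of this hunk, a minimal standalone sketch (invented data, not part of the commit) of how the normalization behaves, including the negative line numbers coverage.py uses for code-object entry/exit arcs:

    # _multiline maps continuation lines to the first line of their statement.
    _multiline = {2: 1, 3: 1}                  # lines 2-3 continue the statement on line 1

    def first_line(lineno: int) -> int:
        if lineno < 0:
            return -_multiline.get(-lineno, -lineno)
        return _multiline.get(lineno, lineno)

    assert first_line(3) == 1                  # continuation folds back to its first line
    assert first_line(-3) == -1                # exit markers keep their (negative) sign
    assert first_line(7) == 7                  # unmapped lines are already first lines
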
- def first_lines(self, lines):
- """Map the line numbers in `lines` to the correct first line of the
+ def first_lines(self, linenos: Iterable[int]) -> Set[int]:
+ """Map the line numbers in `linenos` to the correct first line of the
statement.
Returns a set of the first lines.
"""
- return {self.first_line(l) for l in lines}
+ return {self.first_line(l) for l in linenos}
- def translate_lines(self, lines):
+ def translate_lines(self, lines: Iterable[int]) -> Set[int]:
"""Implement `FileReporter.translate_lines`."""
return self.first_lines(lines)
- def translate_arcs(self, arcs):
+ def translate_arcs(self, arcs: Iterable[TArc]) -> List[TArc]:
"""Implement `FileReporter.translate_arcs`."""
return [(self.first_line(a), self.first_line(b)) for (a, b) in arcs]
- def parse_source(self):
+ def parse_source(self) -> None:
"""Parse source text to find executable lines, excluded lines, etc.
Sets the .excluded and .statements attributes, normalized to the first
@@ -252,7 +273,7 @@ class PythonParser:
starts = self.raw_statements - ignore
self.statements = self.first_lines(starts) - ignore
- def arcs(self):
+ def arcs(self) -> Set[TArc]:
"""Get information about the arcs available in the code.
Returns a set of line number pairs. Line numbers have been normalized
@@ -261,9 +282,10 @@ class PythonParser:
"""
if self._all_arcs is None:
self._analyze_ast()
+ assert self._all_arcs is not None
return self._all_arcs
- def _analyze_ast(self):
+ def _analyze_ast(self) -> None:
"""Run the AstArcAnalyzer and save its results.
`_all_arcs` is the set of arcs in the code.
@@ -281,13 +303,13 @@ class PythonParser:
self._missing_arc_fragments = aaa.missing_arc_fragments
- def exit_counts(self):
+ def exit_counts(self) -> Dict[int, int]:
"""Get a count of exits from that each line.
Excluded lines are excluded.
"""
- exit_counts = collections.defaultdict(int)
+ exit_counts: Dict[int, int] = collections.defaultdict(int)
for l1, l2 in self.arcs():
if l1 < 0:
# Don't ever report -1 as a line number
@@ -308,10 +330,16 @@ class PythonParser:
return exit_counts
- def missing_arc_description(self, start, end, executed_arcs=None):
+ def missing_arc_description(
+ self,
+ start: int,
+ end: int,
+ executed_arcs: Optional[Set[TArc]]=None,
+ ) -> str:
"""Provide an English sentence describing a missing arc."""
if self._missing_arc_fragments is None:
self._analyze_ast()
+ assert self._missing_arc_fragments is not None
actual_start = start
@@ -351,18 +379,23 @@ class PythonParser:
class ByteParser:
"""Parse bytecode to understand the structure of code."""
- @contract(text='unicode')
- def __init__(self, text, code=None, filename=None):
+ def __init__(
+ self,
+ text: str,
+ code: Optional[CodeType]=None,
+ filename: Optional[str]=None,
+ ) -> None:
self.text = text
- if code:
+ if code is not None:
self.code = code
else:
+ assert filename is not None
try:
self.code = compile(text, filename, "exec")
except SyntaxError as synerr:
raise NotPython(
"Couldn't parse '%s' as Python source: '%s' at line %d" % (
- filename, synerr.msg, synerr.lineno
+ filename, synerr.msg, synerr.lineno or 0
)
) from synerr
@@ -375,7 +408,7 @@ class ByteParser:
"Run coverage.py under another Python for this command."
)
- def child_parsers(self):
+ def child_parsers(self) -> Iterable[ByteParser]:
"""Iterate over all the code objects nested within this one.
The iteration includes `self` as its first value.
@@ -383,7 +416,7 @@ class ByteParser:
"""
return (ByteParser(self.text, code=c) for c in code_objects(self.code))
- def _line_numbers(self):
+ def _line_numbers(self) -> Iterable[int]:
"""Yield the line numbers possible in this code object.
Uses co_lnotab described in Python/compile.c to find the
@@ -413,7 +446,7 @@ class ByteParser:
if line_num != last_line_num:
yield line_num
- def _find_statements(self):
+ def _find_statements(self) -> Iterable[int]:
"""Find the statements in `self.code`.
Produce a sequence of line numbers that start statements. Recurses
@@ -429,7 +462,37 @@ class ByteParser:
# AST analysis
#
-class BlockBase:
+class ArcStart(collections.namedtuple("Arc", "lineno, cause")):
+ """The information needed to start an arc.
+
+ `lineno` is the line number the arc starts from.
+
+ `cause` is an English text fragment used as the `startmsg` for
+ AstArcAnalyzer.missing_arc_fragments. It will be used to describe why an
+ arc wasn't executed, so should fit well into a sentence of the form,
+ "Line 17 didn't run because {cause}." The fragment can include "{lineno}"
+ to have `lineno` interpolated into it.
+
+ """
+ def __new__(cls, lineno: int, cause: Optional[str]=None) -> ArcStart:
+ return super().__new__(cls, lineno, cause)
+
+
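
For illustration only (not part of the commit): an ArcStart pairs the starting line with an optional cause fragment, and the "{lineno}" placeholder is interpolated later when a missing arc is described.

    start = ArcStart(17, cause="the condition on line {lineno} was never true")
    assert start.lineno == 17
    assert start.cause == "the condition on line {lineno} was never true"
    assert ArcStart(17) == (17, None)          # it's a namedtuple, so tuple equality holds
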
+class TAddArcFn(Protocol):
+ """The type for AstArcAnalyzer.add_arc()."""
+ def __call__(
+ self,
+ start: int,
+ end: int,
+ smsg: Optional[str]=None,
+ emsg: Optional[str]=None,
+ ) -> None:
+ ...
+
+TArc = Tuple[int, int]
+TArcFragments = Dict[TArc, List[Tuple[Optional[str], Optional[str]]]]
+
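
A brief sketch of what the Protocol buys under mypy (hypothetical names below): any callable matching the __call__ signature above, including the bound method AstArcAnalyzer.add_arc defined later in this diff, type-checks as a TAddArcFn.

    def record_arc(
        start: int,
        end: int,
        smsg: Optional[str] = None,
        emsg: Optional[str] = None,
    ) -> None:
        print(f"possible arc: ({start}, {end})")

    add: TAddArcFn = record_arc                # accepted via structural typing
    add(1, 2, smsg="the loop on line {lineno} never started")
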
+class Block:
"""
Blocks need to handle various exiting statements in their own ways.
@@ -439,56 +502,54 @@ class BlockBase:
stack.
"""
# pylint: disable=unused-argument
- def process_break_exits(self, exits, add_arc):
+ def process_break_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
"""Process break exits."""
# Because break can only appear in loops, and most subclasses
# implement process_break_exits, this function is never reached.
raise AssertionError
- def process_continue_exits(self, exits, add_arc):
+ def process_continue_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
"""Process continue exits."""
# Because continue can only appear in loops, and most subclasses
# implement process_continue_exits, this function is never reached.
raise AssertionError
- def process_raise_exits(self, exits, add_arc):
+ def process_raise_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
"""Process raise exits."""
return False
- def process_return_exits(self, exits, add_arc):
+ def process_return_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
"""Process return exits."""
return False
-class LoopBlock(BlockBase):
+class LoopBlock(Block):
"""A block on the block stack representing a `for` or `while` loop."""
- @contract(start=int)
- def __init__(self, start):
+ def __init__(self, start: int) -> None:
# The line number where the loop starts.
self.start = start
# A set of ArcStarts, the arcs from break statements exiting this loop.
- self.break_exits = set()
+ self.break_exits: Set[ArcStart] = set()
- def process_break_exits(self, exits, add_arc):
+ def process_break_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
self.break_exits.update(exits)
return True
- def process_continue_exits(self, exits, add_arc):
+ def process_continue_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
for xit in exits:
add_arc(xit.lineno, self.start, xit.cause)
return True
-class FunctionBlock(BlockBase):
+class FunctionBlock(Block):
"""A block on the block stack representing a function definition."""
- @contract(start=int, name=str)
- def __init__(self, start, name):
+ def __init__(self, start: int, name: str) -> None:
# The line number where the function starts.
self.start = start
# The name of the function.
self.name = name
- def process_raise_exits(self, exits, add_arc):
+ def process_raise_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
for xit in exits:
add_arc(
xit.lineno, -self.start, xit.cause,
@@ -496,7 +557,7 @@ class FunctionBlock(BlockBase):
)
return True
- def process_return_exits(self, exits, add_arc):
+ def process_return_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
for xit in exits:
add_arc(
xit.lineno, -self.start, xit.cause,
@@ -505,10 +566,9 @@ class FunctionBlock(BlockBase):
return True
-class TryBlock(BlockBase):
+class TryBlock(Block):
"""A block on the block stack representing a `try` block."""
- @contract(handler_start='int|None', final_start='int|None')
- def __init__(self, handler_start, final_start):
+ def __init__(self, handler_start: Optional[int], final_start: Optional[int]) -> None:
# The line number of the first "except" handler, if any.
self.handler_start = handler_start
# The line number of the "finally:" clause, if any.
@@ -516,24 +576,24 @@ class TryBlock(BlockBase):
# The ArcStarts for breaks/continues/returns/raises inside the "try:"
# that need to route through the "finally:" clause.
- self.break_from = set()
- self.continue_from = set()
- self.raise_from = set()
- self.return_from = set()
+ self.break_from: Set[ArcStart] = set()
+ self.continue_from: Set[ArcStart] = set()
+ self.raise_from: Set[ArcStart] = set()
+ self.return_from: Set[ArcStart] = set()
- def process_break_exits(self, exits, add_arc):
+ def process_break_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
if self.final_start is not None:
self.break_from.update(exits)
return True
return False
- def process_continue_exits(self, exits, add_arc):
+ def process_continue_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
if self.final_start is not None:
self.continue_from.update(exits)
return True
return False
- def process_raise_exits(self, exits, add_arc):
+ def process_raise_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
if self.handler_start is not None:
for xit in exits:
add_arc(xit.lineno, self.handler_start, xit.cause)
@@ -542,17 +602,16 @@ class TryBlock(BlockBase):
self.raise_from.update(exits)
return True
- def process_return_exits(self, exits, add_arc):
+ def process_return_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
if self.final_start is not None:
self.return_from.update(exits)
return True
return False
-class WithBlock(BlockBase):
+class WithBlock(Block):
"""A block on the block stack representing a `with` block."""
- @contract(start=int)
- def __init__(self, start):
+ def __init__(self, start: int) -> None:
# We only ever use this block if it is needed, so that we don't have to
# check this setting in all the methods.
assert env.PYBEHAVIOR.exit_through_with
@@ -562,11 +621,16 @@ class WithBlock(BlockBase):
# The ArcStarts for breaks/continues/returns/raises inside the "with:"
# that need to go through the with-statement while exiting.
- self.break_from = set()
- self.continue_from = set()
- self.return_from = set()
-
- def _process_exits(self, exits, add_arc, from_set=None):
+ self.break_from: Set[ArcStart] = set()
+ self.continue_from: Set[ArcStart] = set()
+ self.return_from: Set[ArcStart] = set()
+
+ def _process_exits(
+ self,
+ exits: Set[ArcStart],
+ add_arc: TAddArcFn,
+ from_set: Optional[Set[ArcStart]]=None,
+ ) -> bool:
"""Helper to process the four kinds of exits."""
for xit in exits:
add_arc(xit.lineno, self.start, xit.cause)
@@ -574,43 +638,27 @@ class WithBlock(BlockBase):
from_set.update(exits)
return True
- def process_break_exits(self, exits, add_arc):
+ def process_break_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
return self._process_exits(exits, add_arc, self.break_from)
- def process_continue_exits(self, exits, add_arc):
+ def process_continue_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
return self._process_exits(exits, add_arc, self.continue_from)
- def process_raise_exits(self, exits, add_arc):
+ def process_raise_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
return self._process_exits(exits, add_arc)
- def process_return_exits(self, exits, add_arc):
+ def process_return_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
return self._process_exits(exits, add_arc, self.return_from)
-class ArcStart(collections.namedtuple("Arc", "lineno, cause")):
- """The information needed to start an arc.
-
- `lineno` is the line number the arc starts from.
-
- `cause` is an English text fragment used as the `startmsg` for
- AstArcAnalyzer.missing_arc_fragments. It will be used to describe why an
- arc wasn't executed, so should fit well into a sentence of the form,
- "Line 17 didn't run because {cause}." The fragment can include "{lineno}"
- to have `lineno` interpolated into it.
-
- """
- def __new__(cls, lineno, cause=None):
- return super().__new__(cls, lineno, cause)
-
-
-class NodeList:
+class NodeList(ast.AST):
"""A synthetic fictitious node, containing a sequence of nodes.
This is used when collapsing optimized if-statements, to represent the
unconditional execution of one of the clauses.
"""
- def __init__(self, body):
+ def __init__(self, body: Sequence[ast.AST]) -> None:
self.body = body
self.lineno = body[0].lineno
@@ -618,12 +666,19 @@ class NodeList:
# TODO: the cause messages have too many commas.
# TODO: Shouldn't the cause messages join with "and" instead of "or"?
+def _make_expression_code_method(noun: str) -> Callable[[AstArcAnalyzer, ast.AST], None]:
+ """A function to make methods for expression-based callable _code_object__ methods."""
+ def _code_object__expression_callable(self: AstArcAnalyzer, node: ast.AST) -> None:
+ start = self.line_for_node(node)
+ self.add_arc(-start, start, None, f"didn't run the {noun} on line {start}")
+ self.add_arc(start, -start, None, f"didn't finish the {noun} on line {start}")
+ return _code_object__expression_callable
+
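
Roughly how the factory is wired up (the class-level assignments appear near the end of this diff); the line number below is only illustrative:

    _code_object__Lambda = _make_expression_code_method("lambda")
    # For a lambda whose first line is 5, the generated handler adds two arcs:
    #   (-5, 5) with the end message "didn't run the lambda on line 5"
    #   (5, -5) with the end message "didn't finish the lambda on line 5"
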
class AstArcAnalyzer:
"""Analyze source text with an AST to find executable code paths."""
- @contract(text='unicode', statements=set)
- def __init__(self, text, statements, multiline):
+ def __init__(self, text: str, statements: Set[int], multiline: Dict[int, int]) -> None:
self.root_node = ast.parse(text)
# TODO: I think this is happening in too many places.
self.statements = {multiline.get(l, l) for l in statements}
@@ -639,20 +694,20 @@ class AstArcAnalyzer:
print(f"Multiline map: {self.multiline}")
ast_dump(self.root_node)
- self.arcs = set()
+ self.arcs: Set[TArc] = set()
# A map from arc pairs to a list of pairs of sentence fragments:
# { (start, end): [(startmsg, endmsg), ...], }
#
# For an arc from line 17, they should be usable like:
# "Line 17 {endmsg}, because {startmsg}"
- self.missing_arc_fragments = collections.defaultdict(list)
- self.block_stack = []
+ self.missing_arc_fragments: TArcFragments = collections.defaultdict(list)
+ self.block_stack: List[Block] = []
# $set_env.py: COVERAGE_TRACK_ARCS - Trace possible arcs added while parsing code.
self.debug = bool(int(os.environ.get("COVERAGE_TRACK_ARCS", 0)))
- def analyze(self):
+ def analyze(self) -> None:
"""Examine the AST tree from `root_node` to determine possible arcs.
This sets the `arcs` attribute to be a set of (from, to) line number
@@ -665,8 +720,13 @@ class AstArcAnalyzer:
if code_object_handler is not None:
code_object_handler(node)
- @contract(start=int, end=int)
- def add_arc(self, start, end, smsg=None, emsg=None):
+ def add_arc(
+ self,
+ start: int,
+ end: int,
+ smsg: Optional[str]=None,
+ emsg: Optional[str]=None,
+ ) -> None:
"""Add an arc, including message fragments to use if it is missing."""
if self.debug: # pragma: debugging
print(f"\nAdding possible arc: ({start}, {end}): {smsg!r}, {emsg!r}")
@@ -676,25 +736,27 @@ class AstArcAnalyzer:
if smsg is not None or emsg is not None:
self.missing_arc_fragments[(start, end)].append((smsg, emsg))
- def nearest_blocks(self):
+ def nearest_blocks(self) -> Iterable[Block]:
"""Yield the blocks in nearest-to-farthest order."""
return reversed(self.block_stack)
- @contract(returns=int)
- def line_for_node(self, node):
+ def line_for_node(self, node: ast.AST) -> int:
"""What is the right line number to use for this node?
This dispatches to _line__Node functions where needed.
"""
node_name = node.__class__.__name__
- handler = getattr(self, "_line__" + node_name, None)
+ handler = cast(
+ Optional[Callable[[ast.AST], int]],
+ getattr(self, "_line__" + node_name, None)
+ )
if handler is not None:
return handler(node)
else:
return node.lineno
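
The cast-around-getattr idiom above also appears in add_arcs and find_non_missing_node below. A minimal standalone sketch (made-up class and handler names) of the pattern: getattr returns Any, so cast restores the precise callable type for mypy.

    class LineFinder:
        """Dispatch to _line__<NodeType> methods, defaulting to node.lineno."""
        def line_for(self, node: ast.AST) -> int:
            handler = cast(
                Optional[Callable[[ast.AST], int]],
                getattr(self, "_line__" + node.__class__.__name__, None),
            )
            if handler is not None:
                return handler(node)
            return node.lineno                 # same default as line_for_node above

        def _line__Assign(self, node: ast.AST) -> int:
            assert isinstance(node, ast.Assign)
            return self.line_for(node.value)
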
- def _line_decorated(self, node):
+ def _line_decorated(self, node: ast.FunctionDef) -> int:
"""Compute first line number for things that can be decorated (classes and functions)."""
lineno = node.lineno
if env.PYBEHAVIOR.trace_decorated_def or env.PYBEHAVIOR.def_ast_no_decorator:
@@ -702,12 +764,12 @@ class AstArcAnalyzer:
lineno = node.decorator_list[0].lineno
return lineno
- def _line__Assign(self, node):
+ def _line__Assign(self, node: ast.Assign) -> int:
return self.line_for_node(node.value)
_line__ClassDef = _line_decorated
- def _line__Dict(self, node):
+ def _line__Dict(self, node: ast.Dict) -> int:
if node.keys:
if node.keys[0] is not None:
return node.keys[0].lineno
@@ -721,13 +783,13 @@ class AstArcAnalyzer:
_line__FunctionDef = _line_decorated
_line__AsyncFunctionDef = _line_decorated
- def _line__List(self, node):
+ def _line__List(self, node: ast.List) -> int:
if node.elts:
return self.line_for_node(node.elts[0])
else:
return node.lineno
- def _line__Module(self, node):
+ def _line__Module(self, node: ast.Module) -> int:
if env.PYBEHAVIOR.module_firstline_1:
return 1
elif node.body:
@@ -742,8 +804,7 @@ class AstArcAnalyzer:
"Import", "ImportFrom", "Nonlocal", "Pass",
}
- @contract(returns='ArcStarts')
- def add_arcs(self, node):
+ def add_arcs(self, node: ast.AST) -> Set[ArcStart]:
"""Add the arcs for `node`.
Return a set of ArcStarts, exits from this node to the next. Because a
@@ -760,7 +821,10 @@ class AstArcAnalyzer:
"""
node_name = node.__class__.__name__
- handler = getattr(self, "_handle__" + node_name, None)
+ handler = cast(
+ Optional[Callable[[ast.AST], Set[ArcStart]]],
+ getattr(self, "_handle__" + node_name, None)
+ )
if handler is not None:
return handler(node)
else:
@@ -773,8 +837,12 @@ class AstArcAnalyzer:
# Default for simple statements: one exit from this node.
return {ArcStart(self.line_for_node(node))}
- @contract(returns='ArcStarts')
- def add_body_arcs(self, body, from_start=None, prev_starts=None):
+ def add_body_arcs(
+ self,
+ body: Sequence[ast.AST],
+ from_start: Optional[ArcStart]=None,
+ prev_starts: Optional[Set[ArcStart]]=None
+ ) -> Set[ArcStart]:
"""Add arcs for the body of a compound statement.
`body` is the body node. `from_start` is a single `ArcStart` that can
@@ -786,21 +854,23 @@ class AstArcAnalyzer:
"""
if prev_starts is None:
+ assert from_start is not None
prev_starts = {from_start}
for body_node in body:
lineno = self.line_for_node(body_node)
first_line = self.multiline.get(lineno, lineno)
if first_line not in self.statements:
- body_node = self.find_non_missing_node(body_node)
- if body_node is None:
+ maybe_body_node = self.find_non_missing_node(body_node)
+ if maybe_body_node is None:
continue
+ body_node = maybe_body_node
lineno = self.line_for_node(body_node)
for prev_start in prev_starts:
self.add_arc(prev_start.lineno, lineno, prev_start.cause)
prev_starts = self.add_arcs(body_node)
return prev_starts
- def find_non_missing_node(self, node):
+ def find_non_missing_node(self, node: ast.AST) -> Optional[ast.AST]:
"""Search `node` looking for a child that has not been optimized away.
This might return the node you started with, or it will work recursively
@@ -817,12 +887,15 @@ class AstArcAnalyzer:
if first_line in self.statements:
return node
- missing_fn = getattr(self, "_missing__" + node.__class__.__name__, None)
- if missing_fn:
- node = missing_fn(node)
+ missing_fn = cast(
+ Optional[Callable[[ast.AST], Optional[ast.AST]]],
+ getattr(self, "_missing__" + node.__class__.__name__, None)
+ )
+ if missing_fn is not None:
+ ret_node = missing_fn(node)
else:
- node = None
- return node
+ ret_node = None
+ return ret_node
# Missing nodes: _missing__*
#
@@ -831,7 +904,7 @@ class AstArcAnalyzer:
# find_non_missing_node) to find a node to use instead of the missing
# node. They can return None if the node should truly be gone.
- def _missing__If(self, node):
+ def _missing__If(self, node: ast.If) -> Optional[ast.AST]:
# If the if-node is missing, then one of its children might still be
# here, but not both. So return the first of the two that isn't missing.
# Use a NodeList to hold the clauses as a single node.
@@ -842,14 +915,14 @@ class AstArcAnalyzer:
return self.find_non_missing_node(NodeList(node.orelse))
return None
- def _missing__NodeList(self, node):
+ def _missing__NodeList(self, node: NodeList) -> Optional[ast.AST]:
# A NodeList might be a mixture of missing and present nodes. Find the
# ones that are present.
non_missing_children = []
for child in node.body:
- child = self.find_non_missing_node(child)
- if child is not None:
- non_missing_children.append(child)
+ maybe_child = self.find_non_missing_node(child)
+ if maybe_child is not None:
+ non_missing_children.append(maybe_child)
# Return the simplest representation of the present children.
if not non_missing_children:
@@ -858,7 +931,7 @@ class AstArcAnalyzer:
return non_missing_children[0]
return NodeList(non_missing_children)
- def _missing__While(self, node):
+ def _missing__While(self, node: ast.While) -> Optional[ast.AST]:
body_nodes = self.find_non_missing_node(NodeList(node.body))
if not body_nodes:
return None
@@ -868,16 +941,17 @@ class AstArcAnalyzer:
new_while.test = ast.Name()
new_while.test.lineno = body_nodes.lineno
new_while.test.id = "True"
+ assert hasattr(body_nodes, "body")
new_while.body = body_nodes.body
- new_while.orelse = None
+ new_while.orelse = []
return new_while
- def is_constant_expr(self, node):
+ def is_constant_expr(self, node: ast.AST) -> Optional[str]:
"""Is this a compile-time constant?"""
node_name = node.__class__.__name__
if node_name in ["Constant", "NameConstant", "Num"]:
return "Num"
- elif node_name == "Name":
+ elif isinstance(node, ast.Name):
if node.id in ["True", "False", "None", "__debug__"]:
return "Name"
return None
@@ -889,7 +963,6 @@ class AstArcAnalyzer:
# listcomps hidden in lists: x = [[i for i in range(10)]]
# nested function definitions
-
# Exit processing: process_*_exits
#
# These functions process the four kinds of jump exits: break, continue,
@@ -898,29 +971,25 @@ class AstArcAnalyzer:
# enclosing loop block, or the nearest enclosing finally block, whichever
# is nearer.
- @contract(exits='ArcStarts')
- def process_break_exits(self, exits):
+ def process_break_exits(self, exits: Set[ArcStart]) -> None:
"""Add arcs due to jumps from `exits` being breaks."""
for block in self.nearest_blocks(): # pragma: always breaks
if block.process_break_exits(exits, self.add_arc):
break
- @contract(exits='ArcStarts')
- def process_continue_exits(self, exits):
+ def process_continue_exits(self, exits: Set[ArcStart]) -> None:
"""Add arcs due to jumps from `exits` being continues."""
for block in self.nearest_blocks(): # pragma: always breaks
if block.process_continue_exits(exits, self.add_arc):
break
- @contract(exits='ArcStarts')
- def process_raise_exits(self, exits):
+ def process_raise_exits(self, exits: Set[ArcStart]) -> None:
"""Add arcs due to jumps from `exits` being raises."""
for block in self.nearest_blocks():
if block.process_raise_exits(exits, self.add_arc):
break
- @contract(exits='ArcStarts')
- def process_return_exits(self, exits):
+ def process_return_exits(self, exits: Set[ArcStart]) -> None:
"""Add arcs due to jumps from `exits` being returns."""
for block in self.nearest_blocks(): # pragma: always breaks
if block.process_return_exits(exits, self.add_arc):
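
The four process_*_exits methods above all walk the block stack from the innermost block outward. A self-contained sketch (invented line numbers) reusing the Block classes from earlier in this diff: a break inside try/finally is claimed by the TryBlock so that it can later be routed through the finally: clause.

    collected: List[TArc] = []

    def note_arc(
        start: int,
        end: int,
        smsg: Optional[str] = None,
        emsg: Optional[str] = None,
    ) -> None:
        collected.append((start, end))

    block_stack: List[Block] = [
        LoopBlock(start=2),                            # loop header on line 2
        TryBlock(handler_start=None, final_start=6),   # "finally:" on line 6
    ]
    exits = {ArcStart(4, cause="the break on line {lineno} wasn't executed")}
    for block in reversed(block_stack):                # nearest block first
        if block.process_break_exits(exits, note_arc):
            break
    # TryBlock stashes the exits in break_from and returns True, so the break
    # is routed through the "finally:" clause later, in _handle__Try.
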
@@ -937,17 +1006,16 @@ class AstArcAnalyzer:
# Every node type that represents a statement should have a handler, or it
# should be listed in OK_TO_DEFAULT.
- @contract(returns='ArcStarts')
- def _handle__Break(self, node):
+ def _handle__Break(self, node: ast.Break) -> Set[ArcStart]:
here = self.line_for_node(node)
break_start = ArcStart(here, cause="the break on line {lineno} wasn't executed")
- self.process_break_exits([break_start])
+ self.process_break_exits({break_start})
return set()
- @contract(returns='ArcStarts')
- def _handle_decorated(self, node):
+ def _handle_decorated(self, node: ast.FunctionDef) -> Set[ArcStart]:
"""Add arcs for things that can be decorated (classes and functions)."""
- main_line = last = node.lineno
+ main_line: int = node.lineno
+ last: Optional[int] = node.lineno
decs = node.decorator_list
if decs:
if env.PYBEHAVIOR.trace_decorated_def or env.PYBEHAVIOR.def_ast_no_decorator:
@@ -957,6 +1025,7 @@ class AstArcAnalyzer:
if last is not None and dec_start != last:
self.add_arc(last, dec_start)
last = dec_start
+ assert last is not None
if env.PYBEHAVIOR.trace_decorated_def:
self.add_arc(last, main_line)
last = main_line
@@ -977,19 +1046,18 @@ class AstArcAnalyzer:
self.add_arc(last, lineno)
last = lineno
# The body is handled in collect_arcs.
+ assert last is not None
return {ArcStart(last)}
_handle__ClassDef = _handle_decorated
- @contract(returns='ArcStarts')
- def _handle__Continue(self, node):
+ def _handle__Continue(self, node: ast.Continue) -> Set[ArcStart]:
here = self.line_for_node(node)
continue_start = ArcStart(here, cause="the continue on line {lineno} wasn't executed")
- self.process_continue_exits([continue_start])
+ self.process_continue_exits({continue_start})
return set()
- @contract(returns='ArcStarts')
- def _handle__For(self, node):
+ def _handle__For(self, node: ast.For) -> Set[ArcStart]:
start = self.line_for_node(node.iter)
self.block_stack.append(LoopBlock(start=start))
from_start = ArcStart(start, cause="the loop on line {lineno} never started")
@@ -998,6 +1066,7 @@ class AstArcAnalyzer:
for xit in exits:
self.add_arc(xit.lineno, start, xit.cause)
my_block = self.block_stack.pop()
+ assert isinstance(my_block, LoopBlock)
exits = my_block.break_exits
from_start = ArcStart(start, cause="the loop on line {lineno} didn't complete")
if node.orelse:
@@ -1013,8 +1082,7 @@ class AstArcAnalyzer:
_handle__FunctionDef = _handle_decorated
_handle__AsyncFunctionDef = _handle_decorated
- @contract(returns='ArcStarts')
- def _handle__If(self, node):
+ def _handle__If(self, node: ast.If) -> Set[ArcStart]:
start = self.line_for_node(node.test)
from_start = ArcStart(start, cause="the condition on line {lineno} was never true")
exits = self.add_body_arcs(node.body, from_start=from_start)
@@ -1022,51 +1090,50 @@ class AstArcAnalyzer:
exits |= self.add_body_arcs(node.orelse, from_start=from_start)
return exits
- @contract(returns='ArcStarts')
- def _handle__Match(self, node):
- start = self.line_for_node(node)
- last_start = start
- exits = set()
- had_wildcard = False
- for case in node.cases:
- case_start = self.line_for_node(case.pattern)
- pattern = case.pattern
- while isinstance(pattern, ast.MatchOr):
- pattern = pattern.patterns[-1]
- if isinstance(pattern, ast.MatchAs):
- had_wildcard = True
- self.add_arc(last_start, case_start, "the pattern on line {lineno} always matched")
- from_start = ArcStart(case_start, cause="the pattern on line {lineno} never matched")
- exits |= self.add_body_arcs(case.body, from_start=from_start)
- last_start = case_start
- if not had_wildcard:
- exits.add(from_start)
- return exits
+ if sys.version_info >= (3, 10):
+ def _handle__Match(self, node: ast.Match) -> Set[ArcStart]:
+ start = self.line_for_node(node)
+ last_start = start
+ exits = set()
+ had_wildcard = False
+ for case in node.cases:
+ case_start = self.line_for_node(case.pattern)
+ pattern = case.pattern
+ while isinstance(pattern, ast.MatchOr):
+ pattern = pattern.patterns[-1]
+ if isinstance(pattern, ast.MatchAs):
+ had_wildcard = True
+ self.add_arc(last_start, case_start, "the pattern on line {lineno} always matched")
+ from_start = ArcStart(
+ case_start,
+ cause="the pattern on line {lineno} never matched",
+ )
+ exits |= self.add_body_arcs(case.body, from_start=from_start)
+ last_start = case_start
+ if not had_wildcard:
+ exits.add(from_start)
+ return exits
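
The sys.version_info guard keeps the ast.Match annotation away from interpreters where that node type doesn't exist, while mypy (checking a 3.10+ target) still sees the handler. A minimal sketch of the same gating idiom with a made-up function:

    if sys.version_info >= (3, 10):
        def describe_match(node: ast.Match) -> str:
            """Only defined, and only type-checked, on Python 3.10+."""
            return f"match statement with {len(node.cases)} case clauses"
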
- @contract(returns='ArcStarts')
- def _handle__NodeList(self, node):
+ def _handle__NodeList(self, node: NodeList) -> Set[ArcStart]:
start = self.line_for_node(node)
exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
return exits
- @contract(returns='ArcStarts')
- def _handle__Raise(self, node):
+ def _handle__Raise(self, node: ast.Raise) -> Set[ArcStart]:
here = self.line_for_node(node)
raise_start = ArcStart(here, cause="the raise on line {lineno} wasn't executed")
- self.process_raise_exits([raise_start])
+ self.process_raise_exits({raise_start})
# `raise` statement jumps away, no exits from here.
return set()
- @contract(returns='ArcStarts')
- def _handle__Return(self, node):
+ def _handle__Return(self, node: ast.Return) -> Set[ArcStart]:
here = self.line_for_node(node)
return_start = ArcStart(here, cause="the return on line {lineno} wasn't executed")
- self.process_return_exits([return_start])
+ self.process_return_exits({return_start})
# `return` statement jumps away, no exits from here.
return set()
- @contract(returns='ArcStarts')
- def _handle__Try(self, node):
+ def _handle__Try(self, node: ast.Try) -> Set[ArcStart]:
if node.handlers:
handler_start = self.line_for_node(node.handlers[0])
else:
@@ -1099,10 +1166,10 @@ class AstArcAnalyzer:
else:
self.block_stack.pop()
- handler_exits = set()
+ handler_exits: Set[ArcStart] = set()
if node.handlers:
- last_handler_start = None
+ last_handler_start: Optional[int] = None
for handler_node in node.handlers:
handler_start = self.line_for_node(handler_node)
if last_handler_start is not None:
@@ -1177,8 +1244,7 @@ class AstArcAnalyzer:
return exits
- @contract(starts='ArcStarts', exits='ArcStarts', returns='ArcStarts')
- def _combine_finally_starts(self, starts, exits):
+ def _combine_finally_starts(self, starts: Set[ArcStart], exits: Set[ArcStart]) -> Set[ArcStart]:
"""Helper for building the cause of `finally` branches.
"finally" clauses might not execute their exits, and the causes could
@@ -1193,8 +1259,7 @@ class AstArcAnalyzer:
exits = {ArcStart(xit.lineno, cause) for xit in exits}
return exits
- @contract(returns='ArcStarts')
- def _handle__While(self, node):
+ def _handle__While(self, node: ast.While) -> Set[ArcStart]:
start = to_top = self.line_for_node(node.test)
constant_test = self.is_constant_expr(node.test)
top_is_body0 = False
@@ -1211,6 +1276,7 @@ class AstArcAnalyzer:
self.add_arc(xit.lineno, to_top, xit.cause)
exits = set()
my_block = self.block_stack.pop()
+ assert isinstance(my_block, LoopBlock)
exits.update(my_block.break_exits)
from_start = ArcStart(start, cause="the condition on line {lineno} was never false")
if node.orelse:
@@ -1222,14 +1288,14 @@ class AstArcAnalyzer:
exits.add(from_start)
return exits
- @contract(returns='ArcStarts')
- def _handle__With(self, node):
+ def _handle__With(self, node: ast.With) -> Set[ArcStart]:
start = self.line_for_node(node)
if env.PYBEHAVIOR.exit_through_with:
self.block_stack.append(WithBlock(start=start))
exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
if env.PYBEHAVIOR.exit_through_with:
with_block = self.block_stack.pop()
+ assert isinstance(with_block, WithBlock)
with_exit = {ArcStart(start)}
if exits:
for xit in exits:
@@ -1256,7 +1322,7 @@ class AstArcAnalyzer:
# These methods are used by analyze() as the start of the analysis.
# There is one for each construct with a code object.
- def _code_object__Module(self, node):
+ def _code_object__Module(self, node: ast.Module) -> None:
start = self.line_for_node(node)
if node.body:
exits = self.add_body_arcs(node.body, from_start=ArcStart(-start))
@@ -1267,7 +1333,7 @@ class AstArcAnalyzer:
self.add_arc(-start, start)
self.add_arc(start, -start)
- def _code_object__FunctionDef(self, node):
+ def _code_object__FunctionDef(self, node: ast.FunctionDef) -> None:
start = self.line_for_node(node)
self.block_stack.append(FunctionBlock(start=start, name=node.name))
exits = self.add_body_arcs(node.body, from_start=ArcStart(-start))
@@ -1276,7 +1342,7 @@ class AstArcAnalyzer:
_code_object__AsyncFunctionDef = _code_object__FunctionDef
- def _code_object__ClassDef(self, node):
+ def _code_object__ClassDef(self, node: ast.ClassDef) -> None:
start = self.line_for_node(node)
self.add_arc(-start, start)
exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
@@ -1286,14 +1352,6 @@ class AstArcAnalyzer:
f"didn't exit the body of class {node.name!r}",
)
- def _make_expression_code_method(noun): # pylint: disable=no-self-argument
- """A function to make methods for expression-based callable _code_object__ methods."""
- def _code_object__expression_callable(self, node):
- start = self.line_for_node(node)
- self.add_arc(-start, start, None, f"didn't run the {noun} on line {start}")
- self.add_arc(start, -start, None, f"didn't finish the {noun} on line {start}")
- return _code_object__expression_callable
-
_code_object__Lambda = _make_expression_code_method("lambda")
_code_object__GeneratorExp = _make_expression_code_method("generator expression")
_code_object__DictComp = _make_expression_code_method("dictionary comprehension")
@@ -1305,14 +1363,18 @@ class AstArcAnalyzer:
SKIP_DUMP_FIELDS = ["ctx"]
-def _is_simple_value(value):
+def _is_simple_value(value: Any) -> bool:
"""Is `value` simple enough to be displayed on a single line?"""
return (
- value in [None, [], (), {}, set()] or
+ value in [None, [], (), {}, set(), frozenset(), Ellipsis] or
isinstance(value, (bytes, int, float, str))
)
-def ast_dump(node, depth=0, print=print): # pylint: disable=redefined-builtin
+def ast_dump(
+ node: ast.AST,
+ depth:int = 0,
+ print: Callable[[str], None]=print, # pylint: disable=redefined-builtin
+) -> None:
"""Dump the AST for `node`.
This recursively walks the AST, printing a readable version.
@@ -1323,6 +1385,7 @@ def ast_dump(node, depth=0, print=print): # pylint: disable=redefined-builtin
if lineno is not None:
linemark = f" @ {node.lineno},{node.col_offset}"
if hasattr(node, "end_lineno"):
+ assert hasattr(node, "end_col_offset")
linemark += ":"
if node.end_lineno != node.lineno:
linemark += f"{node.end_lineno},"
@@ -1344,7 +1407,7 @@ def ast_dump(node, depth=0, print=print): # pylint: disable=redefined-builtin
else:
print(head)
if 0:
- print("{}# mro: {}".format(
+ print("{}# mro: {}".format( # type: ignore[unreachable]
indent, ", ".join(c.__name__ for c in node.__class__.__mro__[1:]),
))
next_indent = indent + " "