"""Core control stuff for coverage.py""" import os, re, sys from coverage.data import CoverageData from coverage.misc import nice_pair, CoverageException from coverage.codeunit import code_unit_factory from coverage.files import FileLocator class coverage: def __init__(self): from coverage.collector import Collector self.parallel_mode = False self.exclude_re = '' self.nesting = 0 self.cstack = [] self.xstack = [] self.file_locator = FileLocator() self.collector = Collector(self.should_trace) self.data = CoverageData() # Cache of results of calling the analysis2() method, so that you can # specify both -r and -a without doing double work. self.analysis_cache = {} # The default exclude pattern. self.exclude('# *pragma[: ]*[nN][oO] *[cC][oO][vV][eE][rR]') # Save coverage data when Python exits. import atexit atexit.register(self.save) def should_trace(self, filename): """Decide whether to trace execution in `filename` Returns a canonicalized filename if it should be traced, False if it should not. """ if filename == '': # There's no point in ever tracing string executions, we can't do # anything with the data later anyway. return False # TODO: flag: ignore std lib? # TODO: ignore by module as well as file? return self.file_locator.canonical_filename(filename) def use_cache(self, usecache, cache_file=None): self.data.usefile(usecache, cache_file) def get_ready(self): self.collector.reset() self.data.read(parallel=self.parallel_mode) self.analysis_cache = {} def start(self): self.get_ready() if self.nesting == 0: #pragma: no cover self.collector.start() self.nesting += 1 def stop(self): self.nesting -= 1 if self.nesting == 0: #pragma: no cover self.collector.stop() def erase(self): self.get_ready() self.collector.reset() self.analysis_cache = {} self.data.erase() def exclude(self, regex): if self.exclude_re: self.exclude_re += "|" self.exclude_re += "(" + regex + ")" def begin_recursive(self): #self.cstack.append(self.c) self.xstack.append(self.exclude_re) def end_recursive(self): #self.c = self.cstack.pop() self.exclude_re = self.xstack.pop() def save(self): self.group_collected_data() self.data.write() def combine(self): """Entry point for combining together parallel-mode coverage data.""" self.data.combine_parallel_data() def group_collected_data(self): """Group the collected data by filename and reset the collector.""" self.data.add_raw_data(self.collector.data_points()) self.collector.reset() def analyze(self, code_unit): """Analyze a single code unit. If the source code can't be found, raise an error. Otherwise, return a tuple of (1) the canonical filename of the source code for the module, (2) a list of lines of statements in the source code, (3) a list of lines of excluded statements, and (4), a map of line numbers to multi-line line number ranges, for statements that cross lines. """ from coverage.analyzer import CodeAnalyzer if self.analysis_cache.has_key(code_unit.filename): return self.analysis_cache[code_unit.filename] filename = code_unit.filename ext = os.path.splitext(filename)[1] source = None if ext == '.pyc': filename = filename[:-1] ext = '.py' if ext == '.py': if not os.path.exists(filename): source = self.file_locator.get_zip_data(filename) if not source: raise CoverageException( "No source for code '%s'." % code_unit.filename ) analyzer = CodeAnalyzer() lines, excluded_lines, line_map = analyzer.analyze_source( text=source, filename=filename, exclude=self.exclude_re ) result = filename, lines, excluded_lines, line_map self.analysis_cache[code_unit.filename] = result return result def format_lines(self, statements, lines): """Nicely format a list of line numbers. Format a list of line numbers for printing by coalescing groups of lines as long as the lines represent consecutive statements. This will coalesce even if there are gaps between statements. For example, if `statements` is [1,2,3,4,5,10,11,12,13,14] and `lines` is [1,2,5,10,11,13,14] then the result will be "1-2, 5-11, 13-14". """ pairs = [] i = 0 j = 0 start = None pairs = [] while i < len(statements) and j < len(lines): if statements[i] == lines[j]: if start == None: start = lines[j] end = lines[j] j = j + 1 elif start: pairs.append((start, end)) start = None i = i + 1 if start: pairs.append((start, end)) ret = ', '.join(map(nice_pair, pairs)) return ret # Backward compatibility with version 1. def analysis(self, morf): f, s, _, m, mf = self.analysis2(morf) return f, s, m, mf def analysis2(self, morf): code_units = code_unit_factory(morf, self.file_locator) return self.analysis_engine(code_units[0]) def analysis_engine(self, code_unit): filename, statements, excluded, line_map = self.analyze(code_unit) self.group_collected_data() # Identify missing statements. missing = [] execed = self.data.executed_lines(filename) for line in statements: lines = line_map.get(line) if lines: for l in range(lines[0], lines[1]+1): if l in execed: break else: missing.append(line) else: if line not in execed: missing.append(line) return (filename, statements, excluded, missing, self.format_lines(statements, missing)) # Programmatic entry point def report(self, morfs, show_missing=True, ignore_errors=False, file=None): self.report_engine(morfs, show_missing=show_missing, ignore_errors=ignore_errors, outfile=file) def report_engine(self, morfs, show_missing=True, ignore_errors=False, outfile=None, omit_prefixes=None): morfs = morfs or self.data.executed_files() code_units = code_unit_factory(morfs, self.file_locator, omit_prefixes) code_units.sort() max_name = max(5, max(map(lambda cu: len(cu.name), code_units))) fmt_name = "%%- %ds " % max_name fmt_err = fmt_name + "%s: %s" header = fmt_name % "Name" + " Stmts Exec Cover" fmt_coverage = fmt_name + "% 6d % 6d % 5d%%" if show_missing: header = header + " Missing" fmt_coverage = fmt_coverage + " %s" if not outfile: outfile = sys.stdout print >>outfile, header print >>outfile, "-" * len(header) total_statements = 0 total_executed = 0 for cu in code_units: try: _, statements, _, missing, readable = self.analysis_engine(cu) n = len(statements) m = n - len(missing) if n > 0: pc = 100.0 * m / n else: pc = 100.0 args = (cu.name, n, m, pc) if show_missing: args = args + (readable,) print >>outfile, fmt_coverage % args total_statements = total_statements + n total_executed = total_executed + m except KeyboardInterrupt: #pragma: no cover raise except: if not ignore_errors: typ, msg = sys.exc_info()[:2] print >>outfile, fmt_err % (cu.name, typ, msg) if len(code_units) > 1: print >>outfile, "-" * len(header) if total_statements > 0: pc = 100.0 * total_executed / total_statements else: pc = 100.0 args = ("TOTAL", total_statements, total_executed, pc) if show_missing: args = args + ("",) print >>outfile, fmt_coverage % args # annotate(morfs, ignore_errors). blank_re = re.compile(r"\s*(#|$)") else_re = re.compile(r"\s*else\s*:\s*(#|$)") def annotate(self, morfs, directory=None, ignore_errors=False, omit_prefixes=None): morfs = morfs or self.data.executed_files() code_units = code_unit_factory(morfs, self.file_locator, omit_prefixes) for cu in code_units: try: filename, statements, excluded, missing, _ = self.analysis_engine(cu) self.annotate_file(filename, statements, excluded, missing, directory) except KeyboardInterrupt: raise except: if not ignore_errors: raise def annotate_file(self, filename, statements, excluded, missing, directory=None): source = open(filename, 'r') if directory: dest_file = os.path.join(directory, os.path.basename(filename) + ',cover') else: dest_file = filename + ',cover' dest = open(dest_file, 'w') lineno = 0 i = 0 j = 0 covered = True while True: line = source.readline() if line == '': break lineno = lineno + 1 while i < len(statements) and statements[i] < lineno: i = i + 1 while j < len(missing) and missing[j] < lineno: j = j + 1 if i < len(statements) and statements[i] == lineno: covered = j >= len(missing) or missing[j] > lineno if self.blank_re.match(line): dest.write(' ') elif self.else_re.match(line): # Special logic for lines containing only 'else:'. if i >= len(statements) and j >= len(missing): dest.write('! ') elif i >= len(statements) or j >= len(missing): dest.write('> ') elif statements[i] == missing[j]: dest.write('! ') else: dest.write('> ') elif lineno in excluded: dest.write('- ') elif covered: dest.write('> ') else: dest.write('! ') dest.write(line) source.close() dest.close()