diff options
Diffstat (limited to 'coverage/debug.py')
-rw-r--r-- | coverage/debug.py | 164 |
1 files changed, 111 insertions, 53 deletions
diff --git a/coverage/debug.py b/coverage/debug.py index e68736f6..9077a3af 100644 --- a/coverage/debug.py +++ b/coverage/debug.py @@ -1,12 +1,13 @@ # Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 -# For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt +# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt """Control of and utilities for debugging.""" import contextlib +import functools import inspect +import itertools import os -import re import sys try: import _thread @@ -24,27 +25,26 @@ os = isolate_module(os) # This is a list of forced debugging options. FORCED_DEBUG = [] -# A hack for debugging testing in sub-processes. -_TEST_NAME_FILE = "" # "/tmp/covtest.txt" - class DebugControl(object): """Control and output for debugging.""" + show_repr_attr = False # For SimpleReprMixin + def __init__(self, options, output): """Configure the options and output file for debugging.""" self.options = list(options) + FORCED_DEBUG - self.raw_output = output self.suppress_callers = False filters = [] if self.should('pid'): filters.append(add_pid_and_tid) - self.output = DebugOutputFile( - self.raw_output, + self.output = DebugOutputFile.get_one( + output, show_process=self.should('process'), filters=filters, ) + self.raw_output = self.output.outfile def __repr__(self): return "<DebugControl options=%r raw_output=%r>" % (self.options, self.raw_output) @@ -71,6 +71,10 @@ class DebugControl(object): `msg` is the line to write. A newline will be appended. """ + if self.should('self'): + caller_self = inspect.stack()[1][0].f_locals.get('self') + if caller_self is not None: + msg = "[self: {!r}] {}".format(caller_self, msg) self.output.write(msg+"\n") if self.should('callers'): dump_stack_frames(out=self.output, skip=1) @@ -167,6 +171,20 @@ def add_pid_and_tid(text): return text +class SimpleReprMixin(object): + """A mixin implementing a simple __repr__.""" + def __repr__(self): + show_attrs = ( + (k, v) for k, v in self.__dict__.items() + if getattr(v, "show_repr_attr", True) + ) + return "<{klass} @0x{id:x} {attrs}>".format( + klass=self.__class__.__name__, + id=id(self), + attrs=" ".join("{}={!r}".format(k, v) for k, v in show_attrs), + ) + + def filter_text(text, filters): """Run `text` through a series of filters. @@ -215,26 +233,40 @@ class DebugOutputFile(object): # pragma: debugging self.write("New process: executable: %s\n" % (sys.executable,)) self.write("New process: cmd: %s\n" % (cmd,)) if hasattr(os, 'getppid'): - self.write("New process: parent pid: %s\n" % (os.getppid(),)) + self.write("New process: pid: %s, parent pid: %s\n" % (os.getpid(), os.getppid())) SYS_MOD_NAME = '$coverage.debug.DebugOutputFile.the_one' @classmethod - def the_one(cls, fileobj=None, show_process=True, filters=()): - """Get the process-wide singleton DebugOutputFile. + def get_one(cls, fileobj=None, show_process=True, filters=()): + """Get a DebugOutputFile. + + If `fileobj` is provided, then a new DebugOutputFile is made with it. + + If `fileobj` isn't provided, then a file is chosen + (COVERAGE_DEBUG_FILE, or stderr), and a process-wide singleton + DebugOutputFile is made. - If it doesn't exist yet, then create it as a wrapper around the file - object `fileobj`. `show_process` controls whether the debug file adds - process-level information. + `show_process` controls whether the debug file adds process-level + information, and filters is a list of other message filters to apply. """ + if fileobj is not None: + # Make DebugOutputFile around the fileobj passed. + return cls(fileobj, show_process, filters) + # Because of the way igor.py deletes and re-imports modules, # this class can be defined more than once. But we really want # a process-wide singleton. So stash it in sys.modules instead of # on a class attribute. Yes, this is aggressively gross. the_one = sys.modules.get(cls.SYS_MOD_NAME) if the_one is None: - assert fileobj is not None + if fileobj is None: + debug_file_name = os.environ.get("COVERAGE_DEBUG_FILE") + if debug_file_name: + fileobj = open(debug_file_name, "a") + else: + fileobj = sys.stderr sys.modules[cls.SYS_MOD_NAME] = the_one = cls(fileobj, show_process, filters) return the_one @@ -250,46 +282,72 @@ class DebugOutputFile(object): # pragma: debugging def log(msg, stack=False): # pragma: debugging """Write a log message as forcefully as possible.""" - out = DebugOutputFile.the_one() + out = DebugOutputFile.get_one() out.write(msg+"\n") if stack: dump_stack_frames(out=out, skip=1) -def filter_aspectlib_frames(text): # pragma: debugging - """Aspectlib prints stack traces, but includes its own frames. Scrub those out.""" - # <<< aspectlib/__init__.py:257:function_wrapper < igor.py:143:run_tests < ... - text = re.sub(r"(?<= )aspectlib/[^.]+\.py:\d+:\w+ < ", "", text) - return text - - -def enable_aspectlib_maybe(): # pragma: debugging - """For debugging, we can use aspectlib to trace execution. - - Define COVERAGE_ASPECTLIB to enable and configure aspectlib to trace - execution:: - - $ export COVERAGE_LOG=covaspect.txt - $ export COVERAGE_ASPECTLIB=coverage.Coverage:coverage.data.CoverageData - $ coverage run blah.py ... - - This will trace all the public methods on Coverage and CoverageData, - writing the information to covaspect.txt. - - """ - aspects = os.environ.get("COVERAGE_ASPECTLIB", "") - if not aspects: - return - - import aspectlib # pylint: disable=import-error - import aspectlib.debug # pylint: disable=import-error - - filename = os.environ.get("COVERAGE_LOG", "/tmp/covlog.txt") - filters = [add_pid_and_tid, filter_aspectlib_frames] - aspects_file = DebugOutputFile.the_one(open(filename, "a"), show_process=True, filters=filters) - aspect_log = aspectlib.debug.log( - print_to=aspects_file, attributes=['id'], stacktrace=30, use_logging=False - ) - public_methods = re.compile(r'^(__init__|[a-zA-Z].*)$') - for aspect in aspects.split(':'): - aspectlib.weave(aspect, aspect_log, methods=public_methods) +def decorate_methods(decorator, butnot=()): # pragma: debugging + """A class decorator to apply a decorator to public methods.""" + def _decorator(cls): + for name, meth in inspect.getmembers(cls, inspect.isroutine): + public = name == '__init__' or not name.startswith("_") + decorate_it = public and name not in butnot + if decorate_it: + setattr(cls, name, decorator(meth)) + return cls + return _decorator + + +def break_in_pudb(func): # pragma: debugging + """A function decorator to stop in the debugger for each call.""" + @functools.wraps(func) + def _wrapper(*args, **kwargs): + import pudb # pylint: disable=import-error + sys.stdout = sys.__stdout__ + pudb.set_trace() + return func(*args, **kwargs) + return _wrapper + + +OBJ_IDS = itertools.count() +CALLS = itertools.count() +OBJ_ID_ATTR = "$coverage.object_id" + +def show_calls(show_args=True, show_stack=False): # pragma: debugging + """A method decorator to debug-log each call to the function.""" + def _decorator(func): + @functools.wraps(func) + def _wrapper(self, *args, **kwargs): + oid = getattr(self, OBJ_ID_ATTR, None) + if oid is None: + oid = "{:08d} {:04d}".format(os.getpid(), next(OBJ_IDS)) + setattr(self, OBJ_ID_ATTR, oid) + extra = "" + if show_args: + eargs = ", ".join(map(repr, args)) + ekwargs = ", ".join("{}={!r}".format(*item) for item in kwargs.items()) + extra += "(" + extra += eargs + if eargs and ekwargs: + extra += ", " + extra += ekwargs + extra += ")" + if show_stack: + extra += " @ " + extra += "; ".join(_clean_stack_line(l) for l in short_stack().splitlines()) + msg = "{} {:04d} {}{}\n".format(oid, next(CALLS), func.__name__, extra) + DebugOutputFile.get_one().write(msg) + return func(self, *args, **kwargs) + return _wrapper + return _decorator + + +def _clean_stack_line(s): # pragma: debugging + """Simplify some paths in a stack trace, for compactness.""" + s = s.strip() + s = s.replace(os.path.dirname(__file__) + '/', '') + s = s.replace(os.path.dirname(os.__file__) + '/', '') + s = s.replace(sys.prefix + '/', '') + return s |