summaryrefslogtreecommitdiff
path: root/coverage/debug.py
diff options
context:
space:
mode:
Diffstat (limited to 'coverage/debug.py')
-rw-r--r--coverage/debug.py164
1 files changed, 111 insertions, 53 deletions
diff --git a/coverage/debug.py b/coverage/debug.py
index e68736f6..9077a3af 100644
--- a/coverage/debug.py
+++ b/coverage/debug.py
@@ -1,12 +1,13 @@
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
-# For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt
+# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
"""Control of and utilities for debugging."""
import contextlib
+import functools
import inspect
+import itertools
import os
-import re
import sys
try:
import _thread
@@ -24,27 +25,26 @@ os = isolate_module(os)
# This is a list of forced debugging options.
FORCED_DEBUG = []
-# A hack for debugging testing in sub-processes.
-_TEST_NAME_FILE = "" # "/tmp/covtest.txt"
-
class DebugControl(object):
"""Control and output for debugging."""
+ show_repr_attr = False # For SimpleReprMixin
+
def __init__(self, options, output):
"""Configure the options and output file for debugging."""
self.options = list(options) + FORCED_DEBUG
- self.raw_output = output
self.suppress_callers = False
filters = []
if self.should('pid'):
filters.append(add_pid_and_tid)
- self.output = DebugOutputFile(
- self.raw_output,
+ self.output = DebugOutputFile.get_one(
+ output,
show_process=self.should('process'),
filters=filters,
)
+ self.raw_output = self.output.outfile
def __repr__(self):
return "<DebugControl options=%r raw_output=%r>" % (self.options, self.raw_output)
@@ -71,6 +71,10 @@ class DebugControl(object):
`msg` is the line to write. A newline will be appended.
"""
+ if self.should('self'):
+ caller_self = inspect.stack()[1][0].f_locals.get('self')
+ if caller_self is not None:
+ msg = "[self: {!r}] {}".format(caller_self, msg)
self.output.write(msg+"\n")
if self.should('callers'):
dump_stack_frames(out=self.output, skip=1)
@@ -167,6 +171,20 @@ def add_pid_and_tid(text):
return text
+class SimpleReprMixin(object):
+ """A mixin implementing a simple __repr__."""
+ def __repr__(self):
+ show_attrs = (
+ (k, v) for k, v in self.__dict__.items()
+ if getattr(v, "show_repr_attr", True)
+ )
+ return "<{klass} @0x{id:x} {attrs}>".format(
+ klass=self.__class__.__name__,
+ id=id(self),
+ attrs=" ".join("{}={!r}".format(k, v) for k, v in show_attrs),
+ )
+
+
def filter_text(text, filters):
"""Run `text` through a series of filters.
@@ -215,26 +233,40 @@ class DebugOutputFile(object): # pragma: debugging
self.write("New process: executable: %s\n" % (sys.executable,))
self.write("New process: cmd: %s\n" % (cmd,))
if hasattr(os, 'getppid'):
- self.write("New process: parent pid: %s\n" % (os.getppid(),))
+ self.write("New process: pid: %s, parent pid: %s\n" % (os.getpid(), os.getppid()))
SYS_MOD_NAME = '$coverage.debug.DebugOutputFile.the_one'
@classmethod
- def the_one(cls, fileobj=None, show_process=True, filters=()):
- """Get the process-wide singleton DebugOutputFile.
+ def get_one(cls, fileobj=None, show_process=True, filters=()):
+ """Get a DebugOutputFile.
+
+ If `fileobj` is provided, then a new DebugOutputFile is made with it.
+
+ If `fileobj` isn't provided, then a file is chosen
+ (COVERAGE_DEBUG_FILE, or stderr), and a process-wide singleton
+ DebugOutputFile is made.
- If it doesn't exist yet, then create it as a wrapper around the file
- object `fileobj`. `show_process` controls whether the debug file adds
- process-level information.
+ `show_process` controls whether the debug file adds process-level
+ information, and filters is a list of other message filters to apply.
"""
+ if fileobj is not None:
+ # Make DebugOutputFile around the fileobj passed.
+ return cls(fileobj, show_process, filters)
+
# Because of the way igor.py deletes and re-imports modules,
# this class can be defined more than once. But we really want
# a process-wide singleton. So stash it in sys.modules instead of
# on a class attribute. Yes, this is aggressively gross.
the_one = sys.modules.get(cls.SYS_MOD_NAME)
if the_one is None:
- assert fileobj is not None
+ if fileobj is None:
+ debug_file_name = os.environ.get("COVERAGE_DEBUG_FILE")
+ if debug_file_name:
+ fileobj = open(debug_file_name, "a")
+ else:
+ fileobj = sys.stderr
sys.modules[cls.SYS_MOD_NAME] = the_one = cls(fileobj, show_process, filters)
return the_one
@@ -250,46 +282,72 @@ class DebugOutputFile(object): # pragma: debugging
def log(msg, stack=False): # pragma: debugging
"""Write a log message as forcefully as possible."""
- out = DebugOutputFile.the_one()
+ out = DebugOutputFile.get_one()
out.write(msg+"\n")
if stack:
dump_stack_frames(out=out, skip=1)
-def filter_aspectlib_frames(text): # pragma: debugging
- """Aspectlib prints stack traces, but includes its own frames. Scrub those out."""
- # <<< aspectlib/__init__.py:257:function_wrapper < igor.py:143:run_tests < ...
- text = re.sub(r"(?<= )aspectlib/[^.]+\.py:\d+:\w+ < ", "", text)
- return text
-
-
-def enable_aspectlib_maybe(): # pragma: debugging
- """For debugging, we can use aspectlib to trace execution.
-
- Define COVERAGE_ASPECTLIB to enable and configure aspectlib to trace
- execution::
-
- $ export COVERAGE_LOG=covaspect.txt
- $ export COVERAGE_ASPECTLIB=coverage.Coverage:coverage.data.CoverageData
- $ coverage run blah.py ...
-
- This will trace all the public methods on Coverage and CoverageData,
- writing the information to covaspect.txt.
-
- """
- aspects = os.environ.get("COVERAGE_ASPECTLIB", "")
- if not aspects:
- return
-
- import aspectlib # pylint: disable=import-error
- import aspectlib.debug # pylint: disable=import-error
-
- filename = os.environ.get("COVERAGE_LOG", "/tmp/covlog.txt")
- filters = [add_pid_and_tid, filter_aspectlib_frames]
- aspects_file = DebugOutputFile.the_one(open(filename, "a"), show_process=True, filters=filters)
- aspect_log = aspectlib.debug.log(
- print_to=aspects_file, attributes=['id'], stacktrace=30, use_logging=False
- )
- public_methods = re.compile(r'^(__init__|[a-zA-Z].*)$')
- for aspect in aspects.split(':'):
- aspectlib.weave(aspect, aspect_log, methods=public_methods)
+def decorate_methods(decorator, butnot=()): # pragma: debugging
+ """A class decorator to apply a decorator to public methods."""
+ def _decorator(cls):
+ for name, meth in inspect.getmembers(cls, inspect.isroutine):
+ public = name == '__init__' or not name.startswith("_")
+ decorate_it = public and name not in butnot
+ if decorate_it:
+ setattr(cls, name, decorator(meth))
+ return cls
+ return _decorator
+
+
+def break_in_pudb(func): # pragma: debugging
+ """A function decorator to stop in the debugger for each call."""
+ @functools.wraps(func)
+ def _wrapper(*args, **kwargs):
+ import pudb # pylint: disable=import-error
+ sys.stdout = sys.__stdout__
+ pudb.set_trace()
+ return func(*args, **kwargs)
+ return _wrapper
+
+
+OBJ_IDS = itertools.count()
+CALLS = itertools.count()
+OBJ_ID_ATTR = "$coverage.object_id"
+
+def show_calls(show_args=True, show_stack=False): # pragma: debugging
+ """A method decorator to debug-log each call to the function."""
+ def _decorator(func):
+ @functools.wraps(func)
+ def _wrapper(self, *args, **kwargs):
+ oid = getattr(self, OBJ_ID_ATTR, None)
+ if oid is None:
+ oid = "{:08d} {:04d}".format(os.getpid(), next(OBJ_IDS))
+ setattr(self, OBJ_ID_ATTR, oid)
+ extra = ""
+ if show_args:
+ eargs = ", ".join(map(repr, args))
+ ekwargs = ", ".join("{}={!r}".format(*item) for item in kwargs.items())
+ extra += "("
+ extra += eargs
+ if eargs and ekwargs:
+ extra += ", "
+ extra += ekwargs
+ extra += ")"
+ if show_stack:
+ extra += " @ "
+ extra += "; ".join(_clean_stack_line(l) for l in short_stack().splitlines())
+ msg = "{} {:04d} {}{}\n".format(oid, next(CALLS), func.__name__, extra)
+ DebugOutputFile.get_one().write(msg)
+ return func(self, *args, **kwargs)
+ return _wrapper
+ return _decorator
+
+
+def _clean_stack_line(s): # pragma: debugging
+ """Simplify some paths in a stack trace, for compactness."""
+ s = s.strip()
+ s = s.replace(os.path.dirname(__file__) + '/', '')
+ s = s.replace(os.path.dirname(os.__file__) + '/', '')
+ s = s.replace(sys.prefix + '/', '')
+ return s