summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--coverage/collector.py63
-rw-r--r--coverage/pytracer.py17
-rw-r--r--tests/test_coroutine.py88
3 files changed, 106 insertions, 62 deletions
diff --git a/coverage/collector.py b/coverage/collector.py
index 9505c6c9..dccd797e 100644
--- a/coverage/collector.py
+++ b/coverage/collector.py
@@ -2,6 +2,7 @@
import os, sys
+from coverage.misc import CoverageException
from coverage.pytracer import PyTracer
try:
@@ -44,7 +45,9 @@ class Collector(object):
# the top, and resumed when they become the top again.
_collectors = []
- def __init__(self, should_trace, check_include, timid, branch, warn, coroutine):
+ def __init__(self,
+ should_trace, check_include, timid, branch, warn, coroutine,
+ ):
"""Create a collector.
`should_trace` is a function, taking a filename, and returning a
@@ -73,23 +76,35 @@ class Collector(object):
self.warn = warn
self.branch = branch
self.threading = None
-
- if coroutine == "greenlet":
- import greenlet
- self.coroutine_id_func = greenlet.getcurrent
- elif coroutine == "eventlet":
- import eventlet.greenthread
- self.coroutine_id_func = eventlet.greenthread.getcurrent
- elif coroutine == "gevent":
- import gevent
- self.coroutine_id_func = gevent.getcurrent
- else:
- # It's important to import threading only if we need it. If it's
- # imported early, and the program being measured uses gevent, then
- # gevent's monkey-patching won't work properly.
- import threading
- self.coroutine_id_func = None
- self.threading = threading
+ self.coroutine = coroutine
+
+ self.coroutine_id_func = None
+
+ try:
+ if coroutine == "greenlet":
+ import greenlet
+ self.coroutine_id_func = greenlet.getcurrent
+ elif coroutine == "eventlet":
+ import eventlet.greenthread
+ self.coroutine_id_func = eventlet.greenthread.getcurrent
+ elif coroutine == "gevent":
+ import gevent
+ self.coroutine_id_func = gevent.getcurrent
+ elif coroutine == "thread" or not coroutine:
+ # It's important to import threading only if we need it. If
+ # it's imported early, and the program being measured uses
+ # gevent, then gevent's monkey-patching won't work properly.
+ import threading
+ self.threading = threading
+ else:
+ raise CoverageException(
+ "Don't understand coroutine=%s" % coroutine
+ )
+ except ImportError:
+ raise CoverageException(
+ "Couldn't trace with coroutine=%s, "
+ "the module isn't installed." % coroutine
+ )
self.reset()
@@ -101,6 +116,7 @@ class Collector(object):
# trace function.
self._trace_class = CTracer or PyTracer
+
def __repr__(self):
return "<Collector at 0x%x>" % id(self)
@@ -132,14 +148,25 @@ class Collector(object):
tracer.should_trace = self.should_trace
tracer.should_trace_cache = self.should_trace_cache
tracer.warn = self.warn
+
if hasattr(tracer, 'coroutine_id_func'):
tracer.coroutine_id_func = self.coroutine_id_func
+ elif self.coroutine_id_func:
+ raise CoverageException(
+ "Can't support coroutine=%s with %s, "
+ "only threads are supported" % (
+ self.coroutine, self.tracer_name(),
+ )
+ )
+
if hasattr(tracer, 'plugin_data'):
tracer.plugin_data = self.plugin_data
if hasattr(tracer, 'threading'):
tracer.threading = self.threading
+
fn = tracer.start()
self.tracers.append(tracer)
+
return fn
# The trace function has to be set individually on each thread before
diff --git a/coverage/pytracer.py b/coverage/pytracer.py
index 733abb1c..a82b32c9 100644
--- a/coverage/pytracer.py
+++ b/coverage/pytracer.py
@@ -1,6 +1,6 @@
"""Raw data collector for Coverage."""
-import collections, sys
+import sys
class PyTracer(object):
@@ -38,13 +38,10 @@ class PyTracer(object):
self.last_line = [0]
self.data_stack = []
- self.data_stacks = collections.defaultdict(list)
self.last_exc_back = None
self.last_exc_firstlineno = 0
self.thread = None
self.stopped = False
- self.coroutine_id_func = None
- self.last_coroutine = None
def __repr__(self):
return "<PyTracer at 0x{0:0x}: {1} lines in {2} files>".format(
@@ -65,8 +62,6 @@ class PyTracer(object):
if self.arcs and self.cur_file_dict:
pair = (self.last_line, -self.last_exc_firstlineno)
self.cur_file_dict[pair] = None
- if self.coroutine_id_func:
- self.data_stack = self.data_stacks[self.coroutine_id_func()]
self.plugin, self.cur_file_dict, self.last_line = (
self.data_stack.pop()
)
@@ -75,9 +70,6 @@ class PyTracer(object):
if event == 'call':
# Entering a new function context. Decide if we should trace
# in this file.
- if self.coroutine_id_func:
- self.data_stack = self.data_stacks[self.coroutine_id_func()]
- self.last_coroutine = self.coroutine_id_func()
self.data_stack.append(
(self.plugin, self.cur_file_dict, self.last_line)
)
@@ -119,7 +111,9 @@ class PyTracer(object):
if lineno_from != -1:
if self.cur_file_dict is not None:
if self.arcs:
- self.cur_file_dict[(self.last_line, lineno_from)] = None
+ self.cur_file_dict[
+ (self.last_line, lineno_from)
+ ] = None
else:
for lineno in range(lineno_from, lineno_to+1):
self.cur_file_dict[lineno] = None
@@ -129,9 +123,6 @@ class PyTracer(object):
first = frame.f_code.co_firstlineno
self.cur_file_dict[(self.last_line, -first)] = None
# Leaving this function, pop the filename stack.
- if self.coroutine_id_func:
- self.data_stack = self.data_stacks[self.coroutine_id_func()]
- self.last_coroutine = self.coroutine_id_func()
self.plugin, self.cur_file_dict, self.last_line = (
self.data_stack.pop()
)
diff --git a/tests/test_coroutine.py b/tests/test_coroutine.py
index 9ad1f31f..2b4d298f 100644
--- a/tests/test_coroutine.py
+++ b/tests/test_coroutine.py
@@ -1,6 +1,6 @@
"""Tests for coroutining."""
-import os.path, sys
+import os, os.path, sys, threading
import coverage
@@ -19,6 +19,9 @@ try:
except ImportError:
gevent = None
+# Are we running with the C tracer or not?
+C_TRACER = os.getenv('COVERAGE_TEST_TRACER', 'c') == 'c'
+
def line_count(s):
"""How many non-blank non-comment lines are in `s`?"""
@@ -104,48 +107,71 @@ class CoroutineTest(CoverageTest):
print(total)
""".format(LIMIT=LIMIT)
- def try_some_code(self, code, args):
- """Run some coroutine testing code and see that it was all covered."""
-
- self.make_file("try_it.py", code)
+ def try_some_code(self, code, coroutine, the_module):
+ """Run some coroutine testing code and see that it was all covered.
- out = self.run_command("coverage run %s try_it.py" % args)
- expected_out = "%d\n" % (sum(range(self.LIMIT)))
- self.assertEqual(out, expected_out)
+ `code` is the Python code to execute. `coroutine` is the name of the
+ coroutine regime to test it under. `the_module` is the imported module
+ that must be available for this to work at all.
- # Read the coverage file and see that try_it.py has all its lines
- # executed.
- data = coverage.CoverageData()
- data.read_file(".coverage")
+ """
- # If the test fails, it's helpful to see this info:
- fname = os.path.abspath("try_it.py")
- linenos = data.executed_lines(fname).keys()
- print("{0}: {1}".format(len(linenos), linenos))
- print_simple_annotation(code, linenos)
+ self.make_file("try_it.py", code)
- lines = line_count(code)
- self.assertEqual(data.summary()['try_it.py'], lines)
+ cmd = "coverage run --coroutine=%s try_it.py" % coroutine
+ out = self.run_command(cmd)
+
+ if not the_module:
+ # We don't even have the underlying module installed, we expect
+ # coverage to alert us to this fact.
+ expected_out = (
+ "Couldn't trace with coroutine=%s, "
+ "the module isn't installed.\n" % coroutine
+ )
+ self.assertEqual(out, expected_out)
+ elif C_TRACER or coroutine == "thread":
+ # We can fully measure the code if we are using the C tracer, which
+ # can support all the coroutining, or if we are using threads.
+ expected_out = "%d\n" % (sum(range(self.LIMIT)))
+ self.assertEqual(out, expected_out)
+
+ # Read the coverage file and see that try_it.py has all its lines
+ # executed.
+ data = coverage.CoverageData()
+ data.read_file(".coverage")
+
+ # If the test fails, it's helpful to see this info:
+ fname = os.path.abspath("try_it.py")
+ linenos = data.executed_lines(fname).keys()
+ print("{0}: {1}".format(len(linenos), linenos))
+ print_simple_annotation(code, linenos)
+
+ lines = line_count(code)
+ self.assertEqual(data.summary()['try_it.py'], lines)
+ else:
+ expected_out = (
+ "Can't support coroutine=%s with PyTracer, "
+ "only threads are supported\n" % coroutine
+ )
+ self.assertEqual(out, expected_out)
def test_threads(self):
- self.try_some_code(self.THREAD, "")
+ self.try_some_code(self.THREAD, "thread", threading)
def test_threads_simple_code(self):
- self.try_some_code(self.SIMPLE, "")
+ self.try_some_code(self.SIMPLE, "thread", threading)
- if eventlet is not None:
- def test_eventlet(self):
- self.try_some_code(self.EVENTLET, "--coroutine=eventlet")
+ def test_eventlet(self):
+ self.try_some_code(self.EVENTLET, "eventlet", eventlet)
- def test_eventlet_simple_code(self):
- self.try_some_code(self.SIMPLE, "--coroutine=eventlet")
+ def test_eventlet_simple_code(self):
+ self.try_some_code(self.SIMPLE, "eventlet", eventlet)
- if gevent is not None:
- def test_gevent(self):
- self.try_some_code(self.GEVENT, "--coroutine=gevent")
+ def test_gevent(self):
+ self.try_some_code(self.GEVENT, "gevent", gevent)
- def test_gevent_simple_code(self):
- self.try_some_code(self.SIMPLE, "--coroutine=gevent")
+ def test_gevent_simple_code(self):
+ self.try_some_code(self.SIMPLE, "gevent", gevent)
def print_simple_annotation(code, linenos):