summaryrefslogtreecommitdiff
path: root/tests/test_coroutine.py
diff options
context:
space:
mode:
authorDanny Allen <me@dannya.com>2014-09-22 12:05:55 +0100
committerDanny Allen <me@dannya.com>2014-09-22 12:05:55 +0100
commit1b6d0d06624170fb7a17738387387b1f21357e94 (patch)
tree335402fdef527656f37d3024345c9b532592bce7 /tests/test_coroutine.py
parentc4935999f882e7317121e884629d07080f1bc776 (diff)
parentd68b95f7a0a201b2e8e830b6d4769005ef0223fa (diff)
downloadpython-coveragepy-git-1b6d0d06624170fb7a17738387387b1f21357e94.tar.gz
Merged ned/coveragepy into default
Diffstat (limited to 'tests/test_coroutine.py')
-rw-r--r--tests/test_coroutine.py121
1 files changed, 93 insertions, 28 deletions
diff --git a/tests/test_coroutine.py b/tests/test_coroutine.py
index fe6c8326..4abdd6f6 100644
--- a/tests/test_coroutine.py
+++ b/tests/test_coroutine.py
@@ -1,8 +1,7 @@
"""Tests for coroutining."""
-import os.path, sys
+import os, os.path, sys, threading
-from nose.plugins.skip import SkipTest
import coverage
from tests.coveragetest import CoverageTest
@@ -20,6 +19,14 @@ try:
except ImportError:
gevent = None
+try:
+ import greenlet # pylint: disable=import-error
+except ImportError:
+ greenlet = None
+
+# Are we running with the C tracer or not?
+C_TRACER = os.getenv('COVERAGE_TEST_TRACER', 'c') == 'c'
+
def line_count(s):
"""How many non-blank non-comment lines are in `s`?"""
@@ -96,48 +103,106 @@ class CoroutineTest(CoverageTest):
import gevent.queue as queue
""" + COMMON
- def try_some_code(self, code, args):
- """Run some coroutine testing code and see that it was all covered."""
+ # Uncomplicated code that doesn't use any of the coroutining stuff, to test
+ # the simple case under each of the regimes.
+ SIMPLE = """\
+ total = 0
+ for i in range({LIMIT}):
+ total += i
+ print(total)
+ """.format(LIMIT=LIMIT)
- self.make_file("try_it.py", code)
+ def try_some_code(self, code, coroutine, the_module, expected_out=None):
+ """Run some coroutine testing code and see that it was all covered.
- out = self.run_command("coverage run --timid %s try_it.py" % args)
- expected_out = "%d\n" % (sum(range(self.LIMIT)))
- self.assertEqual(out, expected_out)
+ `code` is the Python code to execute. `coroutine` is the name of the
+ coroutine regime to test it under. `the_module` is the imported module
+ that must be available for this to work at all. `expected_out` is the
+ text we expect the code to produce.
- # Read the coverage file and see that try_it.py has all its lines
- # executed.
- data = coverage.CoverageData()
- data.read_file(".coverage")
+ """
- # If the test fails, it's helpful to see this info:
- fname = os.path.abspath("try_it.py")
- linenos = data.executed_lines(fname).keys()
- print("{0}: {1}".format(len(linenos), linenos))
- print_simple_annotation(code, linenos)
+ self.make_file("try_it.py", code)
- lines = line_count(code)
- self.assertEqual(data.summary()['try_it.py'], lines)
+ cmd = "coverage run --coroutine=%s try_it.py" % coroutine
+ out = self.run_command(cmd)
+
+ if not the_module:
+ # We don't even have the underlying module installed, we expect
+ # coverage to alert us to this fact.
+ expected_out = (
+ "Couldn't trace with coroutine=%s, "
+ "the module isn't installed.\n" % coroutine
+ )
+ self.assertEqual(out, expected_out)
+ elif C_TRACER or coroutine == "thread":
+ # We can fully measure the code if we are using the C tracer, which
+ # can support all the coroutining, or if we are using threads.
+ if expected_out is None:
+ expected_out = "%d\n" % (sum(range(self.LIMIT)))
+ self.assertEqual(out, expected_out)
+
+ # Read the coverage file and see that try_it.py has all its lines
+ # executed.
+ data = coverage.CoverageData()
+ data.read_file(".coverage")
+
+ # If the test fails, it's helpful to see this info:
+ fname = os.path.abspath("try_it.py")
+ linenos = data.executed_lines(fname).keys()
+ print("{0}: {1}".format(len(linenos), linenos))
+ print_simple_annotation(code, linenos)
+
+ lines = line_count(code)
+ self.assertEqual(data.summary()['try_it.py'], lines)
+ else:
+ expected_out = (
+ "Can't support coroutine=%s with PyTracer, "
+ "only threads are supported\n" % coroutine
+ )
+ self.assertEqual(out, expected_out)
def test_threads(self):
- self.try_some_code(self.THREAD, "")
+ self.try_some_code(self.THREAD, "thread", threading)
+
+ def test_threads_simple_code(self):
+ self.try_some_code(self.SIMPLE, "thread", threading)
def test_eventlet(self):
- if eventlet is None:
- raise SkipTest("No eventlet available")
+ self.try_some_code(self.EVENTLET, "eventlet", eventlet)
- self.try_some_code(self.EVENTLET, "--coroutine=eventlet")
+ def test_eventlet_simple_code(self):
+ self.try_some_code(self.SIMPLE, "eventlet", eventlet)
def test_gevent(self):
- raise SkipTest("Still not sure why gevent isn't working...")
+ self.try_some_code(self.GEVENT, "gevent", gevent)
+
+ def test_gevent_simple_code(self):
+ self.try_some_code(self.SIMPLE, "gevent", gevent)
+
+ def test_greenlet(self):
+ GREENLET = """\
+ from greenlet import greenlet
+
+ def test1(x, y):
+ z = gr2.switch(x+y)
+ print(z)
+
+ def test2(u):
+ print(u)
+ gr1.switch(42)
- if gevent is None:
- raise SkipTest("No gevent available")
+ gr1 = greenlet(test1)
+ gr2 = greenlet(test2)
+ gr1.switch("hello", " world")
+ """
+ self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n")
- self.try_some_code(self.GEVENT, "--coroutine=gevent")
+ def test_greenlet_simple_code(self):
+ self.try_some_code(self.SIMPLE, "greenlet", greenlet)
def print_simple_annotation(code, linenos):
"""Print the lines in `code` with X for each line number in `linenos`."""
for lineno, line in enumerate(code.splitlines(), start=1):
- print(" {0:s} {1}".format("X" if lineno in linenos else " ", line))
+ print(" {0} {1}".format("X" if lineno in linenos else " ", line))