summaryrefslogtreecommitdiff
path: root/tests/test_coroutine.py
diff options
context:
space:
mode:
authorNed Batchelder <ned@nedbatchelder.com>2014-07-06 14:17:19 -0400
committerNed Batchelder <ned@nedbatchelder.com>2014-07-06 14:17:19 -0400
commita63bf56c15835c763f60ff0ba3f782c8fb86363c (patch)
tree70a6d5ee4a5815e41128d6b6bc89f6dcaa740a43 /tests/test_coroutine.py
parent29d841f46a3c3ef4642e6b4aff9f98bc99fac1ee (diff)
downloadpython-coveragepy-git-a63bf56c15835c763f60ff0ba3f782c8fb86363c.tar.gz
Fix up the coroutine tests, now eventlet works? But gevent still doesn't
Diffstat (limited to 'tests/test_coroutine.py')
-rw-r--r--tests/test_coroutine.py61
1 files changed, 42 insertions, 19 deletions
diff --git a/tests/test_coroutine.py b/tests/test_coroutine.py
index 28539801..fe6c8326 100644
--- a/tests/test_coroutine.py
+++ b/tests/test_coroutine.py
@@ -1,5 +1,7 @@
"""Tests for coroutining."""
+import os.path, sys
+
from nose.plugins.skip import SkipTest
import coverage
@@ -20,15 +22,19 @@ except ImportError:
def line_count(s):
- """How many non-blank lines are in `s`?"""
- return sum(1 for l in s.splitlines() if l.strip())
+ """How many non-blank non-comment lines are in `s`?"""
+ def code_line(l):
+ """Is this a code line? Not blank, and not a full-line comment."""
+ return l.strip() and not l.strip().startswith('#')
+ return sum(1 for l in s.splitlines() if code_line(l))
class CoroutineTest(CoverageTest):
"""Tests of the coroutine support in coverage.py."""
- # The code common to all the concurrency models. Don't use any comments,
- # we're counting non-blank lines to see they are all covered.
+ LIMIT = 1000
+
+ # The code common to all the concurrency models.
COMMON = """
class Producer(threading.Thread):
def __init__(self, q):
@@ -36,7 +42,7 @@ class CoroutineTest(CoverageTest):
self.q = q
def run(self):
- for i in range(1000):
+ for i in range({LIMIT}):
self.q.put(i)
self.q.put(None)
@@ -54,28 +60,32 @@ class CoroutineTest(CoverageTest):
break
sum += i
- q = Queue.Queue()
+ q = queue.Queue()
c = Consumer(q)
- c.start()
p = Producer(q)
+ c.start()
p.start()
+
p.join()
c.join()
- """
+ """.format(LIMIT=LIMIT)
# Import the things to use threads.
- THREAD = """\
+ if sys.version_info < (3, 0):
+ THREAD = """\
+ import threading
+ import Queue as queue
+ """ + COMMON
+ else:
+ THREAD = """\
import threading
- try:
- import Queue
- except ImportError: # Python 3 :)
- import queue as Queue
+ import queue
""" + COMMON
# Import the things to use eventlet.
EVENTLET = """\
import eventlet.green.threading as threading
- import eventlet.queue as Queue
+ import eventlet.queue as queue
""" + COMMON
# Import the things to use gevent.
@@ -83,7 +93,7 @@ class CoroutineTest(CoverageTest):
from gevent import monkey
monkey.patch_thread()
import threading
- import gevent.queue as Queue
+ import gevent.queue as queue
""" + COMMON
def try_some_code(self, code, args):
@@ -91,16 +101,21 @@ class CoroutineTest(CoverageTest):
self.make_file("try_it.py", code)
- raise SkipTest("Need to put this on a back burner for a while...")
-
- out = self.run_command("coverage run %s try_it.py" % args)
- expected_out = "%d\n" % (sum(range(1000)))
+ out = self.run_command("coverage run --timid %s try_it.py" % args)
+ expected_out = "%d\n" % (sum(range(self.LIMIT)))
self.assertEqual(out, expected_out)
# Read the coverage file and see that try_it.py has all its lines
# executed.
data = coverage.CoverageData()
data.read_file(".coverage")
+
+ # If the test fails, it's helpful to see this info:
+ fname = os.path.abspath("try_it.py")
+ linenos = data.executed_lines(fname).keys()
+ print("{0}: {1}".format(len(linenos), linenos))
+ print_simple_annotation(code, linenos)
+
lines = line_count(code)
self.assertEqual(data.summary()['try_it.py'], lines)
@@ -114,7 +129,15 @@ class CoroutineTest(CoverageTest):
self.try_some_code(self.EVENTLET, "--coroutine=eventlet")
def test_gevent(self):
+ raise SkipTest("Still not sure why gevent isn't working...")
+
if gevent is None:
raise SkipTest("No gevent available")
self.try_some_code(self.GEVENT, "--coroutine=gevent")
+
+
+def print_simple_annotation(code, linenos):
+ """Print the lines in `code` with X for each line number in `linenos`."""
+ for lineno, line in enumerate(code.splitlines(), start=1):
+ print(" {0:s} {1}".format("X" if lineno in linenos else " ", line))