diff options
author | Ned Batchelder <ned@nedbatchelder.com> | 2014-07-06 14:17:19 -0400 |
---|---|---|
committer | Ned Batchelder <ned@nedbatchelder.com> | 2014-07-06 14:17:19 -0400 |
commit | a63bf56c15835c763f60ff0ba3f782c8fb86363c (patch) | |
tree | 70a6d5ee4a5815e41128d6b6bc89f6dcaa740a43 | |
parent | 29d841f46a3c3ef4642e6b4aff9f98bc99fac1ee (diff) | |
download | python-coveragepy-git-a63bf56c15835c763f60ff0ba3f782c8fb86363c.tar.gz |
Fix up the coroutine tests, now eventlet works? But gevent still doesn't
-rw-r--r-- | tests/test_coroutine.py | 61 |
1 files changed, 42 insertions, 19 deletions
diff --git a/tests/test_coroutine.py b/tests/test_coroutine.py index 28539801..fe6c8326 100644 --- a/tests/test_coroutine.py +++ b/tests/test_coroutine.py @@ -1,5 +1,7 @@ """Tests for coroutining.""" +import os.path, sys + from nose.plugins.skip import SkipTest import coverage @@ -20,15 +22,19 @@ except ImportError: def line_count(s): - """How many non-blank lines are in `s`?""" - return sum(1 for l in s.splitlines() if l.strip()) + """How many non-blank non-comment lines are in `s`?""" + def code_line(l): + """Is this a code line? Not blank, and not a full-line comment.""" + return l.strip() and not l.strip().startswith('#') + return sum(1 for l in s.splitlines() if code_line(l)) class CoroutineTest(CoverageTest): """Tests of the coroutine support in coverage.py.""" - # The code common to all the concurrency models. Don't use any comments, - # we're counting non-blank lines to see they are all covered. + LIMIT = 1000 + + # The code common to all the concurrency models. COMMON = """ class Producer(threading.Thread): def __init__(self, q): @@ -36,7 +42,7 @@ class CoroutineTest(CoverageTest): self.q = q def run(self): - for i in range(1000): + for i in range({LIMIT}): self.q.put(i) self.q.put(None) @@ -54,28 +60,32 @@ class CoroutineTest(CoverageTest): break sum += i - q = Queue.Queue() + q = queue.Queue() c = Consumer(q) - c.start() p = Producer(q) + c.start() p.start() + p.join() c.join() - """ + """.format(LIMIT=LIMIT) # Import the things to use threads. - THREAD = """\ + if sys.version_info < (3, 0): + THREAD = """\ + import threading + import Queue as queue + """ + COMMON + else: + THREAD = """\ import threading - try: - import Queue - except ImportError: # Python 3 :) - import queue as Queue + import queue """ + COMMON # Import the things to use eventlet. EVENTLET = """\ import eventlet.green.threading as threading - import eventlet.queue as Queue + import eventlet.queue as queue """ + COMMON # Import the things to use gevent. @@ -83,7 +93,7 @@ class CoroutineTest(CoverageTest): from gevent import monkey monkey.patch_thread() import threading - import gevent.queue as Queue + import gevent.queue as queue """ + COMMON def try_some_code(self, code, args): @@ -91,16 +101,21 @@ class CoroutineTest(CoverageTest): self.make_file("try_it.py", code) - raise SkipTest("Need to put this on a back burner for a while...") - - out = self.run_command("coverage run %s try_it.py" % args) - expected_out = "%d\n" % (sum(range(1000))) + out = self.run_command("coverage run --timid %s try_it.py" % args) + expected_out = "%d\n" % (sum(range(self.LIMIT))) self.assertEqual(out, expected_out) # Read the coverage file and see that try_it.py has all its lines # executed. data = coverage.CoverageData() data.read_file(".coverage") + + # If the test fails, it's helpful to see this info: + fname = os.path.abspath("try_it.py") + linenos = data.executed_lines(fname).keys() + print("{0}: {1}".format(len(linenos), linenos)) + print_simple_annotation(code, linenos) + lines = line_count(code) self.assertEqual(data.summary()['try_it.py'], lines) @@ -114,7 +129,15 @@ class CoroutineTest(CoverageTest): self.try_some_code(self.EVENTLET, "--coroutine=eventlet") def test_gevent(self): + raise SkipTest("Still not sure why gevent isn't working...") + if gevent is None: raise SkipTest("No gevent available") self.try_some_code(self.GEVENT, "--coroutine=gevent") + + +def print_simple_annotation(code, linenos): + """Print the lines in `code` with X for each line number in `linenos`.""" + for lineno, line in enumerate(code.splitlines(), start=1): + print(" {0:s} {1}".format("X" if lineno in linenos else " ", line)) |