diff options
| author | Danny Allen <me@dannya.com> | 2014-09-22 12:05:55 +0100 |
|---|---|---|
| committer | Danny Allen <me@dannya.com> | 2014-09-22 12:05:55 +0100 |
| commit | 1b6d0d06624170fb7a17738387387b1f21357e94 (patch) | |
| tree | 335402fdef527656f37d3024345c9b532592bce7 /tests/test_coroutine.py | |
| parent | c4935999f882e7317121e884629d07080f1bc776 (diff) | |
| parent | d68b95f7a0a201b2e8e830b6d4769005ef0223fa (diff) | |
| download | python-coveragepy-git-1b6d0d06624170fb7a17738387387b1f21357e94.tar.gz | |
Merged ned/coveragepy into default
Diffstat (limited to 'tests/test_coroutine.py')
| -rw-r--r-- | tests/test_coroutine.py | 121 |
1 files changed, 93 insertions, 28 deletions
diff --git a/tests/test_coroutine.py b/tests/test_coroutine.py index fe6c8326..4abdd6f6 100644 --- a/tests/test_coroutine.py +++ b/tests/test_coroutine.py @@ -1,8 +1,7 @@ """Tests for coroutining.""" -import os.path, sys +import os, os.path, sys, threading -from nose.plugins.skip import SkipTest import coverage from tests.coveragetest import CoverageTest @@ -20,6 +19,14 @@ try: except ImportError: gevent = None +try: + import greenlet # pylint: disable=import-error +except ImportError: + greenlet = None + +# Are we running with the C tracer or not? +C_TRACER = os.getenv('COVERAGE_TEST_TRACER', 'c') == 'c' + def line_count(s): """How many non-blank non-comment lines are in `s`?""" @@ -96,48 +103,106 @@ class CoroutineTest(CoverageTest): import gevent.queue as queue """ + COMMON - def try_some_code(self, code, args): - """Run some coroutine testing code and see that it was all covered.""" + # Uncomplicated code that doesn't use any of the coroutining stuff, to test + # the simple case under each of the regimes. + SIMPLE = """\ + total = 0 + for i in range({LIMIT}): + total += i + print(total) + """.format(LIMIT=LIMIT) - self.make_file("try_it.py", code) + def try_some_code(self, code, coroutine, the_module, expected_out=None): + """Run some coroutine testing code and see that it was all covered. - out = self.run_command("coverage run --timid %s try_it.py" % args) - expected_out = "%d\n" % (sum(range(self.LIMIT))) - self.assertEqual(out, expected_out) + `code` is the Python code to execute. `coroutine` is the name of the + coroutine regime to test it under. `the_module` is the imported module + that must be available for this to work at all. `expected_out` is the + text we expect the code to produce. - # Read the coverage file and see that try_it.py has all its lines - # executed. - data = coverage.CoverageData() - data.read_file(".coverage") + """ - # If the test fails, it's helpful to see this info: - fname = os.path.abspath("try_it.py") - linenos = data.executed_lines(fname).keys() - print("{0}: {1}".format(len(linenos), linenos)) - print_simple_annotation(code, linenos) + self.make_file("try_it.py", code) - lines = line_count(code) - self.assertEqual(data.summary()['try_it.py'], lines) + cmd = "coverage run --coroutine=%s try_it.py" % coroutine + out = self.run_command(cmd) + + if not the_module: + # We don't even have the underlying module installed, we expect + # coverage to alert us to this fact. + expected_out = ( + "Couldn't trace with coroutine=%s, " + "the module isn't installed.\n" % coroutine + ) + self.assertEqual(out, expected_out) + elif C_TRACER or coroutine == "thread": + # We can fully measure the code if we are using the C tracer, which + # can support all the coroutining, or if we are using threads. + if expected_out is None: + expected_out = "%d\n" % (sum(range(self.LIMIT))) + self.assertEqual(out, expected_out) + + # Read the coverage file and see that try_it.py has all its lines + # executed. + data = coverage.CoverageData() + data.read_file(".coverage") + + # If the test fails, it's helpful to see this info: + fname = os.path.abspath("try_it.py") + linenos = data.executed_lines(fname).keys() + print("{0}: {1}".format(len(linenos), linenos)) + print_simple_annotation(code, linenos) + + lines = line_count(code) + self.assertEqual(data.summary()['try_it.py'], lines) + else: + expected_out = ( + "Can't support coroutine=%s with PyTracer, " + "only threads are supported\n" % coroutine + ) + self.assertEqual(out, expected_out) def test_threads(self): - self.try_some_code(self.THREAD, "") + self.try_some_code(self.THREAD, "thread", threading) + + def test_threads_simple_code(self): + self.try_some_code(self.SIMPLE, "thread", threading) def test_eventlet(self): - if eventlet is None: - raise SkipTest("No eventlet available") + self.try_some_code(self.EVENTLET, "eventlet", eventlet) - self.try_some_code(self.EVENTLET, "--coroutine=eventlet") + def test_eventlet_simple_code(self): + self.try_some_code(self.SIMPLE, "eventlet", eventlet) def test_gevent(self): - raise SkipTest("Still not sure why gevent isn't working...") + self.try_some_code(self.GEVENT, "gevent", gevent) + + def test_gevent_simple_code(self): + self.try_some_code(self.SIMPLE, "gevent", gevent) + + def test_greenlet(self): + GREENLET = """\ + from greenlet import greenlet + + def test1(x, y): + z = gr2.switch(x+y) + print(z) + + def test2(u): + print(u) + gr1.switch(42) - if gevent is None: - raise SkipTest("No gevent available") + gr1 = greenlet(test1) + gr2 = greenlet(test2) + gr1.switch("hello", " world") + """ + self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n") - self.try_some_code(self.GEVENT, "--coroutine=gevent") + def test_greenlet_simple_code(self): + self.try_some_code(self.SIMPLE, "greenlet", greenlet) def print_simple_annotation(code, linenos): """Print the lines in `code` with X for each line number in `linenos`.""" for lineno, line in enumerate(code.splitlines(), start=1): - print(" {0:s} {1}".format("X" if lineno in linenos else " ", line)) + print(" {0} {1}".format("X" if lineno in linenos else " ", line)) |
