diff options
Diffstat (limited to 'tests/test_coroutine.py')
| -rw-r--r-- | tests/test_coroutine.py | 208 |
1 files changed, 0 insertions, 208 deletions
diff --git a/tests/test_coroutine.py b/tests/test_coroutine.py deleted file mode 100644 index 4abdd6f..0000000 --- a/tests/test_coroutine.py +++ /dev/null @@ -1,208 +0,0 @@ -"""Tests for coroutining.""" - -import os, os.path, sys, threading - -import coverage - -from tests.coveragetest import CoverageTest - - -# These libraries aren't always available, we'll skip tests if they aren't. - -try: - import eventlet # pylint: disable=import-error -except ImportError: - eventlet = None - -try: - import gevent # pylint: disable=import-error -except ImportError: - gevent = None - -try: - import greenlet # pylint: disable=import-error -except ImportError: - greenlet = None - -# Are we running with the C tracer or not? -C_TRACER = os.getenv('COVERAGE_TEST_TRACER', 'c') == 'c' - - -def line_count(s): - """How many non-blank non-comment lines are in `s`?""" - def code_line(l): - """Is this a code line? Not blank, and not a full-line comment.""" - return l.strip() and not l.strip().startswith('#') - return sum(1 for l in s.splitlines() if code_line(l)) - - -class CoroutineTest(CoverageTest): - """Tests of the coroutine support in coverage.py.""" - - LIMIT = 1000 - - # The code common to all the concurrency models. - COMMON = """ - class Producer(threading.Thread): - def __init__(self, q): - threading.Thread.__init__(self) - self.q = q - - def run(self): - for i in range({LIMIT}): - self.q.put(i) - self.q.put(None) - - class Consumer(threading.Thread): - def __init__(self, q): - threading.Thread.__init__(self) - self.q = q - - def run(self): - sum = 0 - while True: - i = self.q.get() - if i is None: - print(sum) - break - sum += i - - q = queue.Queue() - c = Consumer(q) - p = Producer(q) - c.start() - p.start() - - p.join() - c.join() - """.format(LIMIT=LIMIT) - - # Import the things to use threads. - if sys.version_info < (3, 0): - THREAD = """\ - import threading - import Queue as queue - """ + COMMON - else: - THREAD = """\ - import threading - import queue - """ + COMMON - - # Import the things to use eventlet. - EVENTLET = """\ - import eventlet.green.threading as threading - import eventlet.queue as queue - """ + COMMON - - # Import the things to use gevent. - GEVENT = """\ - from gevent import monkey - monkey.patch_thread() - import threading - import gevent.queue as queue - """ + COMMON - - # Uncomplicated code that doesn't use any of the coroutining stuff, to test - # the simple case under each of the regimes. - SIMPLE = """\ - total = 0 - for i in range({LIMIT}): - total += i - print(total) - """.format(LIMIT=LIMIT) - - def try_some_code(self, code, coroutine, the_module, expected_out=None): - """Run some coroutine testing code and see that it was all covered. - - `code` is the Python code to execute. `coroutine` is the name of the - coroutine regime to test it under. `the_module` is the imported module - that must be available for this to work at all. `expected_out` is the - text we expect the code to produce. - - """ - - self.make_file("try_it.py", code) - - cmd = "coverage run --coroutine=%s try_it.py" % coroutine - out = self.run_command(cmd) - - if not the_module: - # We don't even have the underlying module installed, we expect - # coverage to alert us to this fact. - expected_out = ( - "Couldn't trace with coroutine=%s, " - "the module isn't installed.\n" % coroutine - ) - self.assertEqual(out, expected_out) - elif C_TRACER or coroutine == "thread": - # We can fully measure the code if we are using the C tracer, which - # can support all the coroutining, or if we are using threads. - if expected_out is None: - expected_out = "%d\n" % (sum(range(self.LIMIT))) - self.assertEqual(out, expected_out) - - # Read the coverage file and see that try_it.py has all its lines - # executed. - data = coverage.CoverageData() - data.read_file(".coverage") - - # If the test fails, it's helpful to see this info: - fname = os.path.abspath("try_it.py") - linenos = data.executed_lines(fname).keys() - print("{0}: {1}".format(len(linenos), linenos)) - print_simple_annotation(code, linenos) - - lines = line_count(code) - self.assertEqual(data.summary()['try_it.py'], lines) - else: - expected_out = ( - "Can't support coroutine=%s with PyTracer, " - "only threads are supported\n" % coroutine - ) - self.assertEqual(out, expected_out) - - def test_threads(self): - self.try_some_code(self.THREAD, "thread", threading) - - def test_threads_simple_code(self): - self.try_some_code(self.SIMPLE, "thread", threading) - - def test_eventlet(self): - self.try_some_code(self.EVENTLET, "eventlet", eventlet) - - def test_eventlet_simple_code(self): - self.try_some_code(self.SIMPLE, "eventlet", eventlet) - - def test_gevent(self): - self.try_some_code(self.GEVENT, "gevent", gevent) - - def test_gevent_simple_code(self): - self.try_some_code(self.SIMPLE, "gevent", gevent) - - def test_greenlet(self): - GREENLET = """\ - from greenlet import greenlet - - def test1(x, y): - z = gr2.switch(x+y) - print(z) - - def test2(u): - print(u) - gr1.switch(42) - - gr1 = greenlet(test1) - gr2 = greenlet(test2) - gr1.switch("hello", " world") - """ - self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n") - - def test_greenlet_simple_code(self): - self.try_some_code(self.SIMPLE, "greenlet", greenlet) - - -def print_simple_annotation(code, linenos): - """Print the lines in `code` with X for each line number in `linenos`.""" - for lineno, line in enumerate(code.splitlines(), start=1): - print(" {0} {1}".format("X" if lineno in linenos else " ", line)) |
