summaryrefslogtreecommitdiff
path: root/tests/test_coroutine.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_coroutine.py')
-rw-r--r--tests/test_coroutine.py208
1 files changed, 0 insertions, 208 deletions
diff --git a/tests/test_coroutine.py b/tests/test_coroutine.py
deleted file mode 100644
index 4abdd6f..0000000
--- a/tests/test_coroutine.py
+++ /dev/null
@@ -1,208 +0,0 @@
-"""Tests for coroutining."""
-
-import os, os.path, sys, threading
-
-import coverage
-
-from tests.coveragetest import CoverageTest
-
-
-# These libraries aren't always available, we'll skip tests if they aren't.
-
-try:
- import eventlet # pylint: disable=import-error
-except ImportError:
- eventlet = None
-
-try:
- import gevent # pylint: disable=import-error
-except ImportError:
- gevent = None
-
-try:
- import greenlet # pylint: disable=import-error
-except ImportError:
- greenlet = None
-
-# Are we running with the C tracer or not?
-C_TRACER = os.getenv('COVERAGE_TEST_TRACER', 'c') == 'c'
-
-
-def line_count(s):
- """How many non-blank non-comment lines are in `s`?"""
- def code_line(l):
- """Is this a code line? Not blank, and not a full-line comment."""
- return l.strip() and not l.strip().startswith('#')
- return sum(1 for l in s.splitlines() if code_line(l))
-
-
-class CoroutineTest(CoverageTest):
- """Tests of the coroutine support in coverage.py."""
-
- LIMIT = 1000
-
- # The code common to all the concurrency models.
- COMMON = """
- class Producer(threading.Thread):
- def __init__(self, q):
- threading.Thread.__init__(self)
- self.q = q
-
- def run(self):
- for i in range({LIMIT}):
- self.q.put(i)
- self.q.put(None)
-
- class Consumer(threading.Thread):
- def __init__(self, q):
- threading.Thread.__init__(self)
- self.q = q
-
- def run(self):
- sum = 0
- while True:
- i = self.q.get()
- if i is None:
- print(sum)
- break
- sum += i
-
- q = queue.Queue()
- c = Consumer(q)
- p = Producer(q)
- c.start()
- p.start()
-
- p.join()
- c.join()
- """.format(LIMIT=LIMIT)
-
- # Import the things to use threads.
- if sys.version_info < (3, 0):
- THREAD = """\
- import threading
- import Queue as queue
- """ + COMMON
- else:
- THREAD = """\
- import threading
- import queue
- """ + COMMON
-
- # Import the things to use eventlet.
- EVENTLET = """\
- import eventlet.green.threading as threading
- import eventlet.queue as queue
- """ + COMMON
-
- # Import the things to use gevent.
- GEVENT = """\
- from gevent import monkey
- monkey.patch_thread()
- import threading
- import gevent.queue as queue
- """ + COMMON
-
- # Uncomplicated code that doesn't use any of the coroutining stuff, to test
- # the simple case under each of the regimes.
- SIMPLE = """\
- total = 0
- for i in range({LIMIT}):
- total += i
- print(total)
- """.format(LIMIT=LIMIT)
-
- def try_some_code(self, code, coroutine, the_module, expected_out=None):
- """Run some coroutine testing code and see that it was all covered.
-
- `code` is the Python code to execute. `coroutine` is the name of the
- coroutine regime to test it under. `the_module` is the imported module
- that must be available for this to work at all. `expected_out` is the
- text we expect the code to produce.
-
- """
-
- self.make_file("try_it.py", code)
-
- cmd = "coverage run --coroutine=%s try_it.py" % coroutine
- out = self.run_command(cmd)
-
- if not the_module:
- # We don't even have the underlying module installed, we expect
- # coverage to alert us to this fact.
- expected_out = (
- "Couldn't trace with coroutine=%s, "
- "the module isn't installed.\n" % coroutine
- )
- self.assertEqual(out, expected_out)
- elif C_TRACER or coroutine == "thread":
- # We can fully measure the code if we are using the C tracer, which
- # can support all the coroutining, or if we are using threads.
- if expected_out is None:
- expected_out = "%d\n" % (sum(range(self.LIMIT)))
- self.assertEqual(out, expected_out)
-
- # Read the coverage file and see that try_it.py has all its lines
- # executed.
- data = coverage.CoverageData()
- data.read_file(".coverage")
-
- # If the test fails, it's helpful to see this info:
- fname = os.path.abspath("try_it.py")
- linenos = data.executed_lines(fname).keys()
- print("{0}: {1}".format(len(linenos), linenos))
- print_simple_annotation(code, linenos)
-
- lines = line_count(code)
- self.assertEqual(data.summary()['try_it.py'], lines)
- else:
- expected_out = (
- "Can't support coroutine=%s with PyTracer, "
- "only threads are supported\n" % coroutine
- )
- self.assertEqual(out, expected_out)
-
- def test_threads(self):
- self.try_some_code(self.THREAD, "thread", threading)
-
- def test_threads_simple_code(self):
- self.try_some_code(self.SIMPLE, "thread", threading)
-
- def test_eventlet(self):
- self.try_some_code(self.EVENTLET, "eventlet", eventlet)
-
- def test_eventlet_simple_code(self):
- self.try_some_code(self.SIMPLE, "eventlet", eventlet)
-
- def test_gevent(self):
- self.try_some_code(self.GEVENT, "gevent", gevent)
-
- def test_gevent_simple_code(self):
- self.try_some_code(self.SIMPLE, "gevent", gevent)
-
- def test_greenlet(self):
- GREENLET = """\
- from greenlet import greenlet
-
- def test1(x, y):
- z = gr2.switch(x+y)
- print(z)
-
- def test2(u):
- print(u)
- gr1.switch(42)
-
- gr1 = greenlet(test1)
- gr2 = greenlet(test2)
- gr1.switch("hello", " world")
- """
- self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n")
-
- def test_greenlet_simple_code(self):
- self.try_some_code(self.SIMPLE, "greenlet", greenlet)
-
-
-def print_simple_annotation(code, linenos):
- """Print the lines in `code` with X for each line number in `linenos`."""
- for lineno, line in enumerate(code.splitlines(), start=1):
- print(" {0} {1}".format("X" if lineno in linenos else " ", line))