1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
"""Tests for coroutining."""
from nose.plugins.skip import SkipTest
import coverage
from tests.coveragetest import CoverageTest
# These libraries aren't always available; we'll skip tests if they aren't.
try:
import eventlet # pylint: disable=import-error
except ImportError:
eventlet = None
try:
import gevent # pylint: disable=import-error
except ImportError:
gevent = None
def line_count(s):
    """Return the number of lines in `s` that are not empty or all whitespace."""
    non_blank = [line for line in s.splitlines() if line.strip()]
    return len(non_blank)
class CoroutineTest(CoverageTest):
    """Tests of the coroutine support in coverage.py.

    Every test writes the same producer/consumer program to try_it.py,
    varying only the imports that supply `threading` and `Queue`, and then
    hands it to `try_some_code` to be run under "coverage run".
    """

    # The code common to all the concurrency models.  Don't use any comments,
    # we're counting non-blank lines to see they are all covered.
    #
    # The program text in these strings is written flush-left, with only its
    # own relative indentation, so it is valid Python exactly as spelled here.
    COMMON = """
class Producer(threading.Thread):
    def __init__(self, q):
        threading.Thread.__init__(self)
        self.q = q
    def run(self):
        for i in range(1000):
            self.q.put(i)
        self.q.put(None)
class Consumer(threading.Thread):
    def __init__(self, q):
        threading.Thread.__init__(self)
        self.q = q
    def run(self):
        sum = 0
        while True:
            i = self.q.get()
            if i is None:
                print(sum)
                break
            sum += i
q = Queue.Queue()
c = Consumer(q)
c.start()
p = Producer(q)
p.start()
p.join()
c.join()
"""

    # Import the things to use threads.
    THREAD = """\
import threading
try:
    import Queue
except ImportError: # Python 3 :)
    import queue as Queue
""" + COMMON

    # Import the things to use eventlet.
    EVENTLET = """\
import eventlet.green.threading as threading
import eventlet.queue as Queue
""" + COMMON

    # Import the things to use gevent.
    GEVENT = """\
from gevent import monkey
monkey.patch_thread()
import threading
import gevent.queue as Queue
""" + COMMON

    def try_some_code(self, code, args):
        """Run some coroutine testing code and see that it was all covered.

        `code` is the program text written to try_it.py.  `args` is the
        option string placed between "coverage run" and the file name.

        Raises SkipTest unconditionally for now, right after the file has
        been written.
        """
        self.make_file("try_it.py", code)

        # The check is switched off on purpose.  Everything after this raise
        # is unreachable and is kept only so the test can be re-enabled by
        # deleting this one line.
        raise SkipTest("Need to put this on a back burner for a while...")

        out = self.run_command("coverage run %s try_it.py" % args)
        # The consumer prints the sum of everything the producer queued.
        expected_out = "%d\n" % (sum(range(1000)))
        self.assertEqual(out, expected_out)

        # Read the coverage file and see that try_it.py has all its lines
        # executed.
        data = coverage.CoverageData()
        data.read_file(".coverage")
        lines = line_count(code)
        self.assertEqual(data.summary()['try_it.py'], lines)

    def test_threads(self):
        """Try the program with the threading imports and no extra options."""
        self.try_some_code(self.THREAD, "")

    def test_eventlet(self):
        """Try the eventlet program; skipped if eventlet failed to import."""
        if eventlet is None:
            raise SkipTest("No eventlet available")

        self.try_some_code(self.EVENTLET, "--coroutine=eventlet")

    def test_gevent(self):
        """Try the gevent program; skipped if gevent failed to import."""
        if gevent is None:
            raise SkipTest("No gevent available")

        self.try_some_code(self.GEVENT, "--coroutine=gevent")
|