1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
|
"""Channel testing"""
from test.testlib import *
from git.async.pool import *
from git.async.task import *
from git.async.util import cpu_count
import threading
import time
class TestThreadTaskNode(InputIteratorThreadTask):
    """Instrumented input-iterator task for pool testing.

    Tracks how many times :meth:`process` ran and how many items flowed
    through its work function, and can be switched into a failing mode."""

    def __init__(self, *args, **kwargs):
        super(TestThreadTaskNode, self).__init__(*args, **kwargs)
        # begin in non-failing mode with zeroed counters
        self.should_fail = False
        self.reset(self._iterator)

    def do_fun(self, item):
        """Count the item; raise if failure mode is on, else pass it through."""
        self.item_count = self.item_count + 1
        if not self.should_fail:
            return item
        raise AssertionError("I am failing just for the fun of it")

    def reset(self, iterator):
        """Zero both counters and install *iterator* as the new input source."""
        self.item_count = 0
        self.process_count = 0
        self._iterator = iterator

    def process(self, count=1):
        """Forward to the base implementation, tallying each invocation."""
        super(TestThreadTaskNode, self).process(count)
        self.process_count = self.process_count + 1

    def _assert(self, pc, fc):
        """Assert for num process counts (pc) and num function counts (fc)
        :return: self"""
        assert self.process_count == pc
        assert self.item_count == fc
        assert not self.error()
        return self
class TestThreadPool(TestBase):
    """Exercises a ThreadPool with a single task in serial and threaded
    modes: channel reads, min-count/chunking behavior, orphan-channel
    cleanup and failure propagation."""

    # upper bound for the pool-resizing loop below
    max_threads = cpu_count()

    def _assert_sync_single_task(self, p):
        """Performs testing in a synchronized environment

        :param p: pool instance to run the single-task scenarios against"""
        null_tasks = p.num_tasks()  # in case we had some before

        # add a simple task
        # it iterates n items
        ni = 20
        assert ni % 2 == 0, "ni needs to be dividable by 2"

        def make_iter():
            return iter(range(ni))
        # END utility

        task = TestThreadTaskNode(make_iter(), 'iterator', None)
        task.fun = task.do_fun

        assert p.num_tasks() == null_tasks
        rc = p.add_task(task)
        assert p.num_tasks() == 1 + null_tasks
        assert isinstance(rc, RPoolChannel)
        assert task._out_wc is not None

        # pull the result completely - we should get one task, which calls its
        # function once. In sync mode, the order matches
        items = rc.read()
        assert len(items) == ni
        task._assert(1, ni).reset(make_iter())
        assert items[0] == 0 and items[-1] == ni - 1

        # as the task is done, it should have been removed - we have read everything
        assert task.is_done()
        assert p.num_tasks() == null_tasks

        # pull individual items
        rc = p.add_task(task)
        assert p.num_tasks() == 1 + null_tasks
        for i in range(ni):
            items = rc.read(1)
            assert len(items) == 1
            assert i == items[0]
        # END for each item

        # it couldn't yet notice that the input is depleted as we pulled exactly
        # ni items - the next one would remove it. Instead, we delete our channel
        # which triggers orphan handling
        assert p.num_tasks() == 1 + null_tasks
        del(rc)
        assert p.num_tasks() == null_tasks

        task.reset(make_iter())

        # test min count
        # if we query 1 item, it will prepare ni // 2
        # NOTE: floor division - counts must stay integral; true division
        # yields a float on Python 3 and breaks count-based read/chunk APIs
        task.min_count = ni // 2
        rc = p.add_task(task)
        assert len(rc.read(1)) == 1  # processes ni // 2
        assert len(rc.read(1)) == 1  # processes nothing
        # rest - it has ni/2 - 2 on the queue, and pulls ni-2
        # It wants too much, so the task realizes its done. The task
        # doesn't care about the items in its output channel
        assert len(rc.read(ni - 2)) == ni - 2
        assert p.num_tasks() == null_tasks
        task._assert(2, ni)  # two chunks, 20 calls ( all items )

        # its already done, gives us no more
        assert len(rc.read()) == 0

        # test chunking
        # we always want 4 chunks, these could go to individual nodes
        task.reset(make_iter())
        task.max_chunksize = ni // 4  # 4 chunks
        rc = p.add_task(task)
        # must read a specific item count
        # count is still at ni // 2 - here we want more than that
        assert len(rc.read(ni // 2 + 2)) == ni // 2 + 2  # make sure its uneven ;)
        assert len(rc.read(ni // 2 - 2)) == ni // 2 - 2
        # END read chunks
        task._assert(ni // 4, ni)  # read two times, got 4 processing steps
        assert p.num_tasks() == null_tasks  # depleted

        # but this only hits if we want too many items, if we want less, it could
        # still do too much - hence we set the min_count to the same number to enforce
        # at least ni // 4 items to be processed, no matter what we request
        task.reset(make_iter())
        task.min_count = None
        rc = p.add_task(task)
        for i in range(ni):
            assert rc.read(1)[0] == i
        # END pull individual items
        # too many processing counts ;)
        task._assert(ni, ni)
        assert p.num_tasks() == 1 + null_tasks
        assert p.del_task(task) is p  # del manually this time
        assert p.num_tasks() == null_tasks

        # now we set the minimum count to reduce the number of processing counts
        task.reset(make_iter())
        task.min_count = ni // 4
        rc = p.add_task(task)
        for i in range(ni):
            assert rc.read(1)[0] == i
        # END for each item
        task._assert(ni // task.min_count, ni)
        del(rc)
        assert p.num_tasks() == null_tasks

        # test failure
        # on failure, the processing stops and the task is finished, keeping
        # its error for later
        task.reset(make_iter())
        task.should_fail = True
        rc = p.add_task(task)
        assert len(rc.read()) == 0  # failure on first item
        assert isinstance(task.error(), AssertionError)
        assert p.num_tasks() == null_tasks

    def _assert_async_dependent_tasks(self, p):
        """Verify chained tasks on pool *p* (not yet implemented).

        Will include failure in a center task and 'recursive' orphan cleanup,
        also verifying that the channel-close mechanism works:
        t1 -> t2 -> t3
        t1 -> x -> t3"""
        pass

    def test_base(self):
        """End-to-end pool lifecycle: resizing, serial single-task runs,
        thread startup/teardown, and threaded single-task runs."""
        p = ThreadPool()

        # default pools have no workers
        assert p.size() == 0

        # increase and decrease the size
        for i in range(self.max_threads):
            p.set_size(i)
            assert p.size() == i
        for i in range(self.max_threads, -1, -1):
            p.set_size(i)
            assert p.size() == i

        # SINGLE TASK SERIAL SYNC MODE
        ##############################
        # put a few unrelated tasks that we forget about
        urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
        urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
        assert p.num_tasks() == 2

        self._assert_sync_single_task(p)
        assert p.num_tasks() == 2
        del(urc1)
        del(urc2)
        assert p.num_tasks() == 0

        # DEPENDENT TASKS SERIAL
        ########################
        self._assert_async_dependent_tasks(p)

        # SINGLE TASK THREADED SYNC MODE
        ################################
        # step one gear up - just one thread for now.
        num_threads = len(threading.enumerate())
        p.set_size(1)
        assert len(threading.enumerate()) == num_threads + 1

        # deleting the pool stops its threads - just to be sure ;)
        del(p)
        assert len(threading.enumerate()) == num_threads

        p = ThreadPool(1)
        assert len(threading.enumerate()) == num_threads + 1

        # here we go
        self._assert_sync_single_task(p)

        # SINGLE TASK ASYNC MODE
        ########################
        # two threads to compete for a single task

        # DEPENDENT TASK ASYNC MODE
        ###########################
        self._assert_async_dependent_tasks(p)
|