summaryrefslogtreecommitdiff
path: root/test/git/async/test_pool.py
blob: 0d779f390feb99ddd13f29cb182bd6d542531126 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
"""Channel testing"""
from test.testlib import *
from git.async.pool import *
from git.async.task import *
from git.async.thread import terminate_threads
from git.async.util import cpu_count
import threading
import time

class TestThreadTaskNode(InputIteratorThreadTask):
	def __init__(self, *args, **kwargs):
		super(TestThreadTaskNode, self).__init__(*args, **kwargs)
		self.reset(self._iterator)
		self.should_fail = False
		self.lock = threading.Lock()		# yes, can't safely do x = x + 1 :)
		self.plock = threading.Lock()
	
	def do_fun(self, item):
		self.lock.acquire()
		self.item_count += 1
		self.lock.release()
		if self.should_fail:
			raise AssertionError("I am failing just for the fun of it")
		return item
	
	def reset(self, iterator):
		self.process_count = 0
		self.item_count = 0
		self._iterator = iterator
		
	def process(self, count=1):
		# must do it first, otherwise we might read and check results before
		# the thread gets here :). Its a lesson !
		self.plock.acquire()
		self.process_count += 1
		self.plock.release()
		super(TestThreadTaskNode, self).process(count)
		
	def _assert(self, pc, fc, check_scheduled=False):
		"""Assert for num process counts (pc) and num function counts (fc)
		:return: self"""
		self.plock.acquire()
		if self.process_count != pc:
			print self.process_count, pc
		assert self.process_count == pc
		self.plock.release()
		self.lock.acquire()
		if self.item_count != fc:
			print self.item_count, fc
		assert self.item_count == fc
		self.lock.release()
		
		# if we read all, we can't really use scheduled items
		if check_scheduled:
			assert self._scheduled_items == 0
		assert not self.error()
		return self
		

class TestThreadPool(TestBase):
	
	max_threads = cpu_count()
	
	def _add_triple_task(self, p):
		"""Add a triplet of feeder, transformer and finalizer to the pool, like
		t1 -> t2 -> t3, return all 3 return channels in order"""
		t1 = TestThreadTaskNode(make_iter(), 'iterator', None)
		# TODO:
	
	def _assert_single_task(self, p, async=False):
		"""Performs testing in a synchronized environment"""
		null_tasks = p.num_tasks()		# in case we had some before
		
		# add a simple task
		# it iterates n items
		ni = 500
		assert ni % 2 == 0, "ni needs to be dividable by 2"
		assert ni % 4 == 0, "ni needs to be dividable by 4"
		
		def make_iter():
			return iter(range(ni))
		# END utility
		
		task = TestThreadTaskNode(make_iter(), 'iterator', None)
		task.fun = task.do_fun
		
		assert p.num_tasks() == null_tasks
		rc = p.add_task(task)
		assert p.num_tasks() == 1 + null_tasks
		assert isinstance(rc, RPoolChannel)
		assert task._out_wc is not None
		
		# pull the result completely - we should get one task, which calls its 
		# function once. In sync mode, the order matches
		items = rc.read()
		assert len(items) == ni
		task._assert(1, ni).reset(make_iter())
		assert items[0] == 0 and items[-1] == ni-1
		
		# as the task is done, it should have been removed - we have read everything
		assert task.is_done()
		assert p.num_tasks() == null_tasks
		
		# pull individual items
		rc = p.add_task(task)
		assert p.num_tasks() == 1 + null_tasks
		st = time.time()
		for i in range(ni):
			items = rc.read(1)
			assert len(items) == 1
			
			# can't assert order in async mode
			if not async:
				assert i == items[0]
		# END for each item
		elapsed = time.time() - st
		print >> sys.stderr, "Threadpool: processed %i individual items, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, p.size(), elapsed, ni / elapsed)
		
		# it couldn't yet notice that the input is depleted as we pulled exaclty 
		# ni items - the next one would remove it. Instead, we delete our channel 
		# which triggers orphan handling
		assert p.num_tasks() == 1 + null_tasks
		del(rc)
		assert p.num_tasks() == null_tasks
		
		task.reset(make_iter())
		
		# test min count
		# if we query 1 item, it will prepare ni / 2
		task.min_count = ni / 2
		rc = p.add_task(task)
		items = rc.read(1)
		assert len(items) == 1 and items[0] == 0			# processes ni / 2
		items = rc.read(1)
		assert len(items) == 1 and items[0] == 1			# processes nothing
		# rest - it has ni/2 - 2 on the queue, and pulls ni-2
		# It wants too much, so the task realizes its done. The task
		# doesn't care about the items in its output channel
		items = rc.read(ni-2)
		assert len(items) == ni - 2
		assert p.num_tasks() == null_tasks
		task._assert(2, ni)						# two chunks, ni calls
		
		# its already done, gives us no more
		assert len(rc.read()) == 0
		
		# test chunking
		# we always want 4 chunks, these could go to individual nodes
		task.reset(make_iter())
		task.max_chunksize = ni / 4			# 4 chunks
		rc = p.add_task(task)
		# must read a specific item count
		# count is still at ni / 2 - here we want more than that
		# 2 steps with n / 4 items, + 1 step with n/4 items to get + 2
		assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2
		# have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing
		# ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing
		items = rc.read(ni / 2 - 2)
		assert len(items) == ni / 2 - 2
		
		task._assert( 5, ni)
		assert p.num_tasks() == null_tasks	# depleted
		
		# but this only hits if we want too many items, if we want less, it could 
		# still do too much - hence we set the min_count to the same number to enforce
		# at least ni / 4 items to be preocessed, no matter what we request
		task.reset(make_iter())
		task.min_count = None
		rc = p.add_task(task)
		st = time.time()
		for i in range(ni):
			if async:
				assert len(rc.read(1)) == 1
			else:
				assert rc.read(1)[0] == i
			# END handle async mode
		# END pull individual items
		# too many processing counts ;)
		elapsed = time.time() - st
		print >> sys.stderr, "Threadpool: processed %i individual items in chunks of %i, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, ni/4, p.size(), elapsed, ni / elapsed)
		
		task._assert(ni, ni)
		assert p.num_tasks() == 1 + null_tasks
		assert p.del_task(task) is p		# del manually this time
		assert p.num_tasks() == null_tasks
		
		# now with we set the minimum count to reduce the number of processing counts
		task.reset(make_iter())
		task.min_count = ni / 4
		rc = p.add_task(task)
		for i in range(ni):
			if async:
				assert len(rc.read(1)) == 1
			else:
				assert rc.read(1)[0] == i
		# END for each item
		task._assert(ni / task.min_count, ni)
		del(rc)
		assert p.num_tasks() == null_tasks
		
		# test failure
		# on failure, the processing stops and the task is finished, keeping 
		# his error for later
		task.reset(make_iter())
		task.should_fail = True
		rc = p.add_task(task)
		assert len(rc.read()) == 0		# failure on first item
		assert isinstance(task.error(), AssertionError)
		assert p.num_tasks() == null_tasks
		
	def _assert_async_dependent_tasks(self, p):
		# includes failure in center task, 'recursive' orphan cleanup
		# This will also verify that the channel-close mechanism works
		# t1 -> t2 -> t3
		# t1 -> x -> t3
		pass
	
	@terminate_threads
	def test_base(self):
		p = ThreadPool()
		
		# default pools have no workers
		assert p.size() == 0
		
		# increase and decrease the size
		for i in range(self.max_threads):
			p.set_size(i)
			assert p.size() == i
		for i in range(self.max_threads, -1, -1):
			p.set_size(i)
			assert p.size() == i
			
		# SINGLE TASK SERIAL SYNC MODE
		##############################
		# put a few unrelated tasks that we forget about
		urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
		urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
		assert p.num_tasks() == 2
		
		## SINGLE TASK #################
		self._assert_single_task(p, False)
		assert p.num_tasks() == 2
		del(urc1)
		del(urc2)
		assert p.num_tasks() == 0
		
		
		# DEPENDENT TASKS SERIAL
		########################
		self._assert_async_dependent_tasks(p)
		
		
		# SINGLE TASK THREADED SYNC MODE
		################################
		# step one gear up - just one thread for now.
		num_threads = len(threading.enumerate())
		p.set_size(1)
		assert len(threading.enumerate()) == num_threads + 1
		# deleting the pool stops its threads - just to be sure ;)
		del(p)
		assert len(threading.enumerate()) == num_threads
		
		p = ThreadPool(1)
		assert len(threading.enumerate()) == num_threads + 1
		
		# here we go
		self._assert_single_task(p, False)
		
		
		
		# SINGLE TASK ASYNC MODE
		########################
		# two threads to compete for a single task
		p.set_size(2)
		self._assert_single_task(p, True)
		
		# real stress test-  should be native on every dual-core cpu with 2 hardware
		# threads per core
		p.set_size(4)
		self._assert_single_task(p, True)
		
		# DEPENDENT TASK ASYNC MODE
		###########################
		self._assert_async_dependent_tasks(p)