summaryrefslogtreecommitdiff
path: root/test/git/async/test_pool.py
blob: 65b2d228bb325e59f4a25cf280a95df748984ce0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
"""Channel testing"""
from test.testlib import *
from git.async.pool import *
from git.async.task import *
from git.async.util import cpu_count
import threading
import time

class TestThreadTaskNode(InputIteratorThreadTask):
	"""Instrumented task node: records how many processing steps ran and how
	many items passed through its function, so pool tests can assert on them."""
	
	def __init__(self, *args, **kwargs):
		super(TestThreadTaskNode, self).__init__(*args, **kwargs)
		self.reset(self._iterator)
	
	def do_fun(self, item):
		# tally every item handed to us, then pass it through unchanged
		self.item_count = self.item_count + 1
		return item
	
	def reset(self, iterator):
		# clear both counters and start consuming the given iterator afresh
		self.process_count = 0
		self.item_count = 0
		self._iterator = iterator
	
	def process(self, count=1):
		# let the base class do the real work, but remember each step taken
		super(TestThreadTaskNode, self).process(count)
		self.process_count += 1
	
	def _assert(self, pc, fc):
		"""Verify exactly pc processing steps and fc function calls were recorded
		:return: self"""
		assert pc == self.process_count
		assert fc == self.item_count
		return self

class TestThreadPool(TestBase):
	"""Exercises ThreadPool behaviour both in serial mode (zero worker
	threads) and with worker threads attached."""
	
	# upper bound for the pool-size ramp in test_base
	max_threads = cpu_count()
	
	def _assert_sync_single_task(self, p):
		"""Performs testing in a synchronized environment
		
		Runs one TestThreadTaskNode through pool *p* several times, checking
		item ordering, process/function call counts, and that finished or
		orphaned tasks are removed from the pool.
		:param p: pool instance to test against"""
		null_tasks = p.num_tasks()		# in case we had some before
		
		# add a simple task
		# it iterates n items
		ni = 20
		assert ni % 2 == 0, "ni needs to be dividable by 2"
		
		def make_iter():
			# fresh input iterator for each reset of the task
			return iter(range(ni))
		# END utility
		
		task = TestThreadTaskNode(make_iter(), 'iterator', None)
		task.fun = task.do_fun
		
		assert p.num_tasks() == null_tasks
		rc = p.add_task(task)
		assert p.num_tasks() == 1 + null_tasks
		assert isinstance(rc, RPoolChannel)
		assert task._out_wc is not None
		
		# pull the result completely - we should get one task, which calls its 
		# function once. In serial mode, the order matches
		items = rc.read()
		task._assert(1, ni).reset(make_iter())
		assert len(items) == ni
		assert items[0] == 0 and items[-1] == ni-1
		
		# as the task is done, it should have been removed - we have read everything
		assert task.is_done()
		assert p.num_tasks() == null_tasks
		
		# pull individual items
		rc = p.add_task(task)
		assert p.num_tasks() == 1 + null_tasks
		for i in range(ni):
			items = rc.read(1)
			assert len(items) == 1
			assert i == items[0]
		# END for each item
		# it couldn't yet notice that the input is depleted as we pulled exactly 
		# ni items - the next one would remove it. Instead, we delete our channel 
		# which triggers orphan handling
		assert p.num_tasks() == 1 + null_tasks
		del(rc)
		assert p.num_tasks() == null_tasks
		
		task.reset(make_iter())
		
		# test min count
		# if we query 1 item, it will prepare ni / 2
		task.min_count = ni / 2
		rc = p.add_task(task)
		assert len(rc.read(1)) == 1			# 1
		assert len(rc.read(1)) == 1
		assert len(rc.read(ni-2)) == ni - 2	# rest - it has ni/2 - 2 on the queue, and pulls ni-2
		task._assert(2, ni)						# two chunks, 20 calls ( all items )
		assert p.num_tasks() == 1 + null_tasks	# it still doesn't know, didn't read too much
		assert len(rc.read()) == 0				# now we read too much and its done
		assert p.num_tasks() == null_tasks
		
		# test chunking
		# we always want 4 chunks, these could go to individual nodes
		task.reset(make_iter())
		task.max_chunksize = ni / 4			# 4 chunks
		rc = p.add_task(task)
		# must read a specific item count
		# count is still at ni / 2 - here we want more than that
		assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2		# make sure its uneven ;)
		assert len(rc.read(ni / 2 - 2)) == ni / 2 - 2
		
		# END read chunks
		task._assert(ni / 4, ni)			# read two times, got 4 processing steps
		assert p.num_tasks() == null_tasks	# depleted
		
		# without a min_count, every read(1) triggers its own processing step,
		# so we expect one process call per item pulled below
		task.reset(make_iter())
		task.min_count = None
		rc = p.add_task(task)
		for i in range(ni):
			assert rc.read(1)[0] == i
		# END pull individual items
		# too many processing counts ;)
		task._assert(ni, ni)
		assert p.num_tasks() == 1 + null_tasks
		assert p.del_task(task) is p		# del manually this time
		assert p.num_tasks() == null_tasks
		
		# now we set the minimum count to reduce the number of processing counts
		task.reset(make_iter())
		task.min_count = ni / 4
		rc = p.add_task(task)
		for i in range(ni):
			assert rc.read(1)[0] == i
		# END for each item
		task._assert(ni / 4, ni)
		del(rc)
		assert p.num_tasks() == null_tasks
		
	def _assert_async_dependent_tasks(self, p):
		# TODO: not implemented yet - intended to verify chains of dependent tasks
		pass
	
	def test_base(self):
		"""Walk the pool through its modes: sizing, serial single-task,
		dependent tasks, and threaded operation (thread lifecycle included)."""
		p = ThreadPool()
		
		# default pools have no workers
		assert p.size() == 0
		
		# increase and decrease the size
		for i in range(self.max_threads):
			p.set_size(i)
			assert p.size() == i
		for i in range(self.max_threads, -1, -1):
			p.set_size(i)
			assert p.size() == i
			
		# SINGLE TASK SERIAL SYNC MODE
		##############################
		# put a few unrelated tasks that we forget about
		urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
		urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
		assert p.num_tasks() == 2
		self._assert_sync_single_task(p)
		assert p.num_tasks() == 2
		del(urc1)
		del(urc2)
		assert p.num_tasks() == 0
		
		
		# DEPENDENT TASKS SERIAL
		########################
		self._assert_async_dependent_tasks(p)
		
		
		# SINGLE TASK THREADED SYNC MODE
		################################
		# step one gear up - just one thread for now.
		num_threads = len(threading.enumerate())
		p.set_size(1)
		assert len(threading.enumerate()) == num_threads + 1
		# deleting the pool stops its threads - just to be sure ;)
		del(p)
		assert len(threading.enumerate()) == num_threads
		
		p = ThreadPool(1)
		assert len(threading.enumerate()) == num_threads + 1
		
		# here we go
		self._assert_sync_single_task(p)
		
		
		
		# SINGLE TASK ASYNC MODE
		########################
		# two threads to compete for a single task
		
		
		# DEPENDENT TASK ASYNC MODE
		###########################
		# self._assert_async_dependent_tasks(p)