summaryrefslogtreecommitdiff
path: root/test/git/async/test_pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r--test/git/async/test_pool.py51
1 files changed, 29 insertions, 22 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 3077dc32..82947988 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -6,14 +6,17 @@ from git.async.thread import terminate_threads
from git.async.util import cpu_count
import threading
import time
+import sys
class TestThreadTaskNode(InputIteratorThreadTask):
def __init__(self, *args, **kwargs):
super(TestThreadTaskNode, self).__init__(*args, **kwargs)
- self.reset(self._iterator)
self.should_fail = False
self.lock = threading.Lock() # yes, can't safely do x = x + 1 :)
self.plock = threading.Lock()
+ self.item_count = 0
+ self.process_count = 0
+ self._scheduled_items = 0
def do_fun(self, item):
self.lock.acquire()
@@ -23,13 +26,6 @@ class TestThreadTaskNode(InputIteratorThreadTask):
raise AssertionError("I am failing just for the fun of it")
return item
- def reset(self, iterator):
- self.process_count = 0
- self.item_count = 0
- self._exc = None
- self._iterator = iterator
- self._done = False
-
def process(self, count=1):
# must do it first, otherwise we might read and check results before
# the thread gets here :). Its a lesson !
@@ -68,7 +64,7 @@ class TestThreadPool(TestBase):
def _add_triple_task(self, p):
"""Add a triplet of feeder, transformer and finalizer to the pool, like
t1 -> t2 -> t3, return all 3 return channels in order"""
- t1 = TestThreadTaskNode(make_iter(), 'iterator', None)
+ # t1 = TestThreadTaskNode(make_task(), 'iterator', None)
# TODO:
def _assert_single_task(self, p, async=False):
@@ -81,12 +77,13 @@ class TestThreadPool(TestBase):
assert ni % 2 == 0, "ni needs to be dividable by 2"
assert ni % 4 == 0, "ni needs to be dividable by 4"
- def make_iter():
- return iter(range(ni))
+ def make_task():
+ t = TestThreadTaskNode(iter(range(ni)), 'iterator', None)
+ t.fun = t.do_fun
+ return t
# END utility
- task = TestThreadTaskNode(make_iter(), 'iterator', None)
- task.fun = task.do_fun
+ task = make_task()
assert p.num_tasks() == null_tasks
rc = p.add_task(task)
@@ -104,8 +101,9 @@ class TestThreadPool(TestBase):
# as the task is done, it should have been removed - we have read everything
assert task.is_done()
+ del(rc)
assert p.num_tasks() == null_tasks
- task.reset(make_iter())
+ task = make_task()
# pull individual items
rc = p.add_task(task)
@@ -126,14 +124,14 @@ class TestThreadPool(TestBase):
# it couldn't yet notice that the input is depleted as we pulled exaclty
# ni items - the next one would remove it. Instead, we delete our channel
# which triggers orphan handling
+ assert not task.is_done()
assert p.num_tasks() == 1 + null_tasks
del(rc)
assert p.num_tasks() == null_tasks
- task.reset(make_iter())
-
# test min count
# if we query 1 item, it will prepare ni / 2
+ task = make_task()
task.min_count = ni / 2
rc = p.add_task(task)
print "read(1)"
@@ -149,6 +147,7 @@ class TestThreadPool(TestBase):
print "read(%i)" % nri
items = rc.read(nri)
assert len(items) == nri
+ p.del_task(task)
assert p.num_tasks() == null_tasks
task._assert(2, ni) # two chunks, ni calls
@@ -158,31 +157,36 @@ class TestThreadPool(TestBase):
# test chunking
# we always want 4 chunks, these could go to individual nodes
- task.reset(make_iter())
+ task = make_task()
+ task.min_count = ni / 2 # restore previous value
task.max_chunksize = ni / 4 # 4 chunks
rc = p.add_task(task)
+
# must read a specific item count
# count is still at ni / 2 - here we want more than that
# 2 steps with n / 4 items, + 1 step with n/4 items to get + 2
nri = ni / 2 + 2
- print "read(%i)" % nri
+ print "read(%i) chunksize set" % nri
items = rc.read(nri)
assert len(items) == nri
# have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing
# ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing
nri = ni / 2 - 2
- print "read(%i)" % nri
+ print "read(%i) chunksize set" % nri
items = rc.read(nri)
assert len(items) == nri
task._assert( 5, ni)
+ assert task.is_done()
+ del(rc)
assert p.num_tasks() == null_tasks # depleted
# but this only hits if we want too many items, if we want less, it could
# still do too much - hence we set the min_count to the same number to enforce
# at least ni / 4 items to be preocessed, no matter what we request
- task.reset(make_iter())
+ task = make_task()
task.min_count = None
+ task.max_chunksize = ni / 4 # match previous setup
rc = p.add_task(task)
st = time.time()
print "read(1) * %i, chunksize set" % ni
@@ -203,8 +207,9 @@ class TestThreadPool(TestBase):
assert p.num_tasks() == null_tasks
# now with we set the minimum count to reduce the number of processing counts
- task.reset(make_iter())
+ task = make_task()
task.min_count = ni / 4
+ task.max_chunksize = ni / 4 # match previous setup
rc = p.add_task(task)
print "read(1) * %i, min_count%i + chunksize" % (ni, task.min_count)
for i in range(ni):
@@ -220,13 +225,15 @@ class TestThreadPool(TestBase):
# test failure
# on failure, the processing stops and the task is finished, keeping
# his error for later
- task.reset(make_iter())
+ task = make_task()
task.should_fail = True
rc = p.add_task(task)
print "read(0) with failure"
assert len(rc.read()) == 0 # failure on first item
print "done with everything"
assert isinstance(task.error(), AssertionError)
+ assert task.is_done() # on error, its marked done as well
+ del(rc)
assert p.num_tasks() == null_tasks