diff options
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r-- | test/git/async/test_pool.py | 51 |
1 files changed, 29 insertions, 22 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 3077dc32..82947988 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -6,14 +6,17 @@ from git.async.thread import terminate_threads from git.async.util import cpu_count import threading import time +import sys class TestThreadTaskNode(InputIteratorThreadTask): def __init__(self, *args, **kwargs): super(TestThreadTaskNode, self).__init__(*args, **kwargs) - self.reset(self._iterator) self.should_fail = False self.lock = threading.Lock() # yes, can't safely do x = x + 1 :) self.plock = threading.Lock() + self.item_count = 0 + self.process_count = 0 + self._scheduled_items = 0 def do_fun(self, item): self.lock.acquire() @@ -23,13 +26,6 @@ class TestThreadTaskNode(InputIteratorThreadTask): raise AssertionError("I am failing just for the fun of it") return item - def reset(self, iterator): - self.process_count = 0 - self.item_count = 0 - self._exc = None - self._iterator = iterator - self._done = False - def process(self, count=1): # must do it first, otherwise we might read and check results before # the thread gets here :). Its a lesson ! @@ -68,7 +64,7 @@ class TestThreadPool(TestBase): def _add_triple_task(self, p): """Add a triplet of feeder, transformer and finalizer to the pool, like t1 -> t2 -> t3, return all 3 return channels in order""" - t1 = TestThreadTaskNode(make_iter(), 'iterator', None) + # t1 = TestThreadTaskNode(make_task(), 'iterator', None) # TODO: def _assert_single_task(self, p, async=False): @@ -81,12 +77,13 @@ class TestThreadPool(TestBase): assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" - def make_iter(): - return iter(range(ni)) + def make_task(): + t = TestThreadTaskNode(iter(range(ni)), 'iterator', None) + t.fun = t.do_fun + return t # END utility - task = TestThreadTaskNode(make_iter(), 'iterator', None) - task.fun = task.do_fun + task = make_task() assert p.num_tasks() == null_tasks rc = p.add_task(task) @@ -104,8 +101,9 @@ class TestThreadPool(TestBase): # as the task is done, it should have been removed - we have read everything assert task.is_done() + del(rc) assert p.num_tasks() == null_tasks - task.reset(make_iter()) + task = make_task() # pull individual items rc = p.add_task(task) @@ -126,14 +124,14 @@ class TestThreadPool(TestBase): # it couldn't yet notice that the input is depleted as we pulled exaclty # ni items - the next one would remove it. Instead, we delete our channel # which triggers orphan handling + assert not task.is_done() assert p.num_tasks() == 1 + null_tasks del(rc) assert p.num_tasks() == null_tasks - task.reset(make_iter()) - # test min count # if we query 1 item, it will prepare ni / 2 + task = make_task() task.min_count = ni / 2 rc = p.add_task(task) print "read(1)" @@ -149,6 +147,7 @@ class TestThreadPool(TestBase): print "read(%i)" % nri items = rc.read(nri) assert len(items) == nri + p.del_task(task) assert p.num_tasks() == null_tasks task._assert(2, ni) # two chunks, ni calls @@ -158,31 +157,36 @@ class TestThreadPool(TestBase): # test chunking # we always want 4 chunks, these could go to individual nodes - task.reset(make_iter()) + task = make_task() + task.min_count = ni / 2 # restore previous value task.max_chunksize = ni / 4 # 4 chunks rc = p.add_task(task) + # must read a specific item count # count is still at ni / 2 - here we want more than that # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2 nri = ni / 2 + 2 - print "read(%i)" % nri + print "read(%i) chunksize set" % nri items = rc.read(nri) assert len(items) == nri # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing nri = ni / 2 - 2 - print "read(%i)" % nri + print "read(%i) chunksize set" % nri items = rc.read(nri) assert len(items) == nri task._assert( 5, ni) + assert task.is_done() + del(rc) assert p.num_tasks() == null_tasks # depleted # but this only hits if we want too many items, if we want less, it could # still do too much - hence we set the min_count to the same number to enforce # at least ni / 4 items to be preocessed, no matter what we request - task.reset(make_iter()) + task = make_task() task.min_count = None + task.max_chunksize = ni / 4 # match previous setup rc = p.add_task(task) st = time.time() print "read(1) * %i, chunksize set" % ni @@ -203,8 +207,9 @@ class TestThreadPool(TestBase): assert p.num_tasks() == null_tasks # now with we set the minimum count to reduce the number of processing counts - task.reset(make_iter()) + task = make_task() task.min_count = ni / 4 + task.max_chunksize = ni / 4 # match previous setup rc = p.add_task(task) print "read(1) * %i, min_count%i + chunksize" % (ni, task.min_count) for i in range(ni): @@ -220,13 +225,15 @@ class TestThreadPool(TestBase): # test failure # on failure, the processing stops and the task is finished, keeping # his error for later - task.reset(make_iter()) + task = make_task() task.should_fail = True rc = p.add_task(task) print "read(0) with failure" assert len(rc.read()) == 0 # failure on first item print "done with everything" assert isinstance(task.error(), AssertionError) + assert task.is_done() # on error, its marked done as well + del(rc) assert p.num_tasks() == null_tasks |