diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-08 18:20:12 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-08 18:20:12 +0200 |
commit | e5c0002d069382db1768349bf0c5ff40aafbf140 (patch) | |
tree | 0af5a850aa240aff9bc90b0d0e4a31ab1d1ac9eb /test/git/async/test_pool.py | |
parent | 13dd59ba5b3228820841682b59bad6c22476ff66 (diff) | |
download | gitpython-e5c0002d069382db1768349bf0c5ff40aafbf140.tar.gz |
Revised task deletion works well, adjusted test to be creating new tasks all the time instead of reusing its own one, it was somewhat hard to manage its state over time and could cause bugs. It works okay, but it occasionally hangs, it appears to be an empty queue, have to gradually put certain things back in, although in the current mode of operation, it should never have empty queues from the pool to the user
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r-- | test/git/async/test_pool.py | 51 |
1 files changed, 29 insertions, 22 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 3077dc32..82947988 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -6,14 +6,17 @@ from git.async.thread import terminate_threads from git.async.util import cpu_count import threading import time +import sys class TestThreadTaskNode(InputIteratorThreadTask): def __init__(self, *args, **kwargs): super(TestThreadTaskNode, self).__init__(*args, **kwargs) - self.reset(self._iterator) self.should_fail = False self.lock = threading.Lock() # yes, can't safely do x = x + 1 :) self.plock = threading.Lock() + self.item_count = 0 + self.process_count = 0 + self._scheduled_items = 0 def do_fun(self, item): self.lock.acquire() @@ -23,13 +26,6 @@ class TestThreadTaskNode(InputIteratorThreadTask): raise AssertionError("I am failing just for the fun of it") return item - def reset(self, iterator): - self.process_count = 0 - self.item_count = 0 - self._exc = None - self._iterator = iterator - self._done = False - def process(self, count=1): # must do it first, otherwise we might read and check results before # the thread gets here :). Its a lesson ! @@ -68,7 +64,7 @@ class TestThreadPool(TestBase): def _add_triple_task(self, p): """Add a triplet of feeder, transformer and finalizer to the pool, like t1 -> t2 -> t3, return all 3 return channels in order""" - t1 = TestThreadTaskNode(make_iter(), 'iterator', None) + # t1 = TestThreadTaskNode(make_task(), 'iterator', None) # TODO: def _assert_single_task(self, p, async=False): @@ -81,12 +77,13 @@ class TestThreadPool(TestBase): assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" - def make_iter(): - return iter(range(ni)) + def make_task(): + t = TestThreadTaskNode(iter(range(ni)), 'iterator', None) + t.fun = t.do_fun + return t # END utility - task = TestThreadTaskNode(make_iter(), 'iterator', None) - task.fun = task.do_fun + task = make_task() assert p.num_tasks() == null_tasks rc = p.add_task(task) @@ -104,8 +101,9 @@ class TestThreadPool(TestBase): # as the task is done, it should have been removed - we have read everything assert task.is_done() + del(rc) assert p.num_tasks() == null_tasks - task.reset(make_iter()) + task = make_task() # pull individual items rc = p.add_task(task) @@ -126,14 +124,14 @@ class TestThreadPool(TestBase): # it couldn't yet notice that the input is depleted as we pulled exaclty # ni items - the next one would remove it. Instead, we delete our channel # which triggers orphan handling + assert not task.is_done() assert p.num_tasks() == 1 + null_tasks del(rc) assert p.num_tasks() == null_tasks - task.reset(make_iter()) - # test min count # if we query 1 item, it will prepare ni / 2 + task = make_task() task.min_count = ni / 2 rc = p.add_task(task) print "read(1)" @@ -149,6 +147,7 @@ class TestThreadPool(TestBase): print "read(%i)" % nri items = rc.read(nri) assert len(items) == nri + p.del_task(task) assert p.num_tasks() == null_tasks task._assert(2, ni) # two chunks, ni calls @@ -158,31 +157,36 @@ class TestThreadPool(TestBase): # test chunking # we always want 4 chunks, these could go to individual nodes - task.reset(make_iter()) + task = make_task() + task.min_count = ni / 2 # restore previous value task.max_chunksize = ni / 4 # 4 chunks rc = p.add_task(task) + # must read a specific item count # count is still at ni / 2 - here we want more than that # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2 nri = ni / 2 + 2 - print "read(%i)" % nri + print "read(%i) chunksize set" % nri items = rc.read(nri) assert len(items) == nri # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing nri = ni / 2 - 2 - print "read(%i)" % nri + print "read(%i) chunksize set" % nri items = rc.read(nri) assert len(items) == nri task._assert( 5, ni) + assert task.is_done() + del(rc) assert p.num_tasks() == null_tasks # depleted # but this only hits if we want too many items, if we want less, it could # still do too much - hence we set the min_count to the same number to enforce # at least ni / 4 items to be preocessed, no matter what we request - task.reset(make_iter()) + task = make_task() task.min_count = None + task.max_chunksize = ni / 4 # match previous setup rc = p.add_task(task) st = time.time() print "read(1) * %i, chunksize set" % ni @@ -203,8 +207,9 @@ class TestThreadPool(TestBase): assert p.num_tasks() == null_tasks # now with we set the minimum count to reduce the number of processing counts - task.reset(make_iter()) + task = make_task() task.min_count = ni / 4 + task.max_chunksize = ni / 4 # match previous setup rc = p.add_task(task) print "read(1) * %i, min_count%i + chunksize" % (ni, task.min_count) for i in range(ni): @@ -220,13 +225,15 @@ class TestThreadPool(TestBase): # test failure # on failure, the processing stops and the task is finished, keeping # his error for later - task.reset(make_iter()) + task = make_task() task.should_fail = True rc = p.add_task(task) print "read(0) with failure" assert len(rc.read()) == 0 # failure on first item print "done with everything" assert isinstance(task.error(), AssertionError) + assert task.is_done() # on error, its marked done as well + del(rc) assert p.num_tasks() == null_tasks |