diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-09 21:19:54 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-09 21:19:54 +0200 |
commit | 257a8a9441fca9a9bc384f673ba86ef5c3f1715d (patch) | |
tree | d9b21a498ee7c2fe5f19951228c81214988db00a /test/git/async/test_pool.py | |
parent | 365fb14ced88a5571d3287ff1698582ceacd80d6 (diff) | |
download | gitpython-257a8a9441fca9a9bc384f673ba86ef5c3f1715d.tar.gz |
test: prepared task dependency test, which already helped to find bug in the reference counting mechanism, causing references to the pool to be kepts via cycles
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r-- | test/git/async/test_pool.py | 159 |
1 files changed, 128 insertions, 31 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 202fdb66..2a5e4647 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -8,15 +8,14 @@ import threading import time import sys -class TestThreadTaskNode(InputIteratorThreadTask): +class _TestTaskBase(object): def __init__(self, *args, **kwargs): - super(TestThreadTaskNode, self).__init__(*args, **kwargs) + super(_TestTaskBase, self).__init__(*args, **kwargs) self.should_fail = False self.lock = threading.Lock() # yes, can't safely do x = x + 1 :) self.plock = threading.Lock() self.item_count = 0 self.process_count = 0 - self._scheduled_items = 0 def do_fun(self, item): self.lock.acquire() @@ -32,44 +31,118 @@ class TestThreadTaskNode(InputIteratorThreadTask): self.plock.acquire() self.process_count += 1 self.plock.release() - super(TestThreadTaskNode, self).process(count) + super(_TestTaskBase, self).process(count) def _assert(self, pc, fc, check_scheduled=False): """Assert for num process counts (pc) and num function counts (fc) :return: self""" - # TODO: fixme - return self - self.plock.acquire() - if self.process_count != pc: - print self.process_count, pc - assert self.process_count == pc - self.plock.release() self.lock.acquire() if self.item_count != fc: print self.item_count, fc assert self.item_count == fc self.lock.release() - # if we read all, we can't really use scheduled items - if check_scheduled: - assert self._scheduled_items == 0 - assert not self.error() return self + +class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask): + pass class TestThreadFailureNode(TestThreadTaskNode): """Fails after X items""" + def __init__(self, *args, **kwargs): + self.fail_after = kwargs.pop('fail_after') + super(TestThreadFailureNode, self).__init__(*args, **kwargs) + def do_fun(self, item): + item = TestThreadTaskNode.do_fun(self, item) + if self.item_count > self.fail_after: + raise AssertionError("Simulated failure after processing %i items" % self.fail_after) + return item + + +class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): + """Apply a transformation on items read from an input channel""" + + def do_fun(self, item): + """return tuple(i, i*2)""" + item = super(TestThreadInputChannelTaskNode, self).do_fun(item) + if isinstance(item, tuple): + i = item[0] + return item + (i * self.id, ) + else: + return (item, item * self.id) + # END handle tuple + + +class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): + """An input channel task, which verifies the result of its input channels, + should be last in the chain. + Id must be int""" + + def do_fun(self, item): + """return tuple(i, i*2)""" + item = super(TestThreadInputChannelTaskNode, self).do_fun(item) + + # make sure the computation order matches + assert isinstance(item, tuple) + + base = item[0] + for num in item[1:]: + assert num == base * 2 + base = num + # END verify order + + return item + + class TestThreadPool(TestBase): max_threads = cpu_count() - def _add_triple_task(self, p): - """Add a triplet of feeder, transformer and finalizer to the pool, like - t1 -> t2 -> t3, return all 3 return channels in order""" - # t1 = TestThreadTaskNode(make_task(), 'iterator', None) - # TODO: + def _add_task_chain(self, p, ni, count=1): + """Create a task chain of feeder, count transformers and order verifcator + to the pool p, like t1 -> t2 -> t3 + :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" + nt = p.num_tasks() + + feeder = self._make_iterator_task(ni) + frc = p.add_task(feeder) + + assert p.num_tasks() == nt + 1 + + rcs = [frc] + tasks = [feeder] + + inrc = frc + for tc in xrange(count): + t = TestThreadInputChannelTaskNode(inrc, tc, None) + t.fun = t.do_fun + inrc = p.add_task(t) + + tasks.append(t) + rcs.append(inrc) + assert p.num_tasks() == nt + 2 + tc + # END create count transformers + + verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) + verifier.fun = verifier.do_fun + vrc = p.add_task(verifier) + + assert p.num_tasks() == nt + tc + 3 + + tasks.append(verifier) + rcs.append(vrc) + return tasks, rcs + + def _make_iterator_task(self, ni, taskcls=TestThreadTaskNode, **kwargs): + """:return: task which yields ni items + :param taskcls: the actual iterator type to use + :param **kwargs: additional kwargs to be passed to the task""" + t = taskcls(iter(range(ni)), 'iterator', None, **kwargs) + t.fun = t.do_fun + return t def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" @@ -82,11 +155,7 @@ class TestThreadPool(TestBase): assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" - def make_task(): - t = TestThreadTaskNode(iter(range(ni)), 'iterator', None) - t.fun = t.do_fun - return t - # END utility + make_task = lambda *args, **kwargs: self._make_iterator_task(ni, *args, **kwargs) task = make_task() @@ -252,15 +321,44 @@ class TestThreadPool(TestBase): # test failure after ni / 2 items # This makes sure it correctly closes the channel on failure to prevent blocking + nri = ni/2 + task = make_task(TestThreadFailureNode, fail_after=ni/2) + rc = p.add_task(task) + assert len(rc.read()) == nri + assert task.is_done() + assert isinstance(task.error(), AssertionError) - def _assert_async_dependent_tasks(self, p): + def _assert_async_dependent_tasks(self, pool): # includes failure in center task, 'recursive' orphan cleanup # This will also verify that the channel-close mechanism works # t1 -> t2 -> t3 # t1 -> x -> t3 - pass + null_tasks = pool.num_tasks() + ni = 100 + count = 1 + make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs) + + ts, rcs = make_task() + assert len(ts) == count + 2 + assert len(rcs) == count + 2 + assert pool.num_tasks() == null_tasks + len(ts) + print pool._tasks.nodes + + + # in the end, we expect all tasks to be gone, automatically + + + + # order of deletion matters - just keep the end, then delete + final_rc = rcs[-1] + del(ts) + del(rcs) + del(final_rc) + assert pool.num_tasks() == null_tasks + + @terminate_threads def test_base(self): @@ -301,8 +399,8 @@ class TestThreadPool(TestBase): assert p.num_tasks() == 0 - # DEPENDENT TASKS SERIAL - ######################## + # DEPENDENT TASKS SYNC MODE + ########################### self._assert_async_dependent_tasks(p) @@ -311,12 +409,11 @@ class TestThreadPool(TestBase): # step one gear up - just one thread for now. p.set_size(1) assert p.size() == 1 - print len(threading.enumerate()), num_threads assert len(threading.enumerate()) == num_threads + 1 # deleting the pool stops its threads - just to be sure ;) # Its not synchronized, hence we wait a moment del(p) - time.sleep(0.25) + time.sleep(0.05) assert len(threading.enumerate()) == num_threads p = ThreadPool(1) |