author    Sebastian Thiel <byronimo@gmail.com>    2010-06-09 21:19:54 +0200
committer Sebastian Thiel <byronimo@gmail.com>    2010-06-09 21:19:54 +0200
commit    257a8a9441fca9a9bc384f673ba86ef5c3f1715d (patch)
tree      d9b21a498ee7c2fe5f19951228c81214988db00a /test/git/async/test_pool.py
parent    365fb14ced88a5571d3287ff1698582ceacd80d6 (diff)
download  gitpython-257a8a9441fca9a9bc384f673ba86ef5c3f1715d.tar.gz
test: prepared task dependency test, which already helped to find a bug in the reference counting mechanism, causing references to the pool to be kept via cycles
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r--  test/git/async/test_pool.py  159
1 file changed, 128 insertions, 31 deletions
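The commit message describes a reference-counting bug in which cycles kept the pool alive after all outside references were dropped. A minimal standalone sketch of that failure class, assuming nothing about the gitpython classes (Pool and Task here are hypothetical stand-ins): plain reference counting never frees a cycle, only the cyclic garbage collector does.

    import gc
    import weakref

    class Pool(object):
        pass  # hypothetical stand-in, not the gitpython ThreadPool

    class Task(object):
        def __init__(self, pool):
            self.pool = pool  # strong back-reference to the pool

    pool = Pool()
    pool.task = Task(pool)    # pool -> task -> pool: a reference cycle
    ref = weakref.ref(pool)

    del pool                  # drop the only external reference
    assert ref() is not None  # refcounting alone cannot free the cycle
    gc.collect()              # only the cyclic garbage collector can
    assert ref() is None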
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 202fdb66..2a5e4647 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -8,15 +8,14 @@ import threading
import time
import sys
-class TestThreadTaskNode(InputIteratorThreadTask):
+class _TestTaskBase(object):
def __init__(self, *args, **kwargs):
- super(TestThreadTaskNode, self).__init__(*args, **kwargs)
+ super(_TestTaskBase, self).__init__(*args, **kwargs)
self.should_fail = False
self.lock = threading.Lock() # yes, can't safely do x = x + 1 :)
self.plock = threading.Lock()
self.item_count = 0
self.process_count = 0
- self._scheduled_items = 0
def do_fun(self, item):
self.lock.acquire()
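The locks above exist because `x = x + 1` compiles to a separate read and write, so concurrent increments can lose updates even under CPython's GIL. A small self-contained sketch of the pattern the counters use (illustrative only, not part of the test file):

    import threading

    counter = 0
    lock = threading.Lock()

    def safe_inc(n):
        # serialize the read-modify-write; without the lock,
        # concurrent increments intermittently lose updates
        global counter
        for _ in range(n):
            with lock:
                counter = counter + 1

    threads = [threading.Thread(target=safe_inc, args=(100000,))
               for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert counter == 400000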
@@ -32,44 +31,118 @@ class TestThreadTaskNode(InputIteratorThreadTask):
self.plock.acquire()
self.process_count += 1
self.plock.release()
- super(TestThreadTaskNode, self).process(count)
+ super(_TestTaskBase, self).process(count)
def _assert(self, pc, fc, check_scheduled=False):
"""Assert for num process counts (pc) and num function counts (fc)
:return: self"""
- # TODO: fixme
- return self
- self.plock.acquire()
- if self.process_count != pc:
- print self.process_count, pc
- assert self.process_count == pc
- self.plock.release()
self.lock.acquire()
if self.item_count != fc:
print self.item_count, fc
assert self.item_count == fc
self.lock.release()
- # if we read all, we can't really use scheduled items
- if check_scheduled:
- assert self._scheduled_items == 0
- assert not self.error()
return self
+
+class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask):
+ pass
class TestThreadFailureNode(TestThreadTaskNode):
"""Fails after X items"""
+ def __init__(self, *args, **kwargs):
+ self.fail_after = kwargs.pop('fail_after')
+ super(TestThreadFailureNode, self).__init__(*args, **kwargs)
+ def do_fun(self, item):
+ item = TestThreadTaskNode.do_fun(self, item)
+ if self.item_count > self.fail_after:
+ raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
+ return item
+
+
+class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask):
+ """Apply a transformation on items read from an input channel"""
+
+ def do_fun(self, item):
+ """return tuple(i, i*2)"""
+ item = super(TestThreadInputChannelTaskNode, self).do_fun(item)
+ if isinstance(item, tuple):
+ i = item[0]
+ return item + (i * self.id, )
+ else:
+ return (item, item * self.id)
+ # END handle tuple
+
+
+class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask):
+ """An input channel task, which verifies the result of its input channels,
+ should be last in the chain.
+ Id must be int"""
+
+ def do_fun(self, item):
+ """return tuple(i, i*2)"""
+ item = super(TestThreadInputChannelTaskNode, self).do_fun(item)
+
+ # make sure the computation order matches
+ assert isinstance(item, tuple)
+
+ base = item[0]
+ for num in item[1:]:
+ assert num == base * 2
+ base = num
+ # END verify order
+
+ return item
+
+
class TestThreadPool(TestBase):
max_threads = cpu_count()
- def _add_triple_task(self, p):
- """Add a triplet of feeder, transformer and finalizer to the pool, like
- t1 -> t2 -> t3, return all 3 return channels in order"""
- # t1 = TestThreadTaskNode(make_task(), 'iterator', None)
- # TODO:
+ def _add_task_chain(self, p, ni, count=1):
+ """Create a task chain of feeder, count transformers and order verifcator
+ to the pool p, like t1 -> t2 -> t3
+ :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
+ nt = p.num_tasks()
+
+ feeder = self._make_iterator_task(ni)
+ frc = p.add_task(feeder)
+
+ assert p.num_tasks() == nt + 1
+
+ rcs = [frc]
+ tasks = [feeder]
+
+ inrc = frc
+ for tc in xrange(count):
+ t = TestThreadInputChannelTaskNode(inrc, tc, None)
+ t.fun = t.do_fun
+ inrc = p.add_task(t)
+
+ tasks.append(t)
+ rcs.append(inrc)
+ assert p.num_tasks() == nt + 2 + tc
+ # END create count transformers
+
+ verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
+ verifier.fun = verifier.do_fun
+ vrc = p.add_task(verifier)
+
+ assert p.num_tasks() == nt + count + 2
+
+ tasks.append(verifier)
+ rcs.append(vrc)
+ return tasks, rcs
+
+ def _make_iterator_task(self, ni, taskcls=TestThreadTaskNode, **kwargs):
+ """:return: task which yields ni items
+ :param taskcls: the actual iterator type to use
+ :param **kwargs: additional kwargs to be passed to the task"""
+ t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
+ t.fun = t.do_fun
+ return t
def _assert_single_task(self, p, async=False):
"""Performs testing in a synchronized environment"""
@@ -82,11 +155,7 @@ class TestThreadPool(TestBase):
assert ni % 2 == 0, "ni needs to be divisible by 2"
assert ni % 4 == 0, "ni needs to be divisible by 4"
- def make_task():
- t = TestThreadTaskNode(iter(range(ni)), 'iterator', None)
- t.fun = t.do_fun
- return t
- # END utility
+ make_task = lambda *args, **kwargs: self._make_iterator_task(ni, *args, **kwargs)
task = make_task()
@@ -252,15 +321,44 @@ class TestThreadPool(TestBase):
# test failure after ni / 2 items
# This makes sure it correctly closes the channel on failure to prevent blocking
+ nri = ni/2
+ task = make_task(TestThreadFailureNode, fail_after=ni/2)
+ rc = p.add_task(task)
+ assert len(rc.read()) == nri
+ assert task.is_done()
+ assert isinstance(task.error(), AssertionError)
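The property this failure test pins down is that a task dying mid-stream must still close its output channel, otherwise a blocked reader hangs forever. A standalone sketch of that contract, with a Queue and a sentinel standing in for the async channel (names are illustrative):

    import threading
    try:
        import queue            # Python 3
    except ImportError:
        import Queue as queue   # Python 2

    CLOSED = object()           # sentinel playing the role of channel close

    def produce(out, items, fail_after):
        try:
            for n, item in enumerate(items):
                if n >= fail_after:
                    # a real task would record this via task.error()
                    raise AssertionError("simulated failure")
                out.put(item)
        except AssertionError:
            pass
        finally:
            out.put(CLOSED)     # close even on failure, so readers wake up

    out = queue.Queue()
    t = threading.Thread(target=produce, args=(out, range(10), 5))
    t.start()

    received = []
    while True:
        item = out.get()        # would block forever without the close
        if item is CLOSED:
            break
        received.append(item)
    t.join()
    assert len(received) == 5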
- def _assert_async_dependent_tasks(self, p):
+ def _assert_async_dependent_tasks(self, pool):
# includes failure in center task, 'recursive' orphan cleanup
# This will also verify that the channel-close mechanism works
# t1 -> t2 -> t3
# t1 -> x -> t3
- pass
+ null_tasks = pool.num_tasks()
+ ni = 100
+ count = 1
+ make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs)
+
+ ts, rcs = make_task()
+ assert len(ts) == count + 2
+ assert len(rcs) == count + 2
+ assert pool.num_tasks() == null_tasks + len(ts)
+ print pool._tasks.nodes
+
+
+ # in the end, we expect all tasks to be gone, automatically
+
+
+
+ # order of deletion matters - just keep the end, then delete
+ final_rc = rcs[-1]
+ del(ts)
+ del(rcs)
+ del(final_rc)
+ assert pool.num_tasks() == null_tasks
+
+
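The deletion dance above only works if the pool holds no strong references to its tasks itself. One way to get that behavior, sketched with hypothetical MiniPool/Channel/Task stand-ins (not the gitpython classes): the pool keeps weak references and prunes them via callback, so dropping the last read channel cascades into the task vanishing from the pool.

    import weakref

    class Task(object):
        pass

    class Channel(object):
        def __init__(self, task):
            self.task = task    # the channel owns the only strong reference

    class MiniPool(object):
        def __init__(self):
            self._tasks = set()

        def add_task(self, task):
            # keep only a weak reference; once the caller drops the read
            # channel, the callback prunes the dead entry automatically
            self._tasks.add(weakref.ref(task, self._tasks.discard))
            return Channel(task)

        def num_tasks(self):
            return len(self._tasks)

    pool = MiniPool()
    rc = pool.add_task(Task())
    assert pool.num_tasks() == 1
    del rc                        # dropping the channel drops the task ...
    assert pool.num_tasks() == 0  # ... immediately under CPython refcounting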
@terminate_threads
def test_base(self):
@@ -301,8 +399,8 @@ class TestThreadPool(TestBase):
assert p.num_tasks() == 0
- # DEPENDENT TASKS SERIAL
- ########################
+ # DEPENDENT TASKS SYNC MODE
+ ###########################
self._assert_async_dependent_tasks(p)
@@ -311,12 +409,11 @@ class TestThreadPool(TestBase):
# step one gear up - just one thread for now.
p.set_size(1)
assert p.size() == 1
- print len(threading.enumerate()), num_threads
assert len(threading.enumerate()) == num_threads + 1
# deleting the pool stops its threads - just to be sure ;)
# It's not synchronized, hence we wait a moment
del(p)
- time.sleep(0.25)
+ time.sleep(0.05)
assert len(threading.enumerate()) == num_threads
p = ThreadPool(1)
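The thread-count bookkeeping used throughout these assertions reduces to a small standalone pattern. When the code can join its threads directly, the fixed sleep becomes unnecessary; the test above cannot, because the deleted pool owned them:

    import threading

    def worker(stop):
        stop.wait()             # idle until asked to terminate

    before = len(threading.enumerate())
    stop = threading.Event()
    t = threading.Thread(target=worker, args=(stop,))
    t.start()
    assert len(threading.enumerate()) == before + 1

    stop.set()
    t.join()                    # deterministic, unlike sleep-and-hope
    assert len(threading.enumerate()) == before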