summaryrefslogtreecommitdiff
path: root/test/git/async/test_pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r--test/git/async/test_pool.py129
1 files changed, 110 insertions, 19 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 3fb55e31..679bab31 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -9,6 +9,7 @@ import time
import sys
class _TestTaskBase(object):
+ """Note: causes great slowdown due to the required locking of task variables"""
def __init__(self, *args, **kwargs):
super(_TestTaskBase, self).__init__(*args, **kwargs)
self.should_fail = False
@@ -43,7 +44,8 @@ class _TestTaskBase(object):
self.lock.release()
return self
-
+
+
class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask):
pass
@@ -56,18 +58,36 @@ class TestThreadFailureNode(TestThreadTaskNode):
def do_fun(self, item):
item = TestThreadTaskNode.do_fun(self, item)
- if self.item_count > self.fail_after:
- raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
+
+ self.lock.acquire()
+ try:
+ if self.item_count > self.fail_after:
+ raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
+ finally:
+ self.lock.release()
+ # END handle fail after
return item
class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask):
"""Apply a transformation on items read from an input channel"""
+ def __init__(self, *args, **kwargs):
+ self.fail_after = kwargs.pop('fail_after', 0)
+ super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs)
def do_fun(self, item):
"""return tuple(i, i*2)"""
item = super(TestThreadInputChannelTaskNode, self).do_fun(item)
- #print "transformer.doit", self.id, item
+
+ # fail after support
+ if self.fail_after:
+ self.lock.acquire()
+ try:
+ if self.item_count > self.fail_after:
+ raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
+ finally:
+ self.lock.release()
+ # END handle fail-after
if isinstance(item, tuple):
i = item[0]
@@ -86,14 +106,12 @@ class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask):
"""return tuple(i, i*2)"""
item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item)
- # print "verifier.doit", self.id, item
-
# make sure the computation order matches
assert isinstance(item, tuple), "input was no tuple: %s" % item
base = item[0]
for id, num in enumerate(item[1:]):
- assert num == base * (id), "%i != %i, orig = %s" % (num, base * id+1, str(item))
+ assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item))
# END verify order
return item
@@ -104,9 +122,11 @@ class TestThreadPool(TestBase):
max_threads = cpu_count()
- def _add_task_chain(self, p, ni, count=1):
+ def _add_task_chain(self, p, ni, count=1, fail_setup=list()):
"""Create a task chain of feeder, count transformers and order verifcator
to the pool p, like t1 -> t2 -> t3
+ :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would
+ make the third transformer fail after 20 items
:return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
nt = p.num_tasks()
@@ -129,6 +149,11 @@ class TestThreadPool(TestBase):
assert p.num_tasks() == nt + 2 + tc
# END create count transformers
+ # setup failure
+ for id, fail_after in fail_setup:
+ tasks[1+id].fail_after = fail_after
+ # END setup failure
+
verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
verifier.fun = verifier.do_fun
vrc = p.add_task(verifier)
@@ -149,7 +174,7 @@ class TestThreadPool(TestBase):
def _assert_single_task(self, p, async=False):
"""Performs testing in a synchronized environment"""
- return # DEBUG TODO: Fixme deactivated it
+ # return # DEBUG TODO: Fixme deactivated it
print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size())
null_tasks = p.num_tasks() # in case we had some before
@@ -316,8 +341,6 @@ class TestThreadPool(TestBase):
print "read(0) with failure"
assert len(rc.read()) == 0 # failure on first item
- print >> sys.stderr, "done with everything"
-
assert isinstance(task.error(), AssertionError)
assert task.is_done() # on error, its marked done as well
del(rc)
@@ -332,39 +355,107 @@ class TestThreadPool(TestBase):
assert task.is_done()
assert isinstance(task.error(), AssertionError)
+ print >> sys.stderr, "done with everything"
+
def _assert_async_dependent_tasks(self, pool):
# includes failure in center task, 'recursive' orphan cleanup
# This will also verify that the channel-close mechanism works
# t1 -> t2 -> t3
- # t1 -> x -> t3
+
print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size()
null_tasks = pool.num_tasks()
ni = 5000
count = 3
aic = count + 2
make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs)
-
ts, rcs = make_task()
assert len(ts) == aic
assert len(rcs) == aic
assert pool.num_tasks() == null_tasks + len(ts)
print pool._tasks.nodes
-
- # read all at once
- print "read(0)"
+ # read(0)
+ #########
st = time.time()
items = rcs[-1].read()
- print "finished read(0)"
elapsed = time.time() - st
- print len(items), ni
assert len(items) == ni
+ del(rcs)
+ assert pool.num_tasks() == 0 # tasks depleted, all done, no handles
+ print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
- print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
+
+ # read(1)
+ #########
+ ts, rcs = make_task()
+ st = time.time()
+ for i in xrange(ni):
+ items = rcs[-1].read(1)
+ assert len(items) == 1
+ # END for each item to pull
+ elapsed_single = time.time() - st
+ # another read yields nothing, its empty
+ assert len(rcs[-1].read()) == 0
+ print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single)
+
+
+ # read with min-count size
+ ###########################
+ # must be faster, as it will read ni / 4 chunks
+ # Its enough to set one task, as it will force all others in the chain
+ # to min_size as well.
+ ts, rcs = make_task()
+ assert pool.num_tasks() == len(ts)
+ nri = ni / 4
+ ts[-1].min_count = nri
+ st = time.time()
+ for i in xrange(ni):
+ items = rcs[-1].read(1)
+ assert len(items) == 1
+ # END for each item to read
+ elapsed_minsize = time.time() - st
+ # its empty
+ assert len(rcs[-1].read()) == 0
+ print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize)
+
+ # it should have been a bit faster at least, and most of the time it is
+ # Sometimes, its not, mainly because:
+ # * The test tasks lock a lot, hence they slow down the system
+ # * Each read will still trigger the pool to evaluate, causing some overhead
+ # even though there are enough items on the queue in that case. Keeping
+ # track of the scheduled items helped there, but it caused further inacceptable
+ # slowdown
+ # assert elapsed_minsize < elapsed_single
+
+
+ # read with failure
+ ###################
+ # it should recover and give at least fail_after items
+ # t1 -> x -> t3
+ fail_after = ni/2
+ ts, rcs = make_task(fail_setup=[(0, fail_after)])
+ items = rcs[-1].read()
+ assert len(items) == fail_after
+ # MULTI-POOL
+ # If two pools are connected, this shold work as well.
+ # The second one has just one more thread
+ if False:
+ p2 = ThreadPool(1)
+ assert p2.size() == 1
+ p2ts, p2rcs = self._add_task_chain(p2, ni, count)
+
+ ts, rcs = make_task()
+
+
+ del(p2ts)
+ del(p2rcs)
+ assert p2.num_tasks() == 0
+ del(p2)
+
# in the end, we expect all tasks to be gone, automatically
# order of deletion doesnt matter
del(ts)