From 01eac1a959c1fa5894a86bf11e6b92f96762bdd8 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Thu, 10 Jun 2010 12:06:16 +0200
Subject: Added more dependency task tests. The single-reads are not yet fully
 deterministic: tasks still run into the problem of writing into a closed
 channel, which was closed by one of their task-mates that didn't know
 someone else was still computing
---
 test/git/async/test_pool.py | 129 +++++++++++++++++++++++++++++++++++++-------
 1 file changed, 110 insertions(+), 19 deletions(-)

diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 3fb55e31..679bab31 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -9,6 +9,7 @@ import time
 import sys
 
 class _TestTaskBase(object):
+    """Note: causes great slowdown due to the required locking of task variables"""
     def __init__(self, *args, **kwargs):
         super(_TestTaskBase, self).__init__(*args, **kwargs)
         self.should_fail = False
@@ -43,7 +44,8 @@ class _TestTaskBase(object):
         self.lock.release()
         return self
 
-
+
+
 class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask):
     pass
@@ -56,18 +58,36 @@ class TestThreadFailureNode(TestThreadTaskNode):
 
     def do_fun(self, item):
         item = TestThreadTaskNode.do_fun(self, item)
-        if self.item_count > self.fail_after:
-            raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
+
+        self.lock.acquire()
+        try:
+            if self.item_count > self.fail_after:
+                raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
+        finally:
+            self.lock.release()
+        # END handle fail-after
         return item
 
 class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask):
     """Apply a transformation on items read from an input channel"""
+    def __init__(self, *args, **kwargs):
+        self.fail_after = kwargs.pop('fail_after', 0)
+        super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs)
 
     def do_fun(self, item):
         """return tuple(i, i*2)"""
         item = super(TestThreadInputChannelTaskNode, self).do_fun(item)
-        #print "transformer.doit", self.id, item
+
+        # fail-after support
+        if self.fail_after:
+            self.lock.acquire()
+            try:
+                if self.item_count > self.fail_after:
+                    raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
+            finally:
+                self.lock.release()
+        # END handle fail-after
 
         if isinstance(item, tuple):
             i = item[0]
@@ -86,14 +106,12 @@ class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask):
         """return tuple(i, i*2)"""
         item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item)
 
-        # print "verifier.doit", self.id, item
-
         # make sure the computation order matches
         assert isinstance(item, tuple), "input was not a tuple: %s" % item
 
         base = item[0]
         for id, num in enumerate(item[1:]):
-            assert num == base * (id), "%i != %i, orig = %s" % (num, base * id+1, str(item))
+            assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item))
         # END verify order
         return item
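The lock-guarded fail-after check added above to TestThreadFailureNode and TestThreadInputChannelTaskNode is easier to see in isolation. Below is a minimal sketch of the pattern, assuming only the Python standard library; FailAfterCounter and process are illustrative names, not part of the test suite:

    import threading

    class FailAfterCounter(object):
        """Simulate a failure once more than fail_after items were processed.

        Several worker threads bump item_count concurrently, so the
        threshold check runs under the same lock as the counter updates."""
        def __init__(self, fail_after=0):
            self.fail_after = fail_after
            self.item_count = 0
            self.lock = threading.Lock()

        def process(self, item):
            self.lock.acquire()
            try:
                self.item_count += 1
                if self.fail_after and self.item_count > self.fail_after:
                    raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
            finally:
                self.lock.release()
            return item

The acquire/try/finally form mirrors the patch; from Python 2.5 on, the same could be written as a "with self.lock:" block.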
@@ -104,9 +122,11 @@ class TestThreadPool(TestBase):
 
     max_threads = cpu_count()
 
-    def _add_task_chain(self, p, ni, count=1):
+    def _add_task_chain(self, p, ni, count=1, fail_setup=list()):
         """Create a task chain of feeder, count transformers and order verifier
         to the pool p, like t1 -> t2 -> t3
+        :param fail_setup: a list of (task_id, fail_after) pairs, i.e. [(2, 20)] would
+            make the third transformer fail after 20 items
         :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
         nt = p.num_tasks()
@@ -129,6 +149,11 @@ class TestThreadPool(TestBase):
             assert p.num_tasks() == nt + 2 + tc
         # END create count transformers
 
+        # setup failure
+        for id, fail_after in fail_setup:
+            tasks[1+id].fail_after = fail_after
+        # END setup failure
+
         verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
         verifier.fun = verifier.do_fun
         vrc = p.add_task(verifier)
@@ -149,7 +174,7 @@ class TestThreadPool(TestBase):
 
     def _assert_single_task(self, p, async=False):
         """Performs testing in a synchronized environment"""
-        return # DEBUG TODO: Fixme deactivated it
+        # return # DEBUG TODO: Fixme deactivated it
         print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size())
         null_tasks = p.num_tasks() # in case we had some before
@@ -316,8 +341,6 @@ class TestThreadPool(TestBase):
         print "read(0) with failure"
         assert len(rc.read()) == 0 # failure on first item
-
-        print >> sys.stderr, "done with everything"
 
         assert isinstance(task.error(), AssertionError)
         assert task.is_done() # on error, it's marked done as well
         del(rc)
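The two hunks above pin down the failure contract for a single task: after the simulated error, reading yields nothing further, error() carries the AssertionError, and is_done() is still true. A self-contained sketch of that contract, assuming only the Python 2 standard library (MiniTask and make_failing_fun are illustrative stand-ins for the pool's real task classes):

    import threading, Queue

    class MiniTask(object):
        """Toy stand-in for a pool task: pushes results into a queue until
        its function raises, then records the error and marks itself done."""
        def __init__(self, fun, items):
            self._fun = fun
            self._items = list(items)
            self._error = None
            self._done = False

        def error(self):
            return self._error

        def is_done(self):
            return self._done

        def run(self, out_queue):
            try:
                for item in self._items:
                    out_queue.put(self._fun(item))
            except Exception, e:
                self._error = e     # record the failure instead of re-raising
            self._done = True       # on error it is marked done as well

    def make_failing_fun(fail_after):
        state = {'count': 0}
        def fun(item):
            state['count'] += 1
            if state['count'] > fail_after:
                raise AssertionError("Simulated failure after processing %i items" % fail_after)
            return item
        return fun

    q = Queue.Queue()
    task = MiniTask(make_failing_fun(3), range(10))
    worker = threading.Thread(target=task.run, args=(q,))
    worker.start()
    worker.join()

    assert q.qsize() == 3                            # items before the failure survive
    assert task.is_done()                            # marked done despite the error
    assert isinstance(task.error(), AssertionError)

The design point mirrored from the patch is that the worker records the exception instead of letting it escape, so readers and task-mates observe a short result rather than a crashed thread.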
@@ -332,39 +355,107 @@ class TestThreadPool(TestBase):
         assert task.is_done()
         assert isinstance(task.error(), AssertionError)
 
+        print >> sys.stderr, "done with everything"
+
     def _assert_async_dependent_tasks(self, pool):
         # includes failure in center task, 'recursive' orphan cleanup
         # This will also verify that the channel-close mechanism works
         # t1 -> t2 -> t3
-        # t1 -> x -> t3
+        print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size()
         null_tasks = pool.num_tasks()
         ni = 5000
         count = 3
         aic = count + 2
         make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs)
+
         ts, rcs = make_task()
         assert len(ts) == aic
         assert len(rcs) == aic
         assert pool.num_tasks() == null_tasks + len(ts)
         print pool._tasks.nodes
-
-        # read all at once
-        print "read(0)"
+        # read(0)
+        #########
         st = time.time()
         items = rcs[-1].read()
-        print "finished read(0)"
         elapsed = time.time() - st
-        print len(items), ni
         assert len(items) == ni
+        del(rcs)
+        assert pool.num_tasks() == 0 # tasks depleted, all done, no handles
+        print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
 
-        print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
+
+        # read(1)
+        #########
+        ts, rcs = make_task()
+        st = time.time()
+        for i in xrange(ni):
+            items = rcs[-1].read(1)
+            assert len(items) == 1
+        # END for each item to pull
+        elapsed_single = time.time() - st
+        # another read yields nothing, it's empty
+        assert len(rcs[-1].read()) == 0
+        print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single)
+
+
+        # read with min-count size
+        ###########################
+        # must be faster, as it will read in chunks of ni / 4
+        # It's enough to set it on one task, as it will force all others in
+        # the chain to the same min_size as well.
+        ts, rcs = make_task()
+        assert pool.num_tasks() == len(ts)
+        nri = ni / 4
+        ts[-1].min_count = nri
+        st = time.time()
+        for i in xrange(ni):
+            items = rcs[-1].read(1)
+            assert len(items) == 1
+        # END for each item to read
+        elapsed_minsize = time.time() - st
+        # it's empty
+        assert len(rcs[-1].read()) == 0
+        print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize)
+
+        # it should have been at least a bit faster, and most of the time it is.
+        # Sometimes it's not, mainly because:
+        # * The test tasks lock a lot, hence they slow down the system
+        # * Each read will still trigger the pool to evaluate, causing some overhead
+        #   even though there are enough items on the queue in that case. Keeping
+        #   track of the scheduled items helped there, but it caused further unacceptable
+        #   slowdown.
+        # assert elapsed_minsize < elapsed_single
+
+
+        # read with failure
+        ###################
+        # it should recover and give at least fail_after items
+        # t1 -> x -> t3
+        fail_after = ni/2
+        ts, rcs = make_task(fail_setup=[(0, fail_after)])
+        items = rcs[-1].read()
+        assert len(items) == fail_after
+
+
+        # MULTI-POOL
+        # If two pools are connected, this should work as well.
+        # The second one has just one thread.
+        if False:
+            p2 = ThreadPool(1)
+            assert p2.size() == 1
+            p2ts, p2rcs = self._add_task_chain(p2, ni, count)
+
+            ts, rcs = make_task()
+
+
+            del(p2ts)
+            del(p2rcs)
+            assert p2.num_tasks() == 0
+            del(p2)
+
         # in the end, we expect all tasks to be gone, automatically
         # order of deletion doesn't matter
         del(ts)
--
cgit v1.2.1
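The open problem named in the commit message is a race: a task writes into a channel that one of its task-mates already closed, because that mate didn't know someone else was still computing. One conceivable guard, sketched here with the standard library only as an illustration and not as the fix the async package actually adopted, is to reference-count the writers so that only the last one really closes the channel:

    import threading

    class RefCountedChannel(object):
        """Channel that only really closes once every registered writer
        has called close(); early closers cannot shut out slower mates."""
        def __init__(self, num_writers):
            self._lock = threading.Lock()
            self._writers = num_writers
            self._closed = False
            self._items = []

        def write(self, item):
            self._lock.acquire()
            try:
                if self._closed:
                    raise IOError("write into a closed channel")
                self._items.append(item)
            finally:
                self._lock.release()

        def close(self):
            self._lock.acquire()
            try:
                self._writers -= 1
                if self._writers <= 0:  # the last task-mate closes for real
                    self._closed = True
            finally:
                self._lock.release()

    channel = RefCountedChannel(num_writers=2)
    channel.write(1)
    channel.close()   # first writer is done; the channel stays open
    channel.write(2)  # the slower task-mate can still write safely
    channel.close()   # now the channel is really closed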