From 3323464f85b986cba23176271da92a478b33ab9c Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 00:24:49 +0200 Subject: messy first version of a properly working depth-first graph method, which allows the pool to work as expected. Many more tests need to be added, and there is still a problem with shutdown, as sometimes it won't kill all threads, mainly because the process came up with worker threads already started, which cannot be --- test/git/async/test_pool.py | 48 +++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 15 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 2a5e4647..788ca3bf 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -67,6 +67,8 @@ class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): def do_fun(self, item): """return tuple(i, i*2)""" item = super(TestThreadInputChannelTaskNode, self).do_fun(item) + #print "transformer.doit", self.id, item + if isinstance(item, tuple): i = item[0] return item + (i * self.id, ) @@ -82,15 +84,16 @@ class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): def do_fun(self, item): """return tuple(i, i*2)""" - item = super(TestThreadInputChannelTaskNode, self).do_fun(item) + item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item) + + # print "verifier.doit", self.id, item # make sure the computation order matches - assert isinstance(item, tuple) + assert isinstance(item, tuple), "input was no tuple: %s" % item base = item[0] - for num in item[1:]: - assert num == base * 2 - base = num + for id, num in enumerate(item[1:]): + assert num == base * (id), "%i != %i, orig = %s" % (num, base * id+1, str(item)) # END verify order return item @@ -146,6 +149,7 @@ class TestThreadPool(TestBase): def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" + return # DEBUG TODO: Fixme deactivated it print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size()) null_tasks = p.num_tasks() # in case we had some before @@ -335,33 +339,47 @@ class TestThreadPool(TestBase): # This will also verify that the channel-close mechanism works # t1 -> t2 -> t3 # t1 -> x -> t3 + print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size() null_tasks = pool.num_tasks() - ni = 100 - count = 1 + ni = 5000 + count = 3 + aic = count + 2 make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs) ts, rcs = make_task() - assert len(ts) == count + 2 - assert len(rcs) == count + 2 + assert len(ts) == aic + assert len(rcs) == aic assert pool.num_tasks() == null_tasks + len(ts) print pool._tasks.nodes - # in the end, we expect all tasks to be gone, automatically + # read all at once + print "read(0)" + st = time.time() + items = rcs[-1].read() + print "finished read(0)" + elapsed = time.time() - st + assert len(items) == ni + print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) - # order of deletion matters - just keep the end, then delete - final_rc = rcs[-1] + # in the end, we expect all tasks to be gone, automatically + # order of deletion doesn't matter + print "del ts" del(ts) + print "del rcs" del(rcs) - del(final_rc) assert pool.num_tasks() == null_tasks - @terminate_threads + # for some reason, sometimes it has multiple worker threads already when it + # enters the method ... not sure why yet; pools should clean up themselves + # @terminate_threads def test_base(self): + assert len(threading.enumerate()) == 1 + p = ThreadPool() # default pools have no workers @@ -438,4 +456,4 @@ class TestThreadPool(TestBase): ########################### self._assert_async_dependent_tasks(p) - + print >> sys.stderr, "Done with everything" -- cgit v1.2.1
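The depth-first evaluation this commit introduces can be pictured in a few lines. The sketch below is illustrative only - `Task`, its `input` attribute, and `process` are invented stand-ins, not the pool's actual API; the point is that the pool walks its task graph so that upstream (feeder) tasks produce before the tasks that consume their output:

import types  # stdlib only; everything else is defined here

class Task(object):
    """Toy task: pulls items from its input task (or a source iterable),
    applies fun, and buffers the results."""
    def __init__(self, fun, input=None, source=None):
        self.fun, self.input, self.source = fun, input, source
        self.buffer = []

    def process(self):
        items = self.input.buffer if self.input else list(self.source)
        self.buffer = [self.fun(i) for i in items]

def depth_first(task):
    """Yield the dependency chain leaf-first, so feeders run before consumers."""
    if task.input is not None:
        for t in depth_first(task.input):
            yield t
    yield task

feeder = Task(lambda i: i, source=range(5))
doubler = Task(lambda i: i * 2, input=feeder)
for t in depth_first(doubler):
    t.process()
assert doubler.buffer == [0, 2, 4, 6, 8]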
From cfb278d74ad01f3f1edf5e0ad113974a9555038d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 10:14:32 +0200 Subject: InputChannelTask now has an interface for properly handling reading from the same and from different pools --- test/git/async/test_pool.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 788ca3bf..3fb55e31 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -359,6 +359,7 @@ class TestThreadPool(TestBase): items = rcs[-1].read() print "finished read(0)" elapsed = time.time() - st + print len(items), ni assert len(items) == ni print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) @@ -366,9 +367,7 @@ class TestThreadPool(TestBase): # in the end, we expect all tasks to be gone, automatically # order of deletion doesn't matter - print "del ts" del(ts) - print "del rcs" del(rcs) assert pool.num_tasks() == null_tasks @@ -376,7 +375,7 @@ class TestThreadPool(TestBase): # for some reason, sometimes it has multiple worker threads already when it # enters the method ... not sure why yet; pools should clean up themselves - # @terminate_threads + #@terminate_threads def test_base(self): assert len(threading.enumerate()) == 1 @@ -457,3 +456,5 @@ class TestThreadPool(TestBase): self._assert_async_dependent_tasks(p) print >> sys.stderr, "Done with everything" + + # TODO: test multi-pool connections -- cgit v1.2.1 From 01eac1a959c1fa5894a86bf11e6b92f96762bdd8 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 12:06:16 +0200 Subject: Added more dependency-task tests. The single-reads in particular are not yet fully deterministic, as tasks still run into the problem that they try to write into a closed channel - it was closed by one of their task-mates, which didn't know someone else was still computing --- test/git/async/test_pool.py | 129 +++++++++++++++++++++++++++++++++------- 1 file changed, 110 insertions(+), 19 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 3fb55e31..679bab31 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -9,6 +9,7 @@ import time import sys class _TestTaskBase(object): + """Note: causes great slowdown due to the required locking of task variables""" def __init__(self, *args, **kwargs): super(_TestTaskBase, self).__init__(*args, **kwargs) self.should_fail = False @@ -43,7 +44,8 @@ class _TestTaskBase(object): self.lock.release() return self - + + class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask): pass @@ -56,18 +58,36 @@ class TestThreadFailureNode(TestThreadTaskNode): def do_fun(self, item): item = TestThreadTaskNode.do_fun(self, item) - if self.item_count > self.fail_after: - raise AssertionError("Simulated failure after processing %i items" % self.fail_after) + + self.lock.acquire() + try: + if self.item_count > self.fail_after: + raise AssertionError("Simulated failure after processing %i items" % 
self.fail_after) + finally: + self.lock.release() + # END handle fail after return item class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): """Apply a transformation on items read from an input channel""" + def __init__(self, *args, **kwargs): + self.fail_after = kwargs.pop('fail_after', 0) + super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs) def do_fun(self, item): """return tuple(i, i*2)""" item = super(TestThreadInputChannelTaskNode, self).do_fun(item) - #print "transformer.doit", self.id, item + + # fail after support + if self.fail_after: + self.lock.acquire() + try: + if self.item_count > self.fail_after: + raise AssertionError("Simulated failure after processing %i items" % self.fail_after) + finally: + self.lock.release() + # END handle fail-after if isinstance(item, tuple): i = item[0] @@ -86,14 +106,12 @@ class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): """return tuple(i, i*2)""" item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item) - # print "verifier.doit", self.id, item - # make sure the computation order matches assert isinstance(item, tuple), "input was no tuple: %s" % item base = item[0] for id, num in enumerate(item[1:]): - assert num == base * (id), "%i != %i, orig = %s" % (num, base * id+1, str(item)) + assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item)) # END verify order return item @@ -104,9 +122,11 @@ class TestThreadPool(TestBase): max_threads = cpu_count() - def _add_task_chain(self, p, ni, count=1): + def _add_task_chain(self, p, ni, count=1, fail_setup=list()): """Create a task chain of feeder, count transformers and order verifier to the pool p, like t1 -> t2 -> t3 + :param fail_setup: a list of pairs, task_id, fail_after, i.e. 
[(2, 20)] would + make the third transformer fail after 20 items :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" nt = p.num_tasks() @@ -129,6 +149,11 @@ class TestThreadPool(TestBase): assert p.num_tasks() == nt + 2 + tc # END create count transformers + # setup failure + for id, fail_after in fail_setup: + tasks[1+id].fail_after = fail_after + # END setup failure + verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) verifier.fun = verifier.do_fun vrc = p.add_task(verifier) @@ -149,7 +174,7 @@ class TestThreadPool(TestBase): def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" - return # DEBUG TODO: Fixme deactivated it + # return # DEBUG TODO: Fixme deactivated it print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size()) null_tasks = p.num_tasks() # in case we had some before @@ -316,8 +341,6 @@ class TestThreadPool(TestBase): print "read(0) with failure" assert len(rc.read()) == 0 # failure on first item - print >> sys.stderr, "done with everything" - assert isinstance(task.error(), AssertionError) assert task.is_done() # on error, it's marked done as well del(rc) @@ -332,39 +355,107 @@ class TestThreadPool(TestBase): assert task.is_done() assert isinstance(task.error(), AssertionError) + print >> sys.stderr, "done with everything" + def _assert_async_dependent_tasks(self, pool): # includes failure in center task, 'recursive' orphan cleanup # This will also verify that the channel-close mechanism works # t1 -> t2 -> t3 - # t1 -> x -> t3 print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size() null_tasks = pool.num_tasks() ni = 5000 count = 3 aic = count + 2 make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs) - ts, rcs = make_task() assert len(ts) == aic assert len(rcs) == aic assert pool.num_tasks() == null_tasks + len(ts) print pool._tasks.nodes - - # read all at once - print "read(0)" + # read(0) + ######### st = time.time() items = rcs[-1].read() - print "finished read(0)" elapsed = time.time() - st - print len(items), ni assert len(items) == ni + del(rcs) + assert pool.num_tasks() == 0 # tasks depleted, all done, no handles + print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) - print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) + + # read(1) + ######### + ts, rcs = make_task() + st = time.time() + for i in xrange(ni): + items = rcs[-1].read(1) + assert len(items) == 1 + # END for each item to pull + elapsed_single = time.time() - st + # another read yields nothing, it's empty + assert len(rcs[-1].read()) == 0 + print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single) + + + # read with min-count size + ########################### + # must be faster, as it will read ni / 4 chunks + # It's enough to set one task, as it will force all others in the chain + # to min_size as well. 
+ ts, rcs = make_task() + assert pool.num_tasks() == len(ts) + nri = ni / 4 + ts[-1].min_count = nri + st = time.time() + for i in xrange(ni): + items = rcs[-1].read(1) + assert len(items) == 1 + # END for each item to read + elapsed_minsize = time.time() - st + # it's empty + assert len(rcs[-1].read()) == 0 + print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize) + + # it should have been a bit faster at least, and most of the time it is + # Sometimes it's not, mainly because: + # * The test tasks lock a lot, hence they slow down the system + # * Each read will still trigger the pool to evaluate, causing some overhead + # even though there are enough items on the queue in that case. Keeping + # track of the scheduled items helped there, but it caused further unacceptable + # slowdown + # assert elapsed_minsize < elapsed_single + + + # read with failure + ################### + # it should recover and give at least fail_after items + # t1 -> x -> t3 + fail_after = ni/2 + ts, rcs = make_task(fail_setup=[(0, fail_after)]) + items = rcs[-1].read() + assert len(items) == fail_after + # MULTI-POOL + # If two pools are connected, this should work as well. + # The second one has just one more thread + if False: + p2 = ThreadPool(1) + assert p2.size() == 1 + p2ts, p2rcs = self._add_task_chain(p2, ni, count) + + ts, rcs = make_task() + + + del(p2ts) + del(p2rcs) + assert p2.num_tasks() == 0 + del(p2) + # in the end, we expect all tasks to be gone, automatically + # order of deletion doesn't matter del(ts) -- cgit v1.2.1 From 55e757928e493ce93056822d510482e4ffcaac2d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 14:39:57 +0200 Subject: channel: Changed design to be more logical - a channel now has any amount of readers and writers, a reader is not connected to its writer anymore. This changes the refcounting of course, which is why the auto-cleanup for the pool is currently broken. The benefits of this are faster writes to the channel; reading didn't improve; refcounts should be clearer now --- test/git/async/test_pool.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 679bab31..d34f6773 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -191,8 +191,8 @@ class TestThreadPool(TestBase): assert p.num_tasks() == null_tasks rc = p.add_task(task) assert p.num_tasks() == 1 + null_tasks - assert isinstance(rc, RPoolChannel) - assert task._out_wc is not None + assert isinstance(rc, PoolReader) + assert task._out_writer is not None # pull the result completely - we should get one task, which calls its # function once. In sync mode, the order matches @@ -460,6 +460,7 @@ class TestThreadPool(TestBase): # order of deletion doesn't matter del(ts) del(rcs) + print pool.num_tasks() assert pool.num_tasks() == null_tasks -- cgit v1.2.1
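The design change described above - any number of readers and writers per channel, with neither side owning the other - can be sketched with the standard library alone. The class names below are assumptions for illustration, not the library's actual channel code:

import Queue  # renamed to 'queue' in Python 3

class Channel(object):
    """Shared state: one queue plus a closed flag that no single writer owns."""
    def __init__(self):
        self.queue = Queue.Queue()
        self.closed = False
    def close(self):
        self.closed = True

class ChannelWriter(object):
    def __init__(self, channel):
        self.channel = channel
    def write(self, item):
        if self.channel.closed:
            raise IOError("cannot write to a closed channel")
        self.channel.queue.put(item)

class ChannelReader(object):
    def __init__(self, channel):
        self.channel = channel
    def read(self, count=0):
        """Return up to count items; count=0 reads until closed and drained."""
        out = []
        while not count or len(out) < count:
            try:
                out.append(self.channel.queue.get(timeout=0.1))
            except Queue.Empty:
                if self.channel.closed:
                    break
        return out

Writes stay cheap because a writer only touches the queue, and closing becomes a property of the channel itself - which is also why, at this point in the history, the pool's refcount-based auto-cleanup is temporarily broken.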
From 7c36f3648e39ace752c67c71867693ce1eee52a3 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 15:38:40 +0200 Subject: Now tracking the amount of concurrent writers to assure the channel is closed only when there is no one else writing to it. This assures that all tasks can continue working and put their results accordingly. Shutdown is still not working correctly, but that should be solvable as well. It's still not perfect though ... --- test/git/async/test_pool.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index d34f6773..7cb94a86 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -42,7 +42,7 @@ class _TestTaskBase(object): print self.item_count, fc assert self.item_count == fc self.lock.release() - + assert self._num_writers == 0 return self @@ -381,6 +381,7 @@ class TestThreadPool(TestBase): st = time.time() items = rcs[-1].read() elapsed = time.time() - st + print len(items), ni assert len(items) == ni del(rcs) assert pool.num_tasks() == 0 # tasks depleted, all done, no handles -- cgit v1.2.1 From fbe062bf6dacd3ad63dd827d898337fa542931ac Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 23:55:50 +0200 Subject: Added dependency-task tests, and fixed plenty of ref-count related bugs, as well as concurrency issues. Now it works okay, but the thread-shutdown is still an issue, as it causes incorrect behaviour making the tests fail. It's good, as it hints at additional issues that need to be solved. There is just a little more left on the feature side, but it's nearly there --- test/git/async/test_pool.py | 129 +++++++++++++++++++++++++++++++++----------- 1 file changed, 98 insertions(+), 31 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 7cb94a86..4851f61b 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -5,6 +5,7 @@ from git.async.task import * from git.async.thread import terminate_threads from git.async.util import cpu_count import threading +import weakref import time import sys @@ -42,7 +43,9 @@ class _TestTaskBase(object): print self.item_count, fc assert self.item_count == fc self.lock.release() + self._wlock.acquire() assert self._num_writers == 0 + self._wlock.release() return self @@ -122,31 +125,47 @@ class TestThreadPool(TestBase): max_threads = cpu_count() + def _make_proxy_method(self, t): + """required to prevent binding self into the method we call""" + wt = weakref.proxy(t) + return lambda item: wt.do_fun(item) + def _add_task_chain(self, p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0): """Create a task chain of feeder, count transformers and order verifier to the pool p, like t1 -> t2 -> t3 :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would make the third transformer fail after 20 items :param feeder_channel: if set to a channel, it will be used as input of the + first transformation task. The respective first task in the return value + will be None. 
+ :param id_offset: defines the id of the first transformation task; all subsequent + ones will add one to it :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" nt = p.num_tasks() - feeder = self._make_iterator_task(ni) - frc = p.add_task(feeder) - - assert p.num_tasks() == nt + 1 + feeder = None + frc = feeder_channel + if feeder_channel is None: + feeder = self._make_iterator_task(ni) + frc = p.add_task(feeder) + # END handle specific feeder rcs = [frc] tasks = [feeder] + make_proxy_method = self._make_proxy_method + inrc = frc for tc in xrange(count): - t = TestThreadInputChannelTaskNode(inrc, tc, None) - t.fun = t.do_fun + t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None) + + t.fun = make_proxy_method(t) + #t.fun = t.do_fun inrc = p.add_task(t) tasks.append(t) rcs.append(inrc) - assert p.num_tasks() == nt + 2 + tc # END create count transformers # setup failure @@ -155,10 +174,10 @@ class TestThreadPool(TestBase): verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) - verifier.fun = verifier.do_fun + #verifier.fun = verifier.do_fun + verifier.fun = make_proxy_method(verifier) vrc = p.add_task(verifier) - assert p.num_tasks() == nt + tc + 3 tasks.append(verifier) rcs.append(vrc) @@ -169,7 +188,7 @@ class TestThreadPool(TestBase): :param taskcls: the actual iterator type to use :param **kwargs: additional kwargs to be passed to the task""" t = taskcls(iter(range(ni)), 'iterator', None, **kwargs) - t.fun = t.do_fun + t.fun = self._make_proxy_method(t) return t def _assert_single_task(self, p, async=False): @@ -385,6 +404,14 @@ class TestThreadPool(TestBase): assert len(items) == ni del(rcs) assert pool.num_tasks() == 0 # tasks depleted, all done, no handles + # wait a tiny moment - there could still be something unprocessed on the + # queue, increasing the refcount + time.sleep(0.15) + import gc + print gc.get_referrers(ts[-1]) + print len(pool._queue) + assert sys.getrefcount(ts[-1]) == 2 # ts + call + assert sys.getrefcount(ts[0]) == 2 # ts + call print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) @@ -444,25 +471,53 @@ class TestThreadPool(TestBase): # MULTI-POOL + # If two pools are connected, this should work as well. 
# The second one has just one more thread - if False: - p2 = ThreadPool(1) - assert p2.size() == 1 - p2ts, p2rcs = self._add_task_chain(p2, ni, count) - - ts, rcs = make_task() - - - del(p2ts) - del(p2rcs) - assert p2.num_tasks() == 0 - del(p2) - - # in the end, we expect all tasks to be gone, automatically - # order of deletion doesn't matter + ts, rcs = make_task() + + # connect verifier channel as feeder of the second pool + p2 = ThreadPool(1) + assert p2.size() == 1 + p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) + assert p2ts[0] is None # we have no feeder task + assert rcs[-1].pool_ref()() is pool # it didn't change the pool + assert rcs[-1] is p2ts[1].reader() + assert p2.num_tasks() == len(p2ts)-1 # first is None + + # reading from the last one will evaluate all pools correctly + print "read(0) multi-pool" + items = p2rcs[-1].read() + assert len(items) == ni + + # now that both are connected, I can drop my handle to the reader + # without affecting the task-count, but what's more important: + # They remove their tasks correctly once we drop our references in the + # right order + del(p2ts) + assert p2rcs[0] is rcs[-1] + del(p2rcs) + assert p2.num_tasks() == 0 + del(p2) + + assert pool.num_tasks() == null_tasks + len(ts) + + del(ts) + print "del rcs" + print rcs[-1] + print sys.getrefcount(rcs[-1]) del(rcs) + # TODO: make this work - something with the refcount goes wrong, + # they never get cleaned up properly + ts = pool._tasks.nodes print pool.num_tasks() - assert pool.num_tasks() == null_tasks + assert pool.num_tasks() == null_tasks + + + # TODO: Test multi-read(1) + + # in the end, we expect all tasks to be gone, automatically + # order of deletion doesn't matter + @@ -496,17 +551,28 @@ class TestThreadPool(TestBase): # SINGLE TASK SERIAL SYNC MODE ############################## - # put a few unrelated tasks that we forget about - urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None)) - urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None)) + # put a few unrelated tasks that we forget about - check ref counts and cleanup + t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None) + urc1 = p.add_task(t1) + urc2 = p.add_task(t2) assert p.num_tasks() == 2 ## SINGLE TASK ################# self._assert_single_task(p, False) assert p.num_tasks() == 2 del(urc1) - del(urc2) + assert p.num_tasks() == 1 + + p.remove_task(t2) + assert p.num_tasks() == 0 + assert sys.getrefcount(t2) == 2 + + t3 = TestThreadInputChannelTaskNode(urc2, "channel", None) + urc3 = p.add_task(t3) + assert p.num_tasks() == 1 + del(urc3) assert p.num_tasks() == 0 + assert sys.getrefcount(t3) == 2 # DEPENDENT TASKS SYNC MODE @@ -519,6 +585,7 @@ class TestThreadPool(TestBase): # step one gear up - just one thread for now. p.set_size(1) assert p.size() == 1 + print len(threading.enumerate()) assert len(threading.enumerate()) == num_threads + 1 # deleting the pool stops its threads - just to be sure ;) # It's not synchronized, hence we wait a moment -- cgit v1.2.1
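The `_make_proxy_method` helper introduced above is the key to the ref-count fixes: assigning a bound method with `t.fun = t.do_fun` stores a strong reference to the task inside one of its own attributes, and that cycle keeps the refcount from ever reaching zero, defeating refcount-based cleanup. Routing the call through a `weakref.proxy` avoids the cycle. A stand-alone illustration (refcount values as printed by a plain CPython script):

import sys, weakref

class Task(object):
    def do_fun(self, item):
        return item

t = Task()
t.fun = t.do_fun            # bound method holds a strong ref back to t
print sys.getrefcount(t)    # 3: the name 't', the call argument, and t.fun.__self__

t2 = Task()
wt = weakref.proxy(t2)
t2.fun = lambda item: wt.do_fun(item)  # closure holds only the weak proxy
print sys.getrefcount(t2)   # 2: just the name 't2' and the call argument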
From 6d1212e8c412b0b4802bc1080d38d54907db879d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 11:52:01 +0200 Subject: IMPORTANT: sometimes, when notifying waiters by releasing their lock, the lock is not actually released or they are not actually notified, staying in a beauty sleep. This glitch is probably caused by some detail not treated correctly in Python's threading module, which is something we cannot fix. It works as expected most of the time though - maybe some cleanup is not done correctly, which causes this --- test/git/async/test_pool.py | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 4851f61b..5bb48cc2 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -199,7 +199,7 @@ class TestThreadPool(TestBase): # add a simple task # it iterates n items - ni = 5000 + ni = 1000 assert ni % 2 == 0, "ni needs to be divisible by 2" assert ni % 4 == 0, "ni needs to be divisible by 4" @@ -382,18 +382,18 @@ class TestThreadPool(TestBase): # includes failure in center task, 'recursive' orphan cleanup # This will also verify that the channel-close mechanism works # t1 -> t2 -> t3 - + print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size() null_tasks = pool.num_tasks() - ni = 5000 + ni = 1000 count = 3 aic = count + 2 make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs) + ts, rcs = make_task() assert len(ts) == aic assert len(rcs) == aic assert pool.num_tasks() == null_tasks + len(ts) - print pool._tasks.nodes # read(0) ######### st = time.time() items = rcs[-1].read() elapsed = time.time() - st - print len(items), ni assert len(items) == ni @@ -407,9 +407,6 @@ class TestThreadPool(TestBase): # wait a tiny moment - there could still be something unprocessed on the # queue, increasing the refcount time.sleep(0.15) - import gc - print gc.get_referrers(ts[-1]) - print len(pool._queue) assert sys.getrefcount(ts[-1]) == 2 # ts + call assert sys.getrefcount(ts[0]) == 2 # ts + call print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) @@ -467,15 +464,15 @@ class TestThreadPool(TestBase): items = rcs[-1].read() assert len(items) == fail_after - + # MULTI-POOL # If two pools are connected, this should work as well. # The second one has just one more thread ts, rcs = make_task() # connect verifier channel as feeder of the second pool - p2 = ThreadPool(1) - assert p2.size() == 1 + p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes + assert p2.size() == 0 p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) @@ -501,14 +498,8 @@ class TestThreadPool(TestBase): del(ts) - print "del rcs" - print rcs[-1] - print sys.getrefcount(rcs[-1]) del(rcs) - # TODO: make this work - something with the refcount goes wrong, - # they never get cleaned up properly - ts = pool._tasks.nodes - print pool.num_tasks() + assert pool.num_tasks() == null_tasks @@ -585,7 +576,6 @@ class TestThreadPool(TestBase): p.set_size(1) assert p.size() == 1 - print len(threading.enumerate()) assert len(threading.enumerate()) == num_threads + 1 # deleting the pool stops its threads - just to be sure ;) # It's not synchronized, hence we wait a moment -- cgit v1.2.1
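A common defensive pattern against lost wakeups of the kind this commit describes - not necessarily what the library does, just a sketch - is to never block on a condition variable indefinitely, re-checking the predicate in a timed loop so a missed notify() costs at most one poll interval instead of a permanent hang:

import threading

cv = threading.Condition()
items = []

def put(item):
    cv.acquire()
    try:
        items.append(item)
        cv.notify()
    finally:
        cv.release()

def get(poll_interval=0.5):
    cv.acquire()
    try:
        while not items:
            cv.wait(poll_interval)  # wakes by timeout even if the notify was lost
        return items.pop(0)
    finally:
        cv.release()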
From 5ff864138cd1e680a78522c26b583639f8f5e313 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 14:37:51 +0200 Subject: test.async: split test_pool up into task implementations and related utilities, as well as the tests themselves. File became too large --- test/git/async/test_pool.py | 186 ++------------------------------------------ 1 file changed, 6 insertions(+), 180 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 5bb48cc2..0fa34f6a 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -1,196 +1,22 @@ """Channel testing""" from test.testlib import * +from task import * + from git.async.pool import * -from git.async.task import * from git.async.thread import terminate_threads from git.async.util import cpu_count + import threading import weakref import time import sys -class _TestTaskBase(object): - """Note: causes great slowdown due to the required locking of task variables""" - def __init__(self, *args, **kwargs): - super(_TestTaskBase, self).__init__(*args, **kwargs) - self.should_fail = False - self.lock = threading.Lock() # yes, can't safely do x = x + 1 :) - self.plock = threading.Lock() - self.item_count = 0 - self.process_count = 0 - - def do_fun(self, item): - self.lock.acquire() - self.item_count += 1 - self.lock.release() - if self.should_fail: - raise AssertionError("I am failing just for the fun of it") - return item - - def process(self, count=1): - # must do it first, otherwise we might read and check results before - # the thread gets here :). It's a lesson! - self.plock.acquire() - self.process_count += 1 - self.plock.release() - super(_TestTaskBase, self).process(count) - - def _assert(self, pc, fc, check_scheduled=False): - """Assert for num process counts (pc) and num function counts (fc) - :return: self""" - self.lock.acquire() - if self.item_count != fc: - print self.item_count, fc - assert self.item_count == fc - self.lock.release() - self._wlock.acquire() - assert self._num_writers == 0 - self._wlock.release() - return self - - -class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask): - pass - - -class TestThreadFailureNode(TestThreadTaskNode): - """Fails after X items""" - def __init__(self, *args, **kwargs): - self.fail_after = kwargs.pop('fail_after') - super(TestThreadFailureNode, self).__init__(*args, **kwargs) - - def do_fun(self, item): - item = TestThreadTaskNode.do_fun(self, item) - - self.lock.acquire() - try: - if self.item_count > self.fail_after: - raise AssertionError("Simulated failure after processing %i items" % self.fail_after) - finally: - self.lock.release() - # END handle fail after - return item - - -class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): - """Apply a transformation on items read from an input channel""" - def __init__(self, *args, **kwargs): - self.fail_after = kwargs.pop('fail_after', 0) - super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs) - - def do_fun(self, item): - """return tuple(i, i*2)""" - item = super(TestThreadInputChannelTaskNode, self).do_fun(item) - - # fail after support - if self.fail_after: - self.lock.acquire() - try: - if self.item_count > self.fail_after: - raise AssertionError("Simulated failure after processing %i items" % self.fail_after) - finally: - self.lock.release() - # END handle fail-after - - if isinstance(item, tuple): - i = item[0] - return item + (i * self.id, ) - else: - return (item, item * self.id) - # END handle tuple class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): - """An input channel task, which verifies the result of its input channels, - should be last in the chain. 
- Id must be int""" - def do_fun(self, item): - """return tuple(i, i*2)""" - item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item) - - # make sure the computation order matches - assert isinstance(item, tuple), "input was no tuple: %s" % item - - base = item[0] - for id, num in enumerate(item[1:]): - assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item)) - # END verify order - - return item - - - class TestThreadPool(TestBase): max_threads = cpu_count() - - def _make_proxy_method(self, t): - """required to prevent binding self into the method we call""" - wt = weakref.proxy(t) - return lambda item: wt.do_fun(item) - - def _add_task_chain(self, p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0): - """Create a task chain of feeder, count transformers and order verifier - to the pool p, like t1 -> t2 -> t3 - :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would - make the third transformer fail after 20 items - :param feeder_channel: if set to a channel, it will be used as input of the - first transformation task. The respective first task in the return value - will be None. - :param id_offset: defines the id of the first transformation task; all subsequent - ones will add one to it - :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" - nt = p.num_tasks() - - feeder = None - frc = feeder_channel - if feeder_channel is None: - feeder = self._make_iterator_task(ni) - frc = p.add_task(feeder) - # END handle specific feeder - - rcs = [frc] - tasks = [feeder] - - make_proxy_method = self._make_proxy_method - - inrc = frc - for tc in xrange(count): - t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None) - - t.fun = make_proxy_method(t) - #t.fun = t.do_fun - inrc = p.add_task(t) - - tasks.append(t) - rcs.append(inrc) - # END create count transformers - - # setup failure - for id, fail_after in fail_setup: - tasks[1+id].fail_after = fail_after - # END setup failure - - verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) - #verifier.fun = verifier.do_fun - verifier.fun = make_proxy_method(verifier) - vrc = p.add_task(verifier) - - - tasks.append(verifier) - rcs.append(vrc) - return tasks, rcs - - def _make_iterator_task(self, ni, taskcls=TestThreadTaskNode, **kwargs): - """:return: task which yields ni items - :param taskcls: the actual iterator type to use - :param **kwargs: additional kwargs to be passed to the task""" - t = taskcls(iter(range(ni)), 'iterator', None, **kwargs) - t.fun = self._make_proxy_method(t) - return t - def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" # return # DEBUG TODO: Fixme deactivated it @@ -203,7 +29,7 @@ class TestThreadPool(TestBase): assert ni % 2 == 0, "ni needs to be divisible by 2" assert ni % 4 == 0, "ni needs to be divisible by 4" - make_task = lambda *args, **kwargs: self._make_iterator_task(ni, *args, **kwargs) + make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs) task = make_task() @@ -388,7 +214,7 @@ class TestThreadPool(TestBase): ni = 1000 count = 3 aic = count + 2 - make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs) + make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs) ts, rcs = make_task() assert len(ts) == aic @@ -473,7 +299,7 @@ class TestThreadPool(TestBase): # connect verifier channel as feeder of the second pool p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to 
wake up on mutexes assert p2.size() == 0 - p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) + p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) assert p2ts[0] is None # we have no feeder task assert rcs[-1].pool_ref()() is pool # it didn't change the pool assert rcs[-1] is p2ts[1].reader() -- cgit v1.2.1 From 18e3252a1f655f09093a4cffd5125342a8f94f3b Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 14:58:51 +0200 Subject: Finished dependent task testing according to the features we would currently like to see --- test/git/async/test_pool.py | 48 +++++++++++++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 6 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 0fa34f6a..40c6d66e 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -132,8 +132,13 @@ class TestThreadPool(TestBase): assert len(items) == nri task._assert( 5, ni) - assert task.is_done() + + # delete the handle first, causing the task to be removed and to be set + # done. We check for the set-done state later. Depending on the timing, + # the task is not yet set done when we are checking it because we were + # scheduled in before the flag could be set. del(rc) + assert task.is_done() assert p.num_tasks() == null_tasks # depleted # but this only hits if we want too many items, if we want less, it could @@ -307,9 +312,41 @@ class TestThreadPool(TestBase): # reading from the last one will evaluate all pools correctly print "read(0) multi-pool" + st = time.time() items = p2rcs[-1].read() + elapsed = time.time() - st assert len(items) == ni + print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(0), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed) + + + # lose the handles of the second pool to allow others to go as well + del(p2rcs); del(p2ts) + assert p2.num_tasks() == 0 + + # now we lost our old handles as well, and the tasks go away + ts, rcs = make_task() + assert pool.num_tasks() == len(ts) + + p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) + assert p2.num_tasks() == len(p2ts) - 1 + + # Test multi-read(1) + print "read(1) * %i" % ni + reader = rcs[-1] + st = time.time() + for i in xrange(ni): + items = reader.read(1) + assert len(items) == 1 + # END for each item to get + elapsed = time.time() - st + del(reader) # decrement refcount + + print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(1), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed) + + # another read is empty + assert len(rcs[-1].read()) == 0 + # now that both are connected, I can drop my handle to the reader # without affecting the task-count, but what's more important: # They remove their tasks correctly once we drop our references in the # right order del(p2ts) assert p2rcs[0] is rcs[-1] del(p2rcs) assert p2.num_tasks() == 0 del(p2) @@ -329,11 +366,10 @@ class TestThreadPool(TestBase): assert pool.num_tasks() == null_tasks - # TODO: Test multi-read(1) - - # in the end, we expect all tasks to be gone, automatically - # order of deletion doesn't matter - + # ASSERTION: We already tested that one pool behaves correctly when an error + # occurs - if two pools handle their ref-counts correctly, which they + # do if we are here, then they should handle errors happening during + # the task processing as expected as well. Hence we can save this here -- cgit v1.2.1
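The cleanup rule these tests keep asserting - drop the last reader handle and the task disappears from the pool - relies on CPython's immediate refcounting and can be modeled in a few lines. `Pool` and `Reader` below are toy stand-ins, not the real classes:

import weakref

class Reader(object):
    """Handle to a task's output; its lifetime controls the task's."""
    def __init__(self, task):
        self.task = task

class Pool(object):
    def __init__(self):
        self._tasks = {}
        self._watchers = []

    def add_task(self, task):
        key, reader = id(task), Reader(task)
        self._tasks[key] = task
        # once the last strong reference to the reader dies, drop the task
        self._watchers.append(weakref.ref(reader, lambda ref: self._tasks.pop(key, None)))
        return reader

    def num_tasks(self):
        return len(self._tasks)

pool = Pool()
rc = pool.add_task(object())
assert pool.num_tasks() == 1
del(rc)                       # dropping the handle removes the task, as in the tests
assert pool.num_tasks() == 0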