From 55e757928e493ce93056822d510482e4ffcaac2d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 14:39:57 +0200 Subject: channel: Changed design to be more logical - a channel now has any amount of readers and writers, a ready is not connected to its writer anymore. This changes the refcounting of course, which is why the auto-cleanup for the pool is currently broken. The benefit of this are faster writes to the channel, reading didn't improve, refcounts should be clearer now --- test/git/async/test_pool.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 679bab31..d34f6773 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -191,8 +191,8 @@ class TestThreadPool(TestBase): assert p.num_tasks() == null_tasks rc = p.add_task(task) assert p.num_tasks() == 1 + null_tasks - assert isinstance(rc, RPoolChannel) - assert task._out_wc is not None + assert isinstance(rc, PoolReader) + assert task._out_writer is not None # pull the result completely - we should get one task, which calls its # function once. In sync mode, the order matches @@ -460,6 +460,7 @@ class TestThreadPool(TestBase): # order of deletion doesnt matter del(ts) del(rcs) + print pool.num_tasks() assert pool.num_tasks() == null_tasks -- cgit v1.2.1 From 7c36f3648e39ace752c67c71867693ce1eee52a3 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 15:38:40 +0200 Subject: Now tracking the amount of concurrent writers to assure the channel is closed only when there is no one else writing to it. This assures that all tasks can continue working, and put their results accordingly. Shutdown is still not working correctly, but that should be solvable as well. Its still not perfect though ... --- test/git/async/test_pool.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index d34f6773..7cb94a86 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -42,7 +42,7 @@ class _TestTaskBase(object): print self.item_count, fc assert self.item_count == fc self.lock.release() - + assert self._num_writers == 0 return self @@ -381,6 +381,7 @@ class TestThreadPool(TestBase): st = time.time() items = rcs[-1].read() elapsed = time.time() - st + print len(items), ni assert len(items) == ni del(rcs) assert pool.num_tasks() == 0 # tasks depleted, all done, no handles -- cgit v1.2.1 From fbe062bf6dacd3ad63dd827d898337fa542931ac Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 23:55:50 +0200 Subject: Added dependency-task tests, and fixed plenty of ref-count related bugs, as well as concurrency issues. Now it works okay, but the thread-shutdown is still an issue, as it causes incorrect behaviour making the tests fail. Its good, as it hints at additional issues that need to be solved. There is just a little more left on the feature side, but its nearly there --- test/git/async/test_pool.py | 129 +++++++++++++++++++++++++++++++++----------- 1 file changed, 98 insertions(+), 31 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 7cb94a86..4851f61b 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -5,6 +5,7 @@ from git.async.task import * from git.async.thread import terminate_threads from git.async.util import cpu_count import threading +import weakref import time import sys @@ -42,7 +43,9 @@ class _TestTaskBase(object): print self.item_count, fc assert self.item_count == fc self.lock.release() + self._wlock.acquire() assert self._num_writers == 0 + self._wlock.release() return self @@ -122,31 +125,47 @@ class TestThreadPool(TestBase): max_threads = cpu_count() - def _add_task_chain(self, p, ni, count=1, fail_setup=list()): + + def _make_proxy_method(self, t): + """required to prevent binding self into the method we call""" + wt = weakref.proxy(t) + return lambda item: wt.do_fun(item) + + def _add_task_chain(self, p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0): """Create a task chain of feeder, count transformers and order verifcator to the pool p, like t1 -> t2 -> t3 :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would make the third transformer fail after 20 items + :param feeder_channel: if set to a channel, it will be used as input of the + first transformation task. The respective first task in the return value + will be None. + :param id_offset: defines the id of the first transformation task, all subsequent + ones will add one :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" nt = p.num_tasks() - feeder = self._make_iterator_task(ni) - frc = p.add_task(feeder) - - assert p.num_tasks() == nt + 1 + feeder = None + frc = feeder_channel + if feeder_channel is None: + feeder = self._make_iterator_task(ni) + frc = p.add_task(feeder) + # END handle specific feeder rcs = [frc] tasks = [feeder] + make_proxy_method = self._make_proxy_method + inrc = frc for tc in xrange(count): - t = TestThreadInputChannelTaskNode(inrc, tc, None) - t.fun = t.do_fun + t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None) + + t.fun = make_proxy_method(t) + #t.fun = t.do_fun inrc = p.add_task(t) tasks.append(t) rcs.append(inrc) - assert p.num_tasks() == nt + 2 + tc # END create count transformers # setup failure @@ -155,10 +174,10 @@ class TestThreadPool(TestBase): # END setup failure verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) - verifier.fun = verifier.do_fun + #verifier.fun = verifier.do_fun + verifier.fun = make_proxy_method(verifier) vrc = p.add_task(verifier) - assert p.num_tasks() == nt + tc + 3 tasks.append(verifier) rcs.append(vrc) @@ -169,7 +188,7 @@ class TestThreadPool(TestBase): :param taskcls: the actual iterator type to use :param **kwargs: additional kwargs to be passed to the task""" t = taskcls(iter(range(ni)), 'iterator', None, **kwargs) - t.fun = t.do_fun + t.fun = self._make_proxy_method(t) return t def _assert_single_task(self, p, async=False): @@ -385,6 +404,14 @@ class TestThreadPool(TestBase): assert len(items) == ni del(rcs) assert pool.num_tasks() == 0 # tasks depleted, all done, no handles + # wait a tiny moment - there could still be something unprocessed on the + # queue, increasing the refcount + time.sleep(0.15) + import gc + print gc.get_referrers(ts[-1]) + print len(pool._queue) + assert sys.getrefcount(ts[-1]) == 2 # ts + call + assert sys.getrefcount(ts[0]) == 2 # ts + call print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) @@ -444,25 +471,53 @@ class TestThreadPool(TestBase): # MULTI-POOL # If two pools are connected, this shold work as well. # The second one has just one more thread - if False: - p2 = ThreadPool(1) - assert p2.size() == 1 - p2ts, p2rcs = self._add_task_chain(p2, ni, count) - - ts, rcs = make_task() - - - del(p2ts) - del(p2rcs) - assert p2.num_tasks() == 0 - del(p2) - - # in the end, we expect all tasks to be gone, automatically - # order of deletion doesnt matter + ts, rcs = make_task() + + # connect verifier channel as feeder of the second pool + p2 = ThreadPool(1) + assert p2.size() == 1 + p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) + assert p2ts[0] is None # we have no feeder task + assert rcs[-1].pool_ref()() is pool # it didnt change the pool + assert rcs[-1] is p2ts[1].reader() + assert p2.num_tasks() == len(p2ts)-1 # first is None + + # reading from the last one will evaluate all pools correctly + print "read(0) multi-pool" + items = p2rcs[-1].read() + assert len(items) == ni + + # now that both are connected, I can drop my handle to the reader + # without affecting the task-count, but whats more important: + # They remove their tasks correctly once we drop our references in the + # right order + del(p2ts) + assert p2rcs[0] is rcs[-1] + del(p2rcs) + assert p2.num_tasks() == 0 + del(p2) + + assert pool.num_tasks() == null_tasks + len(ts) + + del(ts) + print "del rcs" + print rcs[-1] + print sys.getrefcount(rcs[-1]) del(rcs) + # TODO: make this work - something with the refcount goes wrong, + # they never get cleaned up properly + ts = pool._tasks.nodes print pool.num_tasks() - assert pool.num_tasks() == null_tasks + assert pool.num_tasks() == null_tasks + + + # TODO: Test multi-read(1) + + # in the end, we expect all tasks to be gone, automatically + # order of deletion doesnt matter + + @@ -496,17 +551,28 @@ class TestThreadPool(TestBase): # SINGLE TASK SERIAL SYNC MODE ############################## - # put a few unrelated tasks that we forget about - urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None)) - urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None)) + # put a few unrelated tasks that we forget about - check ref counts and cleanup + t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None) + urc1 = p.add_task(t1) + urc2 = p.add_task(t2) assert p.num_tasks() == 2 ## SINGLE TASK ################# self._assert_single_task(p, False) assert p.num_tasks() == 2 del(urc1) - del(urc2) + assert p.num_tasks() == 1 + + p.remove_task(t2) + assert p.num_tasks() == 0 + assert sys.getrefcount(t2) == 2 + + t3 = TestThreadInputChannelTaskNode(urc2, "channel", None) + urc3 = p.add_task(t3) + assert p.num_tasks() == 1 + del(urc3) assert p.num_tasks() == 0 + assert sys.getrefcount(t3) == 2 # DEPENDENT TASKS SYNC MODE @@ -519,6 +585,7 @@ class TestThreadPool(TestBase): # step one gear up - just one thread for now. p.set_size(1) assert p.size() == 1 + print len(threading.enumerate()) assert len(threading.enumerate()) == num_threads + 1 # deleting the pool stops its threads - just to be sure ;) # Its not synchronized, hence we wait a moment -- cgit v1.2.1 From 6d1212e8c412b0b4802bc1080d38d54907db879d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 11:52:01 +0200 Subject: IMPORTANT: sometimes, when notifying waiters by releasing their lock, the lock is not actually released or they are not actually notifyied, staying in a beautysleep. This glitch is probably caused by some detail not treated correctly in the thread python module, which is something we cannot fix. It works most of the time as expected though - maybe some cleanup is not done correctly which causes this --- test/git/async/test_pool.py | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 4851f61b..5bb48cc2 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -199,7 +199,7 @@ class TestThreadPool(TestBase): # add a simple task # it iterates n items - ni = 5000 + ni = 1000 assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" @@ -382,18 +382,18 @@ class TestThreadPool(TestBase): # includes failure in center task, 'recursive' orphan cleanup # This will also verify that the channel-close mechanism works # t1 -> t2 -> t3 - + print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size() null_tasks = pool.num_tasks() - ni = 5000 + ni = 1000 count = 3 aic = count + 2 make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs) + ts, rcs = make_task() assert len(ts) == aic assert len(rcs) == aic assert pool.num_tasks() == null_tasks + len(ts) - print pool._tasks.nodes # read(0) ######### @@ -407,9 +407,6 @@ class TestThreadPool(TestBase): # wait a tiny moment - there could still be something unprocessed on the # queue, increasing the refcount time.sleep(0.15) - import gc - print gc.get_referrers(ts[-1]) - print len(pool._queue) assert sys.getrefcount(ts[-1]) == 2 # ts + call assert sys.getrefcount(ts[0]) == 2 # ts + call print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) @@ -467,15 +464,15 @@ class TestThreadPool(TestBase): items = rcs[-1].read() assert len(items) == fail_after - + # MULTI-POOL # If two pools are connected, this shold work as well. # The second one has just one more thread ts, rcs = make_task() # connect verifier channel as feeder of the second pool - p2 = ThreadPool(1) - assert p2.size() == 1 + p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes + assert p2.size() == 0 p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) assert p2ts[0] is None # we have no feeder task assert rcs[-1].pool_ref()() is pool # it didnt change the pool @@ -501,14 +498,8 @@ class TestThreadPool(TestBase): del(ts) - print "del rcs" - print rcs[-1] - print sys.getrefcount(rcs[-1]) del(rcs) - # TODO: make this work - something with the refcount goes wrong, - # they never get cleaned up properly - ts = pool._tasks.nodes - print pool.num_tasks() + assert pool.num_tasks() == null_tasks @@ -585,7 +576,6 @@ class TestThreadPool(TestBase): # step one gear up - just one thread for now. p.set_size(1) assert p.size() == 1 - print len(threading.enumerate()) assert len(threading.enumerate()) == num_threads + 1 # deleting the pool stops its threads - just to be sure ;) # Its not synchronized, hence we wait a moment -- cgit v1.2.1