Diffstat (limited to 'test/git/async')
-rw-r--r--  test/git/async/test_channel.py |   6
-rw-r--r--  test/git/async/test_graph.py   |  16
-rw-r--r--  test/git/async/test_pool.py    | 135
3 files changed, 113 insertions, 44 deletions
diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py
index 215081cd..a24c7c91 100644
--- a/test/git/async/test_channel.py
+++ b/test/git/async/test_channel.py
@@ -9,8 +9,8 @@ class TestChannels(TestBase):
     def test_base(self):
         # creating channel yields a write and a read channal
         wc, rc = mkchannel()
-        assert isinstance(wc, WChannel)    # default args
-        assert isinstance(rc, RChannel)
+        assert isinstance(wc, Writer)    # default args
+        assert isinstance(rc, Reader)


         # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO
@@ -46,7 +46,7 @@ class TestChannels(TestBase):


         # test callback channels
-        wc, rc = mkchannel(wctype = CallbackWChannel, rctype = CallbackRChannel)
+        wc, rc = mkchannel(wtype = CallbackWriter, rtype = CallbackReader)

         cb = [0, 0]    # set slots to one if called
         def pre_write(item):
diff --git a/test/git/async/test_graph.py b/test/git/async/test_graph.py
index d0e36159..7630226b 100644
--- a/test/git/async/test_graph.py
+++ b/test/git/async/test_graph.py
@@ -3,6 +3,7 @@ from test.testlib import *
 from git.async.graph import *

 import time
+import sys

 class TestGraph(TestBase):

@@ -19,7 +20,7 @@ class TestGraph(TestBase):

         # delete unconnected nodes
         for n in g.nodes[:]:
-            g.del_node(n)
+            g.remove_node(n)
         # END del nodes

         # add a chain of connected nodes
@@ -54,8 +55,8 @@ class TestGraph(TestBase):

         # deleting a connected node clears its neighbour connections
         assert n3.in_nodes[0] is n2
-        assert g.del_node(n2) is g
-        assert g.del_node(n2) is g    # multi-deletion okay
+        assert g.remove_node(n2) is g
+        assert g.remove_node(n2) is g    # multi-deletion okay
         assert len(g.nodes) == nn - 1
         assert len(n3.in_nodes) == 0
         assert len(n1.out_nodes) == 0
@@ -68,3 +69,12 @@ class TestGraph(TestBase):
         assert dfirst_nodes[-1] == end and dfirst_nodes[-2].id == end.id-1


+        # test cleanup
+        # its at least kept by its graph
+        assert sys.getrefcount(end) > 3
+        del(g)
+        del(n1); del(n2); del(n3)
+        del(dfirst_nodes)
+        del(last)
+        del(n)
+        assert sys.getrefcount(end) == 2
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 679bab31..5bb48cc2 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -5,6 +5,7 @@ from git.async.task import *
 from git.async.thread import terminate_threads
 from git.async.util import cpu_count
 import threading
+import weakref
 import time
 import sys

@@ -42,7 +43,9 @@ class _TestTaskBase(object):
         print self.item_count, fc
         assert self.item_count == fc
         self.lock.release()
-
+        self._wlock.acquire()
+        assert self._num_writers == 0
+        self._wlock.release()
         return self


@@ -122,31 +125,47 @@ class TestThreadPool(TestBase):

     max_threads = cpu_count()

-    def _add_task_chain(self, p, ni, count=1, fail_setup=list()):
+
+    def _make_proxy_method(self, t):
+        """required to prevent binding self into the method we call"""
+        wt = weakref.proxy(t)
+        return lambda item: wt.do_fun(item)
+
+    def _add_task_chain(self, p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0):
         """Create a task chain of feeder, count transformers and order verifcator
         to the pool p, like t1 -> t2 -> t3
         :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would
             make the third transformer fail after 20 items
+        :param feeder_channel: if set to a channel, it will be used as input of the
+            first transformation task. The respective first task in the return value
+            will be None.
+        :param id_offset: defines the id of the first transformation task, all subsequent
+            ones will add one
         :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
         nt = p.num_tasks()
-        feeder = self._make_iterator_task(ni)
-        frc = p.add_task(feeder)
-
-        assert p.num_tasks() == nt + 1
+        feeder = None
+        frc = feeder_channel
+        if feeder_channel is None:
+            feeder = self._make_iterator_task(ni)
+            frc = p.add_task(feeder)
+        # END handle specific feeder

         rcs = [frc]
         tasks = [feeder]

+        make_proxy_method = self._make_proxy_method
+
         inrc = frc
         for tc in xrange(count):
-            t = TestThreadInputChannelTaskNode(inrc, tc, None)
-            t.fun = t.do_fun
+            t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None)
+
+            t.fun = make_proxy_method(t)
+            #t.fun = t.do_fun
             inrc = p.add_task(t)

             tasks.append(t)
             rcs.append(inrc)
-            assert p.num_tasks() == nt + 2 + tc
         # END create count transformers

         # setup failure
@@ -155,10 +174,10 @@ class TestThreadPool(TestBase):
         # END setup failure

         verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
-        verifier.fun = verifier.do_fun
+        #verifier.fun = verifier.do_fun
+        verifier.fun = make_proxy_method(verifier)
         vrc = p.add_task(verifier)

-        assert p.num_tasks() == nt + tc + 3

         tasks.append(verifier)
         rcs.append(vrc)
@@ -169,7 +188,7 @@ class TestThreadPool(TestBase):
         :param taskcls: the actual iterator type to use
         :param **kwargs: additional kwargs to be passed to the task"""
         t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
-        t.fun = t.do_fun
+        t.fun = self._make_proxy_method(t)
         return t

     def _assert_single_task(self, p, async=False):
@@ -180,7 +199,7 @@ class TestThreadPool(TestBase):

         # add a simple task
         # it iterates n items
-        ni = 5000
+        ni = 1000
         assert ni % 2 == 0, "ni needs to be dividable by 2"
         assert ni % 4 == 0, "ni needs to be dividable by 4"

@@ -191,8 +210,8 @@ class TestThreadPool(TestBase):
         assert p.num_tasks() == null_tasks
         rc = p.add_task(task)
         assert p.num_tasks() == 1 + null_tasks
-        assert isinstance(rc, RPoolChannel)
-        assert task._out_wc is not None
+        assert isinstance(rc, PoolReader)
+        assert task._out_writer is not None

         # pull the result completely - we should get one task, which calls its
         # function once. In sync mode, the order matches
@@ -363,27 +382,33 @@ class TestThreadPool(TestBase):
         # includes failure in center task, 'recursive' orphan cleanup
         # This will also verify that the channel-close mechanism works
         # t1 -> t2 -> t3
-
+
         print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size()
         null_tasks = pool.num_tasks()
-        ni = 5000
+        ni = 1000
         count = 3
         aic = count + 2
         make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs)
+
         ts, rcs = make_task()
         assert len(ts) == aic
         assert len(rcs) == aic
         assert pool.num_tasks() == null_tasks + len(ts)
-        print pool._tasks.nodes

         # read(0)
         #########
         st = time.time()
         items = rcs[-1].read()
         elapsed = time.time() - st
+        print len(items), ni
         assert len(items) == ni
         del(rcs)
         assert pool.num_tasks() == 0    # tasks depleted, all done, no handles
+        # wait a tiny moment - there could still be something unprocessed on the
+        # queue, increasing the refcount
+        time.sleep(0.15)
+        assert sys.getrefcount(ts[-1]) == 2    # ts + call
+        assert sys.getrefcount(ts[0]) == 2    # ts + call

         print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
@@ -439,28 +464,51 @@ class TestThreadPool(TestBase):
         items = rcs[-1].read()
         assert len(items) == fail_after
-
+
         # MULTI-POOL
         # If two pools are connected, this shold work as well.
         # The second one has just one more thread
-        if False:
-            p2 = ThreadPool(1)
-            assert p2.size() == 1
-            p2ts, p2rcs = self._add_task_chain(p2, ni, count)
-
-            ts, rcs = make_task()
-
-
-            del(p2ts)
-            del(p2rcs)
-            assert p2.num_tasks() == 0
-            del(p2)
+        ts, rcs = make_task()
+
+        # connect verifier channel as feeder of the second pool
+        p2 = ThreadPool(0)    # don't spawn new threads, they have the tendency not to wake up on mutexes
+        assert p2.size() == 0
+        p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
+        assert p2ts[0] is None    # we have no feeder task
+        assert rcs[-1].pool_ref()() is pool    # it didnt change the pool
+        assert rcs[-1] is p2ts[1].reader()
+        assert p2.num_tasks() == len(p2ts)-1    # first is None
+
+        # reading from the last one will evaluate all pools correctly
+        print "read(0) multi-pool"
+        items = p2rcs[-1].read()
+        assert len(items) == ni
+
+        # now that both are connected, I can drop my handle to the reader
+        # without affecting the task-count, but whats more important:
+        # They remove their tasks correctly once we drop our references in the
+        # right order
+        del(p2ts)
+        assert p2rcs[0] is rcs[-1]
+        del(p2rcs)
+        assert p2.num_tasks() == 0
+        del(p2)
+
+        assert pool.num_tasks() == null_tasks + len(ts)
+
+
+        del(ts)
+        del(rcs)
+
+        assert pool.num_tasks() == null_tasks
+
+
+        # TODO: Test multi-read(1)

         # in the end, we expect all tasks to be gone, automatically
         # order of deletion doesnt matter
-        del(ts)
-        del(rcs)
-        assert pool.num_tasks() == null_tasks
+
+
@@ -494,17 +542,28 @@ class TestThreadPool(TestBase):


         # SINGLE TASK SERIAL SYNC MODE
         ##############################
-        # put a few unrelated tasks that we forget about
-        urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
-        urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
+        # put a few unrelated tasks that we forget about - check ref counts and cleanup
+        t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None)
+        urc1 = p.add_task(t1)
+        urc2 = p.add_task(t2)
         assert p.num_tasks() == 2

         ## SINGLE TASK #################
         self._assert_single_task(p, False)
         assert p.num_tasks() == 2
         del(urc1)
-        del(urc2)
+        assert p.num_tasks() == 1
+
+        p.remove_task(t2)
+        assert p.num_tasks() == 0
+        assert sys.getrefcount(t2) == 2
+
+        t3 = TestThreadInputChannelTaskNode(urc2, "channel", None)
+        urc3 = p.add_task(t3)
+        assert p.num_tasks() == 1
+        del(urc3)
         assert p.num_tasks() == 0
+        assert sys.getrefcount(t3) == 2

         # DEPENDENT TASKS SYNC MODE
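The recurring change in this diff is the `_make_proxy_method` helper: assigning a bound method directly (`t.fun = t.do_fun`) stores a method object whose `__self__` is the task itself, so the task keeps itself alive and the `sys.getrefcount(...) == 2` cleanup asserts can never hold. A minimal stand-alone sketch of that pattern, using only the standard library (the `Task` class below is illustrative, not the git.async task type):

    import sys
    import weakref

    class Task(object):
        """Illustrative stand-in for a pool task; not the git.async type."""
        def do_fun(self, item):
            return item

    def make_proxy_method(t):
        # keep only a weak proxy, so the stored callable does not own the task
        wt = weakref.proxy(t)
        return lambda item: wt.do_fun(item)

    t = Task()
    t.fun = t.do_fun                # bound method holds a strong ref back to t
    print(sys.getrefcount(t))       # 3 on CPython: the name, __self__, getrefcount's argument

    t = Task()
    t.fun = make_proxy_method(t)    # callable holds only the weak proxy
    print(sys.getrefcount(t))       # 2: just the name and getrefcount's argument

Once the stored callable no longer keeps the task alive, dropping the pool's and the test's handles is sufficient for assertions like `assert sys.getrefcount(ts[-1]) == 2` in the dependency test above to pass.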