-rw-r--r-- | lib/git/async/graph.py       |  14
-rw-r--r-- | lib/git/async/pool.py        |   2
-rw-r--r-- | lib/git/async/task.py        |  10
-rw-r--r-- | lib/git/async/thread.py      |  43
-rw-r--r-- | test/git/async/test_graph.py |  16
-rw-r--r-- | test/git/async/test_pool.py  | 129
6 files changed, 155 insertions, 59 deletions
diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py
index e3999cdc..9ee0e891 100644
--- a/lib/git/async/graph.py
+++ b/lib/git/async/graph.py
@@ -25,14 +25,24 @@ class Graph(object):
 	def __init__(self):
 		self.nodes = list()
+
+	def __del__(self):
+		"""Deletes bidericational dependencies"""
+		for node in self.nodes:
+			node.in_nodes = None
+			node.out_nodes = None
+		# END cleanup nodes
+
+		# otherwise the nodes would keep floating around
+
 	def add_node(self, node):
 		"""Add a new node to the graph
 		:return: the newly added node"""
 		self.nodes.append(node)
 		return node
 
-	def del_node(self, node):
+	def remove_node(self, node):
 		"""Delete a node from the graph
 		:return: self"""
 		try:
@@ -46,6 +56,8 @@ class Graph(object):
 			del(outn.in_nodes[outn.in_nodes.index(node)])
 		for inn in node.in_nodes:
 			del(inn.out_nodes[inn.out_nodes.index(node)])
+		node.out_nodes = list()
+		node.in_nodes = list()
 		return self
 
 	def add_edge(self, u, v):
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 3fd99c7b..0aad90ae 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -402,7 +402,7 @@ class Pool(object):
 			# keep its input nodes as we check whether they were orphaned
 			in_tasks = task.in_nodes
-			self._tasks.del_node(task)
+			self._tasks.remove_node(task)
 			self._taskorder_cache.clear()
 		finally:
 			self._taskgraph_lock.release()
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index a8ba5ac6..49e7e7cf 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -82,7 +82,8 @@ class OutputChannelTask(Node):
 	def process(self, count=0):
 		"""Process count items and send the result individually to the output channel"""
-		# first thing: increment the writer count
+		# first thing: increment the writer count - other tasks must be able
+		# to respond properly ( even if it turns out we don't need it later )
 		self._wlock.acquire()
 		self._num_writers += 1
 		self._wlock.release()
@@ -191,7 +192,11 @@ class InputIteratorTaskBase(OutputChannelTask):
 			raise ValueError("Iterator %r needs a next() function" % iterator)
 		self._iterator = iterator
 		self._lock = self.lock_type()
-		self._read = self.__read
+
+		# this is necessary to prevent a cyclic ref, preventing us from
+		# getting deleted ( and collected )
+		weakself = weakref.ref(self)
+		self._read = lambda count: weakself().__read(count)
 		self._empty = False
 
 	def __read(self, count=0):
@@ -201,6 +206,7 @@ class InputIteratorTaskBase(OutputChannelTask):
 		if self._empty:
 			return list()
 		# END early abort
+
 		self._lock.acquire()
 		try:
 			if count == 0:
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index faeda04f..b8d2e418 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -116,7 +116,7 @@ class WorkerThread(TerminatableThread):
 		t[1] = optional, tuple or list of arguments to pass to the routine
 		t[2] = optional, dictionary of keyword arguments to pass to the routine
 	"""
-	__slots__ = ('inq', '_current_routine')
+	__slots__ = ('inq')
 
 
 	# define how often we should check for a shutdown request in case our
@@ -128,7 +128,6 @@ class WorkerThread(TerminatableThread):
 		self.inq = inq
 		if inq is None:
 			self.inq = Queue.Queue()
-		self._current_routine = None	# routine we execute right now
 
 	@classmethod
 	def stop(cls, *args):
@@ -141,7 +140,6 @@ class WorkerThread(TerminatableThread):
 		gettask = self.inq.get
 		while True:
-			self._current_routine = None
 			if self._should_terminate():
 				break
 			# END check for stop request
@@ -153,22 +151,27 @@ class WorkerThread(TerminatableThread):
 			assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need"
 			routine, arg = tasktuple
-			self._current_routine = routine
-
 			try:
-				rval = None
-				if inspect.ismethod(routine):
-					if routine.im_self is None:
-						rval = routine(self, arg)
-					else:
+				try:
+					rval = None
+					if inspect.ismethod(routine):
+						if routine.im_self is None:
+							rval = routine(self, arg)
+						else:
+							rval = routine(arg)
+					elif inspect.isroutine(routine):
 						rval = routine(arg)
-				elif inspect.isroutine(routine):
-					rval = routine(arg)
-				else:
-					# ignore unknown items
-					print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
-					break
-				# END make routine call
+					else:
+						# ignore unknown items
+						print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
+						break
+					# END make routine call
+				finally:
+					# make sure we delete the routine to release the reference as soon
+					# as possible. Otherwise objects might not be destroyed
+					# while we are waiting
+					del(routine)
+					del(tasktuple)
 			except StopProcessing:
 				print self.name, "stops processing"	# DEBUG
 				break
@@ -176,12 +179,10 @@ class WorkerThread(TerminatableThread):
 				print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
 				continue	# just continue
 			# END routine exception handling
+
+			# END handle routine release
 		# END endless loop
 
-	def routine(self):
-		""":return: routine we are currently executing, or None if we have no task"""
-		return self._current_routine
-
 	def stop_and_join(self):
 		"""Send stop message to ourselves"""
 		self.inq.put((self.stop, None))
diff --git a/test/git/async/test_graph.py b/test/git/async/test_graph.py
index d0e36159..7630226b 100644
--- a/test/git/async/test_graph.py
+++ b/test/git/async/test_graph.py
@@ -3,6 +3,7 @@ from test.testlib import *
 from git.async.graph import *
 
 import time
+import sys
 
 class TestGraph(TestBase):
@@ -19,7 +20,7 @@ class TestGraph(TestBase):
 
 		# delete unconnected nodes
 		for n in g.nodes[:]:
-			g.del_node(n)
+			g.remove_node(n)
 		# END del nodes
 
 		# add a chain of connected nodes
@@ -54,8 +55,8 @@ class TestGraph(TestBase):
 
 		# deleting a connected node clears its neighbour connections
 		assert n3.in_nodes[0] is n2
-		assert g.del_node(n2) is g
-		assert g.del_node(n2) is g	# multi-deletion okay
+		assert g.remove_node(n2) is g
+		assert g.remove_node(n2) is g	# multi-deletion okay
 		assert len(g.nodes) == nn - 1
 		assert len(n3.in_nodes) == 0
 		assert len(n1.out_nodes) == 0
@@ -68,3 +69,12 @@ class TestGraph(TestBase):
 
 		assert dfirst_nodes[-1] == end and dfirst_nodes[-2].id == end.id-1
 
+		# test cleanup
+		# its at least kept by its graph
+		assert sys.getrefcount(end) > 3
+		del(g)
+		del(n1); del(n2); del(n3)
+		del(dfirst_nodes)
+		del(last)
+		del(n)
+		assert sys.getrefcount(end) == 2
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 7cb94a86..4851f61b 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -5,6 +5,7 @@ from git.async.task import *
 from git.async.thread import terminate_threads
 from git.async.util import cpu_count
 import threading
+import weakref
 import time
 import sys
@@ -42,7 +43,9 @@ class _TestTaskBase(object):
 			print self.item_count, fc
 		assert self.item_count == fc
 		self.lock.release()
+		self._wlock.acquire()
 		assert self._num_writers == 0
+		self._wlock.release()
 		return self
 
@@ -122,31 +125,47 @@ class TestThreadPool(TestBase):
 
 	max_threads = cpu_count()
 
-	def _add_task_chain(self, p, ni, count=1, fail_setup=list()):
+
+	def _make_proxy_method(self, t):
+		"""required to prevent binding self into the method we call"""
+		wt = weakref.proxy(t)
+		return lambda item: wt.do_fun(item)
+
+	def _add_task_chain(self, p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0):
 		"""Create a task chain of feeder, count transformers and order verifcator
 		to the pool p, like t1 -> t2 -> t3
 		:param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would
 			make the third transformer fail after 20 items
+		:param feeder_channel: if set to a channel, it will be used as input of the
+			first transformation task. The respective first task in the return value
+			will be None.
+		:param id_offset: defines the id of the first transformation task, all subsequent
+			ones will add one
 		:return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
 		nt = p.num_tasks()
-		feeder = self._make_iterator_task(ni)
-		frc = p.add_task(feeder)
-
-		assert p.num_tasks() == nt + 1
+		feeder = None
+		frc = feeder_channel
+		if feeder_channel is None:
+			feeder = self._make_iterator_task(ni)
+			frc = p.add_task(feeder)
+		# END handle specific feeder
 
 		rcs = [frc]
 		tasks = [feeder]
 
+		make_proxy_method = self._make_proxy_method
+
 		inrc = frc
 		for tc in xrange(count):
-			t = TestThreadInputChannelTaskNode(inrc, tc, None)
-			t.fun = t.do_fun
+			t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None)
+
+			t.fun = make_proxy_method(t)
+			#t.fun = t.do_fun
 			inrc = p.add_task(t)
 
 			tasks.append(t)
 			rcs.append(inrc)
-			assert p.num_tasks() == nt + 2 + tc
 		# END create count transformers
 
 		# setup failure
@@ -155,10 +174,10 @@ class TestThreadPool(TestBase):
 		# END setup failure
 
 		verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
-		verifier.fun = verifier.do_fun
+		#verifier.fun = verifier.do_fun
+		verifier.fun = make_proxy_method(verifier)
 		vrc = p.add_task(verifier)
-		assert p.num_tasks() == nt + tc + 3
 
 		tasks.append(verifier)
 		rcs.append(vrc)
@@ -169,7 +188,7 @@ class TestThreadPool(TestBase):
 		:param taskcls: the actual iterator type to use
 		:param **kwargs: additional kwargs to be passed to the task"""
 		t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
-		t.fun = t.do_fun
+		t.fun = self._make_proxy_method(t)
 		return t
 
 	def _assert_single_task(self, p, async=False):
@@ -385,6 +404,14 @@ class TestThreadPool(TestBase):
 		assert len(items) == ni
 		del(rcs)
 		assert pool.num_tasks() == 0	# tasks depleted, all done, no handles
+		# wait a tiny moment - there could still be something unprocessed on the
+		# queue, increasing the refcount
+		time.sleep(0.15)
+		import gc
+		print gc.get_referrers(ts[-1])
+		print len(pool._queue)
+		assert sys.getrefcount(ts[-1]) == 2	# ts + call
+		assert sys.getrefcount(ts[0]) == 2	# ts + call
 
 		print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
@@ -444,25 +471,53 @@ class TestThreadPool(TestBase):
 		# MULTI-POOL
 		# If two pools are connected, this shold work as well.
 		# The second one has just one more thread
-		if False:
-			p2 = ThreadPool(1)
-			assert p2.size() == 1
-			p2ts, p2rcs = self._add_task_chain(p2, ni, count)
-
-			ts, rcs = make_task()
-
-
-			del(p2ts)
-			del(p2rcs)
-			assert p2.num_tasks() == 0
-			del(p2)
-
-		# in the end, we expect all tasks to be gone, automatically
-		# order of deletion doesnt matter
+		ts, rcs = make_task()
+
+		# connect verifier channel as feeder of the second pool
+		p2 = ThreadPool(1)
+		assert p2.size() == 1
+		p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
+		assert p2ts[0] is None	# we have no feeder task
+		assert rcs[-1].pool_ref()() is pool	# it didnt change the pool
+		assert rcs[-1] is p2ts[1].reader()
+		assert p2.num_tasks() == len(p2ts)-1	# first is None
+
+		# reading from the last one will evaluate all pools correctly
+		print "read(0) multi-pool"
+		items = p2rcs[-1].read()
+		assert len(items) == ni
+
+		# now that both are connected, I can drop my handle to the reader
+		# without affecting the task-count, but whats more important:
+		# They remove their tasks correctly once we drop our references in the
+		# right order
+		del(p2ts)
+		assert p2rcs[0] is rcs[-1]
+		del(p2rcs)
+		assert p2.num_tasks() == 0
+		del(p2)
+
+		assert pool.num_tasks() == null_tasks + len(ts)
+
+		del(ts)
+		print "del rcs"
+		print rcs[-1]
+		print sys.getrefcount(rcs[-1])
 		del(rcs)
+		# TODO: make this work - something with the refcount goes wrong,
+		# they never get cleaned up properly
+		ts = pool._tasks.nodes
 		print pool.num_tasks()
-		assert pool.num_tasks() == null_tasks
+		assert pool.num_tasks() == null_tasks
+
+
+		# TODO: Test multi-read(1)
+
+		# in the end, we expect all tasks to be gone, automatically
+		# order of deletion doesnt matter
+
+
@@ -496,17 +551,28 @@ class TestThreadPool(TestBase):
 		# SINGLE TASK SERIAL SYNC MODE
 		##############################
-		# put a few unrelated tasks that we forget about
-		urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
-		urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
+		# put a few unrelated tasks that we forget about - check ref counts and cleanup
+		t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None)
+		urc1 = p.add_task(t1)
+		urc2 = p.add_task(t2)
 		assert p.num_tasks() == 2
 
 		## SINGLE TASK #################
 		self._assert_single_task(p, False)
 		assert p.num_tasks() == 2
 		del(urc1)
-		del(urc2)
+		assert p.num_tasks() == 1
+
+		p.remove_task(t2)
+		assert p.num_tasks() == 0
+		assert sys.getrefcount(t2) == 2
+
+		t3 = TestThreadInputChannelTaskNode(urc2, "channel", None)
+		urc3 = p.add_task(t3)
+		assert p.num_tasks() == 1
+		del(urc3)
 		assert p.num_tasks() == 0
+		assert sys.getrefcount(t3) == 2
 
 
 		# DEPENDENT TASKS SYNC MODE
@@ -519,6 +585,7 @@ class TestThreadPool(TestBase):
 		# step one gear up - just one thread for now.
 		p.set_size(1)
 		assert p.size() == 1
+		print len(threading.enumerate())
 		assert len(threading.enumerate()) == num_threads + 1
 		# deleting the pool stops its threads - just to be sure ;)
 		# Its not synchronized, hence we wait a moment
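
Editor's note (not part of the commit): the recurring change above replaces direct bound-method assignments such as t.fun = t.do_fun with calls routed through a weakref proxy (_make_proxy_method in the tests, and the weakref.ref(self) lambda in InputIteratorTaskBase). A bound method stored on its own instance forms a reference cycle, so the object can only be reclaimed by the cycle collector - and in Python 2, cycles involving objects that define __del__ are never collected at all. The sketch below is a minimal, hypothetical illustration of that idea; the Task/do_fun names are made up, and the immediate-reclamation behaviour assumes CPython's reference counting.

    # Standalone sketch: why a weakref proxy breaks the task <-> bound-method cycle.
    import weakref

    class Task(object):
        def __init__(self):
            # A bound method stored on the instance would keep it alive via a cycle:
            #   self -> __dict__['fun'] -> bound method -> self
            #   self.fun = self.do_fun
            # A weak proxy leaves no strong reference back to self:
            wt = weakref.proxy(self)
            self.fun = lambda item: wt.do_fun(item)

        def do_fun(self, item):
            return item + 1

    t = Task()
    assert t.fun(1) == 2        # the proxy forwards the call while t is alive

    watcher = weakref.ref(t)
    del t                       # on CPython, plain refcounting frees t right here
    assert watcher() is None    # no cycle collector needed

With the bound-method variant, watcher() would still return the object until the garbage collector runs; the sys.getrefcount(...) == 2 assertions added to the tests guard against exactly that kind of lingering reference.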