author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 20:01:02 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 20:01:02 +0200 |
commit | 654e54d200135e665e07e9f0097d913a77f169da (patch) | |
tree | cec8f92af95dc773985e824f6f6bca136f1a0480 | |
parent | 583cd8807259a69fc01874b798f657c1f9ab7828 (diff) | |
download | gitpython-654e54d200135e665e07e9f0097d913a77f169da.tar.gz | |
task: Fixed incorrect handling of channel closure. Performance is alright for up to 2 threads, but 4 are killing the queue
-rw-r--r-- | lib/git/async/pool.py | 40 |
-rw-r--r-- | lib/git/async/task.py | 24 |
-rw-r--r-- | test/git/async/test_pool.py | 4 |
3 files changed, 43 insertions, 25 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 227cabfc..3de98777 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -21,6 +21,7 @@ from channel import (
     )
 
 import sys
+from time import sleep
 
 
 class RPoolChannel(RChannel):
@@ -371,32 +372,27 @@ class Pool(object):
         This method blocks until all tasks to be removed have been processed, if
         they are currently being processed.
         :return: self"""
-        # now delete our actual node - must set it done os it closes its channels.
-        # Otherwise further reads of output tasks will block.
-        # Actually they may still block if anyone wants to read all ... without
-        # a timeout
-        # keep its input nodes as we check whether they were orphaned
-        in_tasks = task.in_nodes
-        task.set_done()
         self._taskgraph_lock.acquire()
         try:
-            self._taskorder_cache.clear()
-            # before we can delete the task, make sure its write channel
-            # is closed, otherwise people might still be waiting for its result.
-            # If a channel is not closed, this could also mean its not yet fully
-            # processed, but more importantly, there must be no task being processed
-            # right now.
-            # TODO: figure this out
-            for worker in self._workers:
-                r = worker.routine()
-                if r and r.im_self is task:
-                    raise NotImplementedError("todo")
-                # END handle running task
-            # END check for in-progress routine
+            # it can be that the task is already deleted, but its chunk was on the
+            # queue until now, so its marked consumed again
+            if not task in self._tasks.nodes:
+                return self
+            # END early abort
+
+            # the task we are currently deleting could also be processed by
+            # a thread right now. We don't care about it as its taking care about
+            # its write channel itself, and sends everything it can to it.
+            # For it it doesn't matter that its not part of our task graph anymore.
+
+            # now delete our actual node - be sure its done to prevent further
+            # processing in case there are still client reads on their way.
+            task.set_done()
 
-            # its done, close the channel for writing
-            task.close()
+            # keep its input nodes as we check whether they were orphaned
+            in_tasks = task.in_nodes
             self._tasks.del_node(task)
+            self._taskorder_cache.clear()
         finally:
             self._taskgraph_lock.release()
         # END locked deletion
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index f106c381..b282e371 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -2,6 +2,7 @@ from graph import Node
 import threading
 import weakref
+import sys
 import new
 
 
 class OutputChannelTask(Node):
@@ -44,7 +45,6 @@ class OutputChannelTask(Node):
 
     def set_done(self):
         """Set ourselves to being done, has we have completed the processing"""
         self._done = True
-        self.close()
 
     def set_wc(self, wc):
@@ -69,17 +69,25 @@ class OutputChannelTask(Node):
         """Process count items and send the result individually to the output channel"""
         items = self._read(count)
         try:
+            # increase the ref-count - we use this to determine whether anyone else
+            # is currently handling our output channel. As this method runs asynchronously,
+            # we have to make sure that the channel is closed by the last finishing task,
+            # which is not necessarily the one which determines that he is done
+            # as he couldn't read anymore items.
+            # The refcount will be dropped in the moment we get out of here.
+            wc = self._out_wc
             if self.apply_single:
                 for item in items:
-                    self._out_wc.write(self.fun(item))
+                    wc.write(self.fun(item))
                 # END for each item
             else:
-                self._out_wc.write(self.fun(items))
+                wc.write(self.fun(items))
             # END handle single apply
         except Exception, e:
             self._exc = e
             self.set_done()
         # END exception handling
+        del(wc)
 
         # if we didn't get all demanded items, which is also the case if count is 0
         # we have depleted the input channel and are done
@@ -89,6 +97,16 @@ class OutputChannelTask(Node):
         if not items or len(items) != count:
             self.set_done()
         # END handle done state
+
+        # If we appear to be the only one left with our output channel, and are
+        # closed ( this could have been set in another thread as well ), make
+        # sure to close the output channel.
+        # The count is: 1 = wc itself, 2 = first reader channel, and we have only
+        # one, 3 is ours + x for every thread having its copy on the stack
+        # + 1 for the instance we provide to refcount
+        if self.is_done() and sys.getrefcount(self._out_wc) < 5:
+            self.close()
+        # END handle channel closure
 
 
     #{ Configuration
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 791f89d4..19e86a9a 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -246,6 +246,10 @@ class TestThreadPool(TestBase):
         p.set_size(2)
         self._assert_single_task(p, True)
 
+        # kill it
+        p.set_size(4)
+        self._assert_single_task(p, True)
+
         # DEPENDENT TASK ASYNC MODE
         ###########################
         self._assert_async_dependent_tasks(p)
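The pool.py hunk above reorders remove_task() so that the membership check, set_done() and the unlinking of the node all happen while the task-graph lock is held, with an early return when another thread has already removed the task. The following is a minimal sketch of that locking pattern only; _PoolSketch, _TaskStub and the plain set standing in for the task graph are hypothetical simplifications, not GitPython's actual classes.

```python
import threading

class _TaskStub(object):
    """Minimal stand-in for a pool task; only what this sketch needs."""
    def __init__(self):
        self.done = False

    def set_done(self):
        self.done = True


class _PoolSketch(object):
    """Sketch of the reordered remove_task(): the membership check, set_done()
    and the unlinking all happen while the task-graph lock is held."""

    def __init__(self):
        self._taskgraph_lock = threading.Lock()
        self._tasks = set()            # simplified stand-in for the task graph

    def remove_task(self, task):
        with self._taskgraph_lock:
            # another thread (or a stale chunk still sitting on the queue)
            # may already have removed the task - then there is nothing to do
            if task not in self._tasks:
                return self
            # mark it done first so no further processing gets scheduled,
            # then unlink it; closing its write channel is left to the task
            # itself, as in the task.py hunk above
            task.set_done()
            self._tasks.discard(task)
        return self


if __name__ == '__main__':
    pool, task = _PoolSketch(), _TaskStub()
    pool._tasks.add(task)
    pool.remove_task(task)
    pool.remove_task(task)             # second call takes the early-abort path
    print(task.done)                   # True
```

The `with` block replaces the acquire()/try/finally/release() sequence from the patch but is behaviorally equivalent: the early abort and the unlink are serialized against concurrent removals.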
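The task.py hunk moves channel closure out of set_done() and instead lets the last finishing process() call close the shared write channel, using CPython's reference count as the signal that no other thread still holds a copy. Below is a self-contained sketch of that idea under simplified assumptions: _Channel and _TaskSketch are hypothetical stand-ins rather than the real async channel API, and the refcount threshold differs from the `< 5` used in the patch because the sketch has fewer persistent references to the channel.

```python
import sys

class _Channel(object):
    """Hypothetical stand-in for the pool's write channel (not the real API)."""
    def __init__(self):
        self.items = []
        self.closed = False

    def write(self, item):
        self.items.append(item)

    def close(self):
        self.closed = True


class _TaskSketch(object):
    """Each process() call keeps a local copy of the channel reference while it
    writes; the channel is closed only once the task is done AND no other
    caller still holds such a copy, which is detected via sys.getrefcount()."""

    def __init__(self, out_wc):
        self._out_wc = out_wc
        self._done = False

    def is_done(self):
        return self._done

    def process(self, items):
        wc = self._out_wc              # extra reference marks "in use by this call"
        for item in items:
            wc.write(item)
        del wc                         # drop the marker before the refcount check
        if not items:                  # depleted input means we are done
            self._done = True
        # getrefcount() sees: the self._out_wc attribute plus the temporary
        # argument reference; anything above that means another call (thread)
        # still holds a stack copy. The threshold is an assumption of this
        # sketch, not the patch's exact constant.
        if self.is_done() and sys.getrefcount(self._out_wc) < 3:
            self._out_wc.close()


if __name__ == '__main__':
    task = _TaskSketch(_Channel())
    task.process([1, 2, 3])
    task.process([])                   # nothing left to read, so the channel closes
    print(task._out_wc.closed)         # True
```

Note that this is a CPython-specific heuristic, not a synchronization primitive: sys.getrefcount() only reflects immediate reference counts, so the scheme depends on every writer holding its own local copy of the channel while it works.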
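The comment above the getrefcount() check counts references by hand (the channel attribute, the first reader, each thread's stack copy, plus getrefcount()'s own temporary argument). A short standalone demonstration of how that arithmetic behaves in CPython; the names here are made up for illustration:

```python
import sys

# Every plain name bound to the same object adds one reference, and
# sys.getrefcount() itself adds a temporary one for its argument.
channel = object()                 # 1: the 'channel' name
print(sys.getrefcount(channel))    # 2: plus the temporary argument reference

reader = channel                   # e.g. the first reader holding the channel
print(sys.getrefcount(channel))    # 3

stack_copy = channel               # e.g. a worker's local copy on the stack
print(sys.getrefcount(channel))    # 4, one short of the patch's threshold
```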