author     Sebastian Thiel <byronimo@gmail.com>   2010-06-07 20:01:02 +0200
committer  Sebastian Thiel <byronimo@gmail.com>   2010-06-07 20:01:02 +0200
commit     654e54d200135e665e07e9f0097d913a77f169da (patch)
tree       cec8f92af95dc773985e824f6f6bca136f1a0480
parent     583cd8807259a69fc01874b798f657c1f9ab7828 (diff)
download   gitpython-654e54d200135e665e07e9f0097d913a77f169da.tar.gz
task: Fixed incorrect handling of channel closure. Performance is alright for up to 2 threads, but 4 threads kill the queue
-rw-r--r--  lib/git/async/pool.py        40
-rw-r--r--  lib/git/async/task.py        24
-rw-r--r--  test/git/async/test_pool.py   4
3 files changed, 43 insertions(+), 25 deletions(-)
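
The commit message and the comment removed from del_task below describe the same failure mode: if a task's write channel is never closed, readers of its output block forever. The following is a minimal, self-contained sketch of that behaviour; MiniWChannel and its methods are made-up stand-ins for illustration, not GitPython's channel API.

    import threading
    from collections import deque

    class MiniWChannel(object):
        """Hypothetical stand-in for an async write channel: readers block
        until data arrives or the channel is closed."""
        def __init__(self):
            self._cond = threading.Condition()
            self._items = deque()
            self._closed = False

        def write(self, item):
            with self._cond:
                self._items.append(item)
                self._cond.notify()

        def close(self):
            with self._cond:
                self._closed = True
                self._cond.notify_all()      # wake every blocked reader

        def read(self):
            """Return the next item, or None once closed and drained."""
            with self._cond:
                while not self._items and not self._closed:
                    self._cond.wait()        # without close(), this could wait forever
                return self._items.popleft() if self._items else None

    if __name__ == '__main__':
        chan = MiniWChannel()
        chan.write(1)
        chan.close()
        print(chan.read())    # -> 1
        print(chan.read())    # -> None instead of blocking forever

Without the notify_all() in close(), the second read() above would block indefinitely, which is exactly what the pool has to prevent when it removes a task.
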
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 227cabfc..3de98777 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -21,6 +21,7 @@ from channel import (
)
import sys
+from time import sleep
class RPoolChannel(RChannel):
@@ -371,32 +372,27 @@ class Pool(object):
This method blocks until all tasks to be removed have been processed, if
they are currently being processed.
:return: self"""
- # now delete our actual node - must set it done os it closes its channels.
- # Otherwise further reads of output tasks will block.
- # Actually they may still block if anyone wants to read all ... without
- # a timeout
- # keep its input nodes as we check whether they were orphaned
- in_tasks = task.in_nodes
- task.set_done()
self._taskgraph_lock.acquire()
try:
- self._taskorder_cache.clear()
- # before we can delete the task, make sure its write channel
- # is closed, otherwise people might still be waiting for its result.
- # If a channel is not closed, this could also mean its not yet fully
- # processed, but more importantly, there must be no task being processed
- # right now.
- # TODO: figure this out
- for worker in self._workers:
- r = worker.routine()
- if r and r.im_self is task:
- raise NotImplementedError("todo")
- # END handle running task
- # END check for in-progress routine
+	# it can happen that the task is already deleted, but its chunk was on the
+	# queue until now, so it is marked consumed again
+ if not task in self._tasks.nodes:
+ return self
+ # END early abort
+
+	# the task we are currently deleting could also be processed by
+	# a thread right now. We don't care about that, as the task takes care of
+	# its write channel itself and sends everything it can to it.
+	# For the task it doesn't matter that it is no longer part of our task graph.
+
+	# now delete our actual node - be sure it is done to prevent further
+	# processing in case there are still client reads on the way.
+ task.set_done()
- # its done, close the channel for writing
- task.close()
+ # keep its input nodes as we check whether they were orphaned
+ in_tasks = task.in_nodes
self._tasks.del_node(task)
+ self._taskorder_cache.clear()
finally:
self._taskgraph_lock.release()
# END locked deletion
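
Outside the diff context, the new del_task body boils down to a check-then-delete sequence performed entirely under the task-graph lock, with an early abort when a stale chunk refers to an already-removed task. The sketch below mirrors that pattern with hypothetical MiniPool/MiniTask classes, not GitPython code; the real method additionally keeps task.in_nodes around to check for orphaned input tasks afterwards.

    import threading

    class MiniTask(object):
        """Hypothetical task carrying only the 'done' flag the deletion logic needs."""
        def __init__(self):
            self._done = False
        def set_done(self):
            self._done = True

    class MiniPool(object):
        """Hypothetical stand-in for Pool - only the locked-deletion pattern is shown."""
        def __init__(self):
            self._taskgraph_lock = threading.Lock()
            self._tasks = set()            # stands in for the task graph's node set
            self._taskorder_cache = {}     # stands in for the cached task order

        def add_task(self, task):
            self._taskgraph_lock.acquire()
            try:
                self._tasks.add(task)
            finally:
                self._taskgraph_lock.release()
            return task

        def del_task(self, task):
            """Delete under the lock, aborting early if the task is already gone."""
            self._taskgraph_lock.acquire()
            try:
                if task not in self._tasks:
                    return self            # a stale chunk referenced a deleted task
                task.set_done()            # stop further processing of pending reads
                self._tasks.discard(task)
                self._taskorder_cache.clear()
            finally:
                self._taskgraph_lock.release()
            return self

    if __name__ == '__main__':
        pool = MiniPool()
        t = pool.add_task(MiniTask())
        pool.del_task(t)
        pool.del_task(t)    # second call hits the early-abort path and is a no-op

Acquiring the lock before the membership test is what makes the early abort reliable; testing first and locking afterwards would reintroduce the race.
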
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index f106c381..b282e371 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -2,6 +2,7 @@ from graph import Node
import threading
import weakref
+import sys
import new
class OutputChannelTask(Node):
@@ -44,7 +45,6 @@ class OutputChannelTask(Node):
def set_done(self):
"""Set ourselves to being done, has we have completed the processing"""
self._done = True
- self.close()
def set_wc(self, wc):
"""Set the write channel to the given one
@@ -69,17 +69,25 @@ class OutputChannelTask(Node):
"""Process count items and send the result individually to the output channel"""
items = self._read(count)
try:
+		# increase the ref-count - we use it to determine whether anyone else
+		# is currently handling our output channel. As this method runs asynchronously,
+		# we have to make sure the channel is closed by the last finishing task,
+		# which is not necessarily the one that determines it is done
+		# because it couldn't read any more items.
+		# The extra reference is dropped the moment we leave this method.
+ wc = self._out_wc
if self.apply_single:
for item in items:
- self._out_wc.write(self.fun(item))
+ wc.write(self.fun(item))
# END for each item
else:
- self._out_wc.write(self.fun(items))
+ wc.write(self.fun(items))
# END handle single apply
except Exception, e:
self._exc = e
self.set_done()
# END exception handling
+ del(wc)
# if we didn't get all demanded items, which is also the case if count is 0
# we have depleted the input channel and are done
@@ -89,6 +97,16 @@ class OutputChannelTask(Node):
if not items or len(items) != count:
self.set_done()
# END handle done state
+
+		# If we appear to be the only one left holding our output channel, and we
+		# are done ( this could have been set in another thread as well ), make
+		# sure to close the output channel.
+		# The count is: 1 = wc itself, 2 = the first (and only) reader channel,
+		# 3 = our own reference, plus one for every thread that has a copy on its
+		# stack, plus 1 for the temporary reference getrefcount creates itself.
+ if self.is_done() and sys.getrefcount(self._out_wc) < 5:
+ self.close()
+ # END handle channel closure
#{ Configuration
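
The refcount heuristic above is the least obvious piece of the change, so here is a small self-contained demonstration of the mechanism it relies on: the reference count of a shared object rises while another thread keeps a copy on its stack and falls back once that thread exits. Channel, worker and the <= 3 threshold are made up for illustration; only sys.getrefcount and threading are real, and getrefcount counts its own argument as one extra reference.

    import sys
    import threading

    class Channel(object):
        """Hypothetical write-channel stand-in; only its reference count matters."""
        def close(self):
            print('channel closed')

    def demo():
        wc = Channel()
        # getrefcount counts its own argument as one extra reference,
        # so a single local binding typically reports 2
        print('baseline: %d' % sys.getrefcount(wc))

        started = threading.Event()
        release = threading.Event()

        def worker(chan):
            # 'chan' keeps an extra reference alive on this thread's stack
            started.set()
            release.wait()

        t = threading.Thread(target=worker, args=(wc,))
        t.start()
        started.wait()
        print('while another thread holds a copy: %d' % sys.getrefcount(wc))

        release.set()
        t.join()
        print('after that thread has exited: %d' % sys.getrefcount(wc))

        # Close only once the count has dropped back towards the baseline,
        # i.e. no other thread still has the channel on its stack. The idea
        # mirrors the '< 5' check in process(), with a threshold that fits
        # this simpler setup.
        if sys.getrefcount(wc) <= 3:
            wc.close()

    if __name__ == '__main__':
        demo()

The exact numbers printed can vary between interpreter versions, which is why such a threshold is a heuristic rather than a guarantee.
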
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 791f89d4..19e86a9a 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -246,6 +246,10 @@ class TestThreadPool(TestBase):
p.set_size(2)
self._assert_single_task(p, True)
+	# kill it - 4 threads currently overwhelm the queue
+ p.set_size(4)
+ self._assert_single_task(p, True)
+
# DEPENDENT TASK ASYNC MODE
###########################
self._assert_async_dependent_tasks(p)
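
The added test step simply re-runs the single-task assertion at pool size 4, the point where the commit message says the queue collapses. To get a feel for that kind of contention outside GitPython, one could time a plain producer/consumer setup on a single shared queue with a growing number of worker threads. Everything below (run, the item count, the poison-pill shutdown) is an illustrative sketch rather than the project's benchmark, and the measured effect of adding threads depends heavily on interpreter and machine.

    import threading
    import time
    try:
        import Queue as queue    # Python 2, matching the code base's vintage
    except ImportError:
        import queue             # Python 3

    def run(num_workers, items=100000):
        """Push items through one shared Queue with num_workers consumers, return seconds."""
        q = queue.Queue()

        def consume():
            while True:
                if q.get() is None:      # poison pill: one per worker
                    break

        workers = [threading.Thread(target=consume) for _ in range(num_workers)]
        for w in workers:
            w.start()
        start = time.time()
        for i in range(items):
            q.put(i)
        for _ in workers:
            q.put(None)
        for w in workers:
            w.join()
        return time.time() - start

    if __name__ == '__main__':
        for n in (1, 2, 4):
            print('%d worker(s): %.2fs' % (n, run(n)))
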