diff options
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 40 |
1 files changed, 18 insertions, 22 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 227cabfc..3de98777 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -21,6 +21,7 @@ from channel import ( ) import sys +from time import sleep class RPoolChannel(RChannel): @@ -371,32 +372,27 @@ class Pool(object): This method blocks until all tasks to be removed have been processed, if they are currently being processed. :return: self""" - # now delete our actual node - must set it done os it closes its channels. - # Otherwise further reads of output tasks will block. - # Actually they may still block if anyone wants to read all ... without - # a timeout - # keep its input nodes as we check whether they were orphaned - in_tasks = task.in_nodes - task.set_done() self._taskgraph_lock.acquire() try: - self._taskorder_cache.clear() - # before we can delete the task, make sure its write channel - # is closed, otherwise people might still be waiting for its result. - # If a channel is not closed, this could also mean its not yet fully - # processed, but more importantly, there must be no task being processed - # right now. - # TODO: figure this out - for worker in self._workers: - r = worker.routine() - if r and r.im_self is task: - raise NotImplementedError("todo") - # END handle running task - # END check for in-progress routine + # it can be that the task is already deleted, but its chunk was on the + # queue until now, so its marked consumed again + if not task in self._tasks.nodes: + return self + # END early abort + + # the task we are currently deleting could also be processed by + # a thread right now. We don't care about it as its taking care about + # its write channel itself, and sends everything it can to it. + # For it it doesn't matter that its not part of our task graph anymore. + + # now delete our actual node - be sure its done to prevent further + # processing in case there are still client reads on their way. + task.set_done() - # its done, close the channel for writing - task.close() + # keep its input nodes as we check whether they were orphaned + in_tasks = task.in_nodes self._tasks.del_node(task) + self._taskorder_cache.clear() finally: self._taskgraph_lock.release() # END locked deletion |