diff options
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 30 |
1 files changed, 14 insertions, 16 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 1767c61c..7bddf7da 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -53,9 +53,8 @@ class RPoolChannel(RChannel): # I can't explain, but appears to be normal in the destructor # On the caller side, getrefcount returns 2, as expected if sys.getrefcount(self) < 6: - print "__del__" - self._pool.del_task(self._task) - print "done" + self._pool.remove_task(self._task) + # END handle refcount based removal of task def set_pre_cb(self, fun = lambda count: None): """Install a callback to call with the item count to be read before any @@ -237,12 +236,14 @@ class Pool(object): # the list includes our tasks - the first one to evaluate first, the # requested one last for task in dfirst_tasks: - if task.error() or task.is_done(): + # if task.error() or task.is_done(): # in theory, the should never be consumed task in the pool, right ? - # They delete themselves once they are done. - # TODO: remove this check for performance later - raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") - #continue + # They delete themselves once they are done. But as we run asynchronously, + # It can be that someone reads, while a task realizes its done, and + # we get here to prepare the read although it already is done. + # Its not a problem though, the task wiill not do anything. + # Hence we don't waste our time with checking for it + # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") # END skip processing # if the task does not have the required output on its queue, schedule @@ -316,11 +317,11 @@ class Pool(object): """Called after we processed a read to cleanup""" pass - def _del_task_if_orphaned(self, task): + def _remove_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call if sys.getrefcount(task._out_wc) < 3: - self.del_task(task) + self.remove_task(task) #} END internal #{ Interface @@ -351,7 +352,6 @@ class Pool(object): # Just adding more workers is not a problem at all. add_count = size - cur_count for i in range(add_count): - print "Add worker" self.WorkerCls(self._queue).start() # END for each new worker to create self._num_workers += add_count @@ -361,7 +361,6 @@ class Pool(object): # could be added as we speak. del_count = cur_count - size for i in range(del_count): - print "stop worker" self._queue.put((self.WorkerCls.stop, True)) # arg doesnt matter # END for each thread to stop self._num_workers -= del_count @@ -390,7 +389,7 @@ class Pool(object): finally: self._taskgraph_lock.release() - def del_task(self, task): + def remove_task(self, task): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume @@ -399,7 +398,6 @@ class Pool(object): This method blocks until all tasks to be removed have been processed, if they are currently being processed. :return: self""" - print "del_task: getting lock" self._taskgraph_lock.acquire() try: # it can be that the task is already deleted, but its chunk was on the @@ -407,7 +405,7 @@ class Pool(object): if not task in self._tasks.nodes: return self # END early abort - print "deleting ", id(task) + # the task we are currently deleting could also be processed by # a thread right now. We don't care about it as its taking care about # its write channel itself, and sends everything it can to it. @@ -426,7 +424,7 @@ class Pool(object): # END locked deletion for t in in_tasks: - self._del_task_if_orphaned(t) + self._remove_task_if_orphaned(t) # END handle orphans recursively return self |