Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--  lib/git/async/pool.py  30 ++++++++++++++----------------
1 file changed, 14 insertions(+), 16 deletions(-)
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 1767c61c..7bddf7da 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -53,9 +53,8 @@ class RPoolChannel(RChannel):
# I can't explain it, but it appears to be normal in the destructor
# On the caller side, getrefcount returns 2, as expected
if sys.getrefcount(self) < 6:
- print "__del__"
- self._pool.del_task(self._task)
- print "done"
+ self._pool.remove_task(self._task)
+ # END handle refcount based removal of task
def set_pre_cb(self, fun = lambda count: None):
"""Install a callback to call with the item count to be read before any
@@ -237,12 +236,14 @@ class Pool(object):
# the list includes our tasks - the first one to evaluate first, the
# requested one last
for task in dfirst_tasks:
- if task.error() or task.is_done():
+ # if task.error() or task.is_done():
# in theory, there should never be a consumed task in the pool, right?
- # They delete themselves once they are done.
- # TODO: remove this check for performance later
- raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?")
- #continue
+ # They delete themselves once they are done. But as we run asynchronously,
+ # it can be that someone reads while a task realizes it is done, and
+ # we get here to prepare the read although it is already done.
+ # It's not a problem though, the task will not do anything.
+ # Hence we don't waste our time checking for it.
+ # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themselves, what happened?")
# END skip processing
# if the task does not have the required output on its queue, schedule
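The dropped assertion guarded against a case the new comments explain is a benign race: a task can finish between the scheduling decision and the read preparation. A hedged sketch of why that is harmless (the class and method names below are illustrative, not the actual Task interface):

    class _SketchTask(object):
        """Illustrative only: an already-finished task scheduled once more
        simply does nothing, so no assertion is needed."""

        def __init__(self):
            self._done = False

        def is_done(self):
            return self._done

        def process(self, count=0):
            if self._done:
                # Another thread may have finished us between the pool's
                # scheduling decision and this call; just return quietly.
                return
            # ... perform (part of) the work ...
            self._done = True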
@@ -316,11 +317,11 @@ class Pool(object):
"""Called after we processed a read to cleanup"""
pass
- def _del_task_if_orphaned(self, task):
+ def _remove_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
# 1 as it's stored on the task, 1 for the getrefcount call
if sys.getrefcount(task._out_wc) < 3:
- self.del_task(task)
+ self.remove_task(task)
#} END internal
#{ Interface
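_remove_task_if_orphaned decides purely from reference counts whether anyone can still read the task's output: one reference is held by the task itself and one by the getrefcount argument, so anything below 3 means no consumer is left. A small demonstration of that rule, assuming CPython (the _FakeTask class is a stand-in, not the real task type):

    import sys

    class _FakeTask(object):
        def __init__(self, wc):
            self._out_wc = wc

    wc = object()                  # stands in for the write channel
    task = _FakeTask(wc)

    # consumer name + task attribute + getrefcount argument -> 3 references
    print(sys.getrefcount(task._out_wc) < 3)    # False: not orphaned

    del wc                         # the last consumer lets go
    # only the task attribute + getrefcount argument remain -> 2 references
    print(sys.getrefcount(task._out_wc) < 3)    # True: orphaned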
@@ -351,7 +352,6 @@ class Pool(object):
# Just adding more workers is not a problem at all.
add_count = size - cur_count
for i in range(add_count):
- print "Add worker"
self.WorkerCls(self._queue).start()
# END for each new worker to create
self._num_workers += add_count
@@ -361,7 +361,6 @@ class Pool(object):
# could be added as we speak.
del_count = cur_count - size
for i in range(del_count):
- print "stop worker"
self._queue.put((self.WorkerCls.stop, True)) # arg doesn't matter
# END for each thread to stop
self._num_workers -= del_count
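The two sizing hunks above drop debug prints from the same mechanism: the pool grows by starting new WorkerCls threads on the shared queue, and shrinks by enqueueing the worker class's stop routine so that whichever thread dequeues it shuts itself down. A minimal sketch of that shutdown-by-sentinel pattern (the worker internals below are assumptions, not the real WorkerCls):

    import threading
    try:
        from queue import Queue        # Python 3
    except ImportError:
        from Queue import Queue        # Python 2

    class _SketchWorker(threading.Thread):
        def __init__(self, queue):
            super(_SketchWorker, self).__init__()
            self.daemon = True
            self._queue = queue

        def stop(self, _arg):
            # Returning a marker that run() understands ends this thread
            # only; other workers keep draining the same queue.
            return StopIteration

        def run(self):
            while True:
                fun, arg = self._queue.get()
                if fun(self, arg) is StopIteration:
                    break

    q = Queue()
    workers = [_SketchWorker(q) for _ in range(2)]
    for w in workers:
        w.start()
    # shrink by one: whichever worker dequeues this stops itself
    q.put((_SketchWorker.stop, True))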
@@ -390,7 +389,7 @@ class Pool(object):
finally:
self._taskgraph_lock.release()
- def del_task(self, task):
+ def remove_task(self, task):
"""Delete the task
Additionally we will remove orphaned tasks, which can be identified if their
output channel is only held by themselves, so no one will ever consume
@@ -399,7 +398,6 @@ class Pool(object):
This method blocks until all tasks to be removed have been processed, if
they are currently being processed.
:return: self"""
- print "del_task: getting lock"
self._taskgraph_lock.acquire()
try:
# it can be that the task is already deleted, but its chunk was on the
@@ -407,7 +405,7 @@ class Pool(object):
if not task in self._tasks.nodes:
return self
# END early abort
- print "deleting ", id(task)
+
# the task we are currently deleting could also be processed by
# a thread right now. We don't care about it as it's taking care of
# its write channel itself, and sends everything it can to it.
@@ -426,7 +424,7 @@ class Pool(object):
# END locked deletion
for t in in_tasks:
- self._del_task_if_orphaned(t)
+ self._remove_task_if_orphaned(t)
# END handle orphans recursively
return self
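remove_task itself follows a simple protocol: take the task-graph lock, abort early if another thread already removed the task, delete it, release the lock, and only then walk the removed task's input tasks to drop any that became orphaned in turn. A rough skeleton of that control flow (a plain dict stands in for the real TaskGraph, and the orphan test is stubbed):

    import threading

    class _SketchPool(object):
        def __init__(self):
            self._taskgraph_lock = threading.Lock()
            self._tasks = {}            # task -> list of input tasks

        def _remove_task_if_orphaned(self, task):
            # stand-in for the refcount test sketched earlier
            if getattr(task, 'orphaned', False):
                self.remove_task(task)

        def remove_task(self, task):
            self._taskgraph_lock.acquire()
            try:
                if task not in self._tasks:
                    return self         # already removed by another thread
                in_tasks = self._tasks.pop(task)
            finally:
                self._taskgraph_lock.release()
            # recurse outside the locked section, mirroring the loop above
            for t in in_tasks:
                self._remove_task_if_orphaned(t)
            return self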