summaryrefslogtreecommitdiff
path: root/lib/git/async/pool.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-09 10:34:12 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-09 10:34:12 +0200
commit15941ca090a2c3c987324fc911bbc6f89e941c47 (patch)
tree3c508eb2e8be484e8685cddaa1de72826fbf9302 /lib/git/async/pool.py
parentf78d4a28f307a9d7943a06be9f919304c25ac2d9 (diff)
downloadgitpython-15941ca090a2c3c987324fc911bbc6f89e941c47.tar.gz
queue: fixed critical bug in the notify method, as it was not at all thread-safe, causing locks to be released multiple times. Now it runs very fast, and very stable apparently.
Now its about putting previous features back in, and studying their results, before more complex task graphs can be examined
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--lib/git/async/pool.py30
1 files changed, 14 insertions, 16 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 1767c61c..7bddf7da 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -53,9 +53,8 @@ class RPoolChannel(RChannel):
# I can't explain, but appears to be normal in the destructor
# On the caller side, getrefcount returns 2, as expected
if sys.getrefcount(self) < 6:
- print "__del__"
- self._pool.del_task(self._task)
- print "done"
+ self._pool.remove_task(self._task)
+ # END handle refcount based removal of task
def set_pre_cb(self, fun = lambda count: None):
"""Install a callback to call with the item count to be read before any
@@ -237,12 +236,14 @@ class Pool(object):
# the list includes our tasks - the first one to evaluate first, the
# requested one last
for task in dfirst_tasks:
- if task.error() or task.is_done():
+ # if task.error() or task.is_done():
# in theory, the should never be consumed task in the pool, right ?
- # They delete themselves once they are done.
- # TODO: remove this check for performance later
- raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?")
- #continue
+ # They delete themselves once they are done. But as we run asynchronously,
+ # It can be that someone reads, while a task realizes its done, and
+ # we get here to prepare the read although it already is done.
+ # Its not a problem though, the task wiill not do anything.
+ # Hence we don't waste our time with checking for it
+ # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?")
# END skip processing
# if the task does not have the required output on its queue, schedule
@@ -316,11 +317,11 @@ class Pool(object):
"""Called after we processed a read to cleanup"""
pass
- def _del_task_if_orphaned(self, task):
+ def _remove_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
# 1 as its stored on the task, 1 for the getrefcount call
if sys.getrefcount(task._out_wc) < 3:
- self.del_task(task)
+ self.remove_task(task)
#} END internal
#{ Interface
@@ -351,7 +352,6 @@ class Pool(object):
# Just adding more workers is not a problem at all.
add_count = size - cur_count
for i in range(add_count):
- print "Add worker"
self.WorkerCls(self._queue).start()
# END for each new worker to create
self._num_workers += add_count
@@ -361,7 +361,6 @@ class Pool(object):
# could be added as we speak.
del_count = cur_count - size
for i in range(del_count):
- print "stop worker"
self._queue.put((self.WorkerCls.stop, True)) # arg doesnt matter
# END for each thread to stop
self._num_workers -= del_count
@@ -390,7 +389,7 @@ class Pool(object):
finally:
self._taskgraph_lock.release()
- def del_task(self, task):
+ def remove_task(self, task):
"""Delete the task
Additionally we will remove orphaned tasks, which can be identified if their
output channel is only held by themselves, so no one will ever consume
@@ -399,7 +398,6 @@ class Pool(object):
This method blocks until all tasks to be removed have been processed, if
they are currently being processed.
:return: self"""
- print "del_task: getting lock"
self._taskgraph_lock.acquire()
try:
# it can be that the task is already deleted, but its chunk was on the
@@ -407,7 +405,7 @@ class Pool(object):
if not task in self._tasks.nodes:
return self
# END early abort
- print "deleting ", id(task)
+
# the task we are currently deleting could also be processed by
# a thread right now. We don't care about it as its taking care about
# its write channel itself, and sends everything it can to it.
@@ -426,7 +424,7 @@ class Pool(object):
# END locked deletion
for t in in_tasks:
- self._del_task_if_orphaned(t)
+ self._remove_task_if_orphaned(t)
# END handle orphans recursively
return self