summaryrefslogtreecommitdiff
path: root/lib/git/async/pool.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-10 15:38:40 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-10 15:38:40 +0200
commit7c36f3648e39ace752c67c71867693ce1eee52a3 (patch)
tree0e7a63fc98c40dfced395ab73dfc44c5e176ad42 /lib/git/async/pool.py
parent55e757928e493ce93056822d510482e4ffcaac2d (diff)
downloadgitpython-7c36f3648e39ace752c67c71867693ce1eee52a3.tar.gz
Now tracking the amount of concurrent writers to assure the channel is closed only when there is no one else writing to it. This assures that all tasks can continue working, and put their results accordingly. Shutdown is still not working correctly, but that should be solvable as well. Its still not perfect though ...
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--lib/git/async/pool.py24
1 files changed, 14 insertions, 10 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 68551ea3..3fd99c7b 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -33,13 +33,12 @@ import new
class PoolReader(CallbackReader):
"""A reader designed to read from channels which take part in pools
It acts like a handle to the underlying task in the pool."""
- __slots__ = ('_task_ref', '_pool_ref', '_read')
+ __slots__ = ('_task_ref', '_pool_ref')
def __init__(self, channel, task, pool):
CallbackReader.__init__(self, channel)
self._task_ref = weakref.ref(task)
self._pool_ref = weakref.ref(pool)
- self._read = new.instancemethod(CallbackReader.__dict__['read'], self, CallbackReader)
def __del__(self):
"""Assures that our task will be deleted if we were the last reader"""
@@ -62,11 +61,16 @@ class PoolReader(CallbackReader):
# it has no way of knowing that the write channel is about to diminsh.
# which is why we pass the info as a private kwarg - not nice, but
# okay for now
- # TODO: Fix this - private/public method
if sys.getrefcount(self) < 6:
- pool.remove_task(task)
+ pool.remove_task(task, _from_destructor_ = True)
# END handle refcount based removal of task
+ #{ Internal
+ def _read(self, count=0, block=True, timeout=None):
+ return CallbackReader.read(self, count, block, timeout)
+
+ #} END internal
+
#{ Interface
def pool_ref(self):
@@ -261,7 +265,7 @@ class Pool(object):
if self._num_workers:
# respect the chunk size, and split the task up if we want
# to process too much. This can be defined per task
- qput = self._queue
+ qput = self._queue.put
if numchunks > 1:
for i in xrange(numchunks):
qput((task.process, chunksize))
@@ -290,16 +294,16 @@ class Pool(object):
# END for each task to process
- def _remove_task_if_orphaned(self, task):
+ def _remove_task_if_orphaned(self, task, from_destructor):
"""Check the task, and delete it if it is orphaned"""
# 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader
# If we are getting here from the destructor of an RPool channel,
# its totally valid to virtually decrement the refcount by 1 as
# we can expect it to drop once the destructor completes, which is when
# we finish all recursive calls
- max_ref_count = 3
+ max_ref_count = 3 + from_destructor
if sys.getrefcount(task.writer().channel) < max_ref_count:
- self.remove_task(task)
+ self.remove_task(task, from_destructor)
#} END internal
#{ Interface
@@ -370,7 +374,7 @@ class Pool(object):
finally:
self._taskgraph_lock.release()
- def remove_task(self, task):
+ def remove_task(self, task, _from_destructor_ = False):
"""Delete the task
Additionally we will remove orphaned tasks, which can be identified if their
output channel is only held by themselves, so no one will ever consume
@@ -405,7 +409,7 @@ class Pool(object):
# END locked deletion
for t in in_tasks:
- self._remove_task_if_orphaned(t)
+ self._remove_task_if_orphaned(t, _from_destructor_)
# END handle orphans recursively
return self