summaryrefslogtreecommitdiff
path: root/lib/git/async/pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--lib/git/async/pool.py24
1 files changed, 14 insertions, 10 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 68551ea3..3fd99c7b 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -33,13 +33,12 @@ import new
class PoolReader(CallbackReader):
"""A reader designed to read from channels which take part in pools
It acts like a handle to the underlying task in the pool."""
- __slots__ = ('_task_ref', '_pool_ref', '_read')
+ __slots__ = ('_task_ref', '_pool_ref')
def __init__(self, channel, task, pool):
CallbackReader.__init__(self, channel)
self._task_ref = weakref.ref(task)
self._pool_ref = weakref.ref(pool)
- self._read = new.instancemethod(CallbackReader.__dict__['read'], self, CallbackReader)
def __del__(self):
"""Assures that our task will be deleted if we were the last reader"""
@@ -62,11 +61,16 @@ class PoolReader(CallbackReader):
# it has no way of knowing that the write channel is about to diminsh.
# which is why we pass the info as a private kwarg - not nice, but
# okay for now
- # TODO: Fix this - private/public method
if sys.getrefcount(self) < 6:
- pool.remove_task(task)
+ pool.remove_task(task, _from_destructor_ = True)
# END handle refcount based removal of task
+ #{ Internal
+ def _read(self, count=0, block=True, timeout=None):
+ return CallbackReader.read(self, count, block, timeout)
+
+ #} END internal
+
#{ Interface
def pool_ref(self):
@@ -261,7 +265,7 @@ class Pool(object):
if self._num_workers:
# respect the chunk size, and split the task up if we want
# to process too much. This can be defined per task
- qput = self._queue
+ qput = self._queue.put
if numchunks > 1:
for i in xrange(numchunks):
qput((task.process, chunksize))
@@ -290,16 +294,16 @@ class Pool(object):
# END for each task to process
- def _remove_task_if_orphaned(self, task):
+ def _remove_task_if_orphaned(self, task, from_destructor):
"""Check the task, and delete it if it is orphaned"""
# 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader
# If we are getting here from the destructor of an RPool channel,
# its totally valid to virtually decrement the refcount by 1 as
# we can expect it to drop once the destructor completes, which is when
# we finish all recursive calls
- max_ref_count = 3
+ max_ref_count = 3 + from_destructor
if sys.getrefcount(task.writer().channel) < max_ref_count:
- self.remove_task(task)
+ self.remove_task(task, from_destructor)
#} END internal
#{ Interface
@@ -370,7 +374,7 @@ class Pool(object):
finally:
self._taskgraph_lock.release()
- def remove_task(self, task):
+ def remove_task(self, task, _from_destructor_ = False):
"""Delete the task
Additionally we will remove orphaned tasks, which can be identified if their
output channel is only held by themselves, so no one will ever consume
@@ -405,7 +409,7 @@ class Pool(object):
# END locked deletion
for t in in_tasks:
- self._remove_task_if_orphaned(t)
+ self._remove_task_if_orphaned(t, _from_destructor_)
# END handle orphans recursively
return self