diff options
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 24 |
1 files changed, 14 insertions, 10 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 68551ea3..3fd99c7b 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -33,13 +33,12 @@ import new class PoolReader(CallbackReader): """A reader designed to read from channels which take part in pools It acts like a handle to the underlying task in the pool.""" - __slots__ = ('_task_ref', '_pool_ref', '_read') + __slots__ = ('_task_ref', '_pool_ref') def __init__(self, channel, task, pool): CallbackReader.__init__(self, channel) self._task_ref = weakref.ref(task) self._pool_ref = weakref.ref(pool) - self._read = new.instancemethod(CallbackReader.__dict__['read'], self, CallbackReader) def __del__(self): """Assures that our task will be deleted if we were the last reader""" @@ -62,11 +61,16 @@ class PoolReader(CallbackReader): # it has no way of knowing that the write channel is about to diminsh. # which is why we pass the info as a private kwarg - not nice, but # okay for now - # TODO: Fix this - private/public method if sys.getrefcount(self) < 6: - pool.remove_task(task) + pool.remove_task(task, _from_destructor_ = True) # END handle refcount based removal of task + #{ Internal + def _read(self, count=0, block=True, timeout=None): + return CallbackReader.read(self, count, block, timeout) + + #} END internal + #{ Interface def pool_ref(self): @@ -261,7 +265,7 @@ class Pool(object): if self._num_workers: # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task - qput = self._queue + qput = self._queue.put if numchunks > 1: for i in xrange(numchunks): qput((task.process, chunksize)) @@ -290,16 +294,16 @@ class Pool(object): # END for each task to process - def _remove_task_if_orphaned(self, task): + def _remove_task_if_orphaned(self, task, from_destructor): """Check the task, and delete it if it is orphaned""" # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader # If we are getting here from the destructor of an RPool channel, # its totally valid to virtually decrement the refcount by 1 as # we can expect it to drop once the destructor completes, which is when # we finish all recursive calls - max_ref_count = 3 + max_ref_count = 3 + from_destructor if sys.getrefcount(task.writer().channel) < max_ref_count: - self.remove_task(task) + self.remove_task(task, from_destructor) #} END internal #{ Interface @@ -370,7 +374,7 @@ class Pool(object): finally: self._taskgraph_lock.release() - def remove_task(self, task): + def remove_task(self, task, _from_destructor_ = False): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume @@ -405,7 +409,7 @@ class Pool(object): # END locked deletion for t in in_tasks: - self._remove_task_if_orphaned(t) + self._remove_task_if_orphaned(t, _from_destructor_) # END handle orphans recursively return self |