diff options
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 55 |
1 files changed, 39 insertions, 16 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index f7c1cfe0..2ec18f1a 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -25,6 +25,7 @@ from channel import ( ) import sys +import weakref from time import sleep @@ -33,25 +34,37 @@ class RPoolChannel(CallbackRChannel): before and after an item is to be read. It acts like a handle to the underlying task in the pool.""" - __slots__ = ('_task', '_pool') + __slots__ = ('_task_ref', '_pool_ref') def __init__(self, wchannel, task, pool): CallbackRChannel.__init__(self, wchannel) - self._task = task - self._pool = pool + self._task_ref = weakref.ref(task) + self._pool_ref = weakref.ref(pool) def __del__(self): """Assures that our task will be deleted if we were the last reader""" - del(self._wc) # decrement ref-count early - # now, if this is the last reader to the wc we just handled, there + task = self._task_ref() + if task is None: + return + + pool = self._pool_ref() + if pool is None: + return + + # if this is the last reader to the wc we just handled, there # is no way anyone will ever read from the task again. If so, # delete the task in question, it will take care of itself and orphans # it might leave # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which # I can't explain, but appears to be normal in the destructor # On the caller side, getrefcount returns 2, as expected + # When just calling remove_task, + # it has no way of knowing that the write channel is about to diminsh. + # which is why we pass the info as a private kwarg - not nice, but + # okay for now + # TODO: Fix this - private/public method if sys.getrefcount(self) < 6: - self._pool.remove_task(self._task) + pool.remove_task(task, _from_destructor_=True) # END handle refcount based removal of task def read(self, count=0, block=True, timeout=None): @@ -72,11 +85,16 @@ class RPoolChannel(CallbackRChannel): # if the user tries to use us to read from a done task, we will never # compute as all produced items are already in the channel - skip_compute = self._task.is_done() or self._task.error() + task = self._task_ref() + if task is None: + return list() + # END abort if task was deleted + + skip_compute = task.is_done() or task.error() ########## prepare ############################## if not skip_compute: - self._pool._prepare_channel_read(self._task, count) + self._pool_ref()._prepare_channel_read(task, count) # END prepare pool scheduling @@ -261,11 +279,16 @@ class Pool(object): # END for each task to process - def _remove_task_if_orphaned(self, task): + def _remove_task_if_orphaned(self, task, from_destructor): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call - if sys.getrefcount(task._out_wc) < 3: - self.remove_task(task) + # If we are getting here from the destructor of an RPool channel, + # its totally valid to virtually decrement the refcount by 1 as + # we can expect it to drop once the destructor completes, which is when + # we finish all recursive calls + max_ref_count = 3 + from_destructor + if sys.getrefcount(task.wchannel()) < max_ref_count: + self.remove_task(task, from_destructor) #} END internal #{ Interface @@ -335,7 +358,7 @@ class Pool(object): finally: self._taskgraph_lock.release() - def remove_task(self, task): + def remove_task(self, task, _from_destructor_=False): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume @@ -370,7 +393,7 @@ class Pool(object): # END locked deletion for t in in_tasks: - self._remove_task_if_orphaned(t) + self._remove_task_if_orphaned(t, _from_destructor_) # END handle orphans recursively return self @@ -409,11 +432,11 @@ class Pool(object): # If the input channel is one of our read channels, we add the relation if isinstance(task, InputChannelTask): - ic = task.in_rc - if isinstance(ic, RPoolChannel) and ic._pool is self: + ic = task.rchannel() + if isinstance(ic, RPoolChannel) and ic._pool_ref() is self: self._taskgraph_lock.acquire() try: - self._tasks.add_edge(ic._task, task) + self._tasks.add_edge(ic._task_ref(), task) finally: self._taskgraph_lock.release() # END handle edge-adding |