summaryrefslogtreecommitdiff
path: root/lib/git
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git')
-rw-r--r--lib/git/async/pool.py55
-rw-r--r--lib/git/async/task.py7
2 files changed, 44 insertions, 18 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index f7c1cfe0..2ec18f1a 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -25,6 +25,7 @@ from channel import (
)
import sys
+import weakref
from time import sleep
@@ -33,25 +34,37 @@ class RPoolChannel(CallbackRChannel):
before and after an item is to be read.
It acts like a handle to the underlying task in the pool."""
- __slots__ = ('_task', '_pool')
+ __slots__ = ('_task_ref', '_pool_ref')
def __init__(self, wchannel, task, pool):
CallbackRChannel.__init__(self, wchannel)
- self._task = task
- self._pool = pool
+ self._task_ref = weakref.ref(task)
+ self._pool_ref = weakref.ref(pool)
def __del__(self):
"""Assures that our task will be deleted if we were the last reader"""
- del(self._wc) # decrement ref-count early
- # now, if this is the last reader to the wc we just handled, there
+ task = self._task_ref()
+ if task is None:
+ return
+
+ pool = self._pool_ref()
+ if pool is None:
+ return
+
+ # if this is the last reader to the wc we just handled, there
# is no way anyone will ever read from the task again. If so,
# delete the task in question, it will take care of itself and orphans
# it might leave
# 1 is ourselves, + 1 for the call + 1, and 3 magical ones which
# I can't explain, but appears to be normal in the destructor
# On the caller side, getrefcount returns 2, as expected
+ # When just calling remove_task,
+ # it has no way of knowing that the write channel is about to diminsh.
+ # which is why we pass the info as a private kwarg - not nice, but
+ # okay for now
+ # TODO: Fix this - private/public method
if sys.getrefcount(self) < 6:
- self._pool.remove_task(self._task)
+ pool.remove_task(task, _from_destructor_=True)
# END handle refcount based removal of task
def read(self, count=0, block=True, timeout=None):
@@ -72,11 +85,16 @@ class RPoolChannel(CallbackRChannel):
# if the user tries to use us to read from a done task, we will never
# compute as all produced items are already in the channel
- skip_compute = self._task.is_done() or self._task.error()
+ task = self._task_ref()
+ if task is None:
+ return list()
+ # END abort if task was deleted
+
+ skip_compute = task.is_done() or task.error()
########## prepare ##############################
if not skip_compute:
- self._pool._prepare_channel_read(self._task, count)
+ self._pool_ref()._prepare_channel_read(task, count)
# END prepare pool scheduling
@@ -261,11 +279,16 @@ class Pool(object):
# END for each task to process
- def _remove_task_if_orphaned(self, task):
+ def _remove_task_if_orphaned(self, task, from_destructor):
"""Check the task, and delete it if it is orphaned"""
# 1 as its stored on the task, 1 for the getrefcount call
- if sys.getrefcount(task._out_wc) < 3:
- self.remove_task(task)
+ # If we are getting here from the destructor of an RPool channel,
+ # its totally valid to virtually decrement the refcount by 1 as
+ # we can expect it to drop once the destructor completes, which is when
+ # we finish all recursive calls
+ max_ref_count = 3 + from_destructor
+ if sys.getrefcount(task.wchannel()) < max_ref_count:
+ self.remove_task(task, from_destructor)
#} END internal
#{ Interface
@@ -335,7 +358,7 @@ class Pool(object):
finally:
self._taskgraph_lock.release()
- def remove_task(self, task):
+ def remove_task(self, task, _from_destructor_=False):
"""Delete the task
Additionally we will remove orphaned tasks, which can be identified if their
output channel is only held by themselves, so no one will ever consume
@@ -370,7 +393,7 @@ class Pool(object):
# END locked deletion
for t in in_tasks:
- self._remove_task_if_orphaned(t)
+ self._remove_task_if_orphaned(t, _from_destructor_)
# END handle orphans recursively
return self
@@ -409,11 +432,11 @@ class Pool(object):
# If the input channel is one of our read channels, we add the relation
if isinstance(task, InputChannelTask):
- ic = task.in_rc
- if isinstance(ic, RPoolChannel) and ic._pool is self:
+ ic = task.rchannel()
+ if isinstance(ic, RPoolChannel) and ic._pool_ref() is self:
self._taskgraph_lock.acquire()
try:
- self._tasks.add_edge(ic._task, task)
+ self._tasks.add_edge(ic._task_ref(), task)
finally:
self._taskgraph_lock.release()
# END handle edge-adding
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index f98336b2..03b40492 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -208,5 +208,8 @@ class InputChannelTask(OutputChannelTask):
OutputChannelTask.__init__(self, *args, **kwargs)
self._read = in_rc.read
- #{ Configuration
-
+ def rchannel(self):
+ """:return: input channel from which we read"""
+ # the instance is bound in its instance method - lets use this to keep
+ # the refcount at one ( per consumer )
+ return self._read.im_self