summaryrefslogtreecommitdiff
path: root/lib/git/async/task.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git/async/task.py')
-rw-r--r--lib/git/async/task.py22
1 files changed, 16 insertions, 6 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 97521cae..dc207c33 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -23,6 +23,7 @@ class OutputChannelTask(Node):
'_done', # True if we are done
'_scheduled_items', # amount of scheduled items that will be processed in total
'_slock', # lock for scheduled items
+ '_pool_ref', # to be set by Pool
'fun', # function to call with items read
'min_count', # minimum amount of items to produce, None means no override
'max_chunksize', # maximium amount of items to process per process call
@@ -84,6 +85,10 @@ class OutputChannelTask(Node):
finally:
self._slock.release()
# END threadsafe return
+
+ def set_pool(self, pool):
+ """Set our pool to the given one, it will be weakref'd"""
+ self._pool_ref = weakref.ref(pool)
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
@@ -147,6 +152,16 @@ class OutputChannelTask(Node):
# + 1 for the instance we provide to refcount
if self.is_done() and sys.getrefcount(self._out_wc) < 4:
self.close()
+ # additionally, remove ourselves from the pool, this is thread-safe
+ # Previously the pool collected done tasks and removed them,
+ # but this could happen after a read finished, potentially
+ # leaving them on the queue until the read-handle was dropped.
+ # This should assure its more in-time.
+ # I don't like this back-ref.
+ pool = self._pool_ref()
+ if pool:
+ pool.del_task(self)
+ # END remove ourselves from the pool
# END handle channel closure
#{ Configuration
@@ -204,8 +219,7 @@ class InputChannelTask(OutputChannelTask):
For instantiation, it takes all arguments of its base, the first one needs
to be the input channel to read from though."""
__slots__ = (
- 'in_rc', # channel to read items from
- '_pool_ref' # to be set by Pool
+ 'in_rc' # channel to read items from
)
def __init__(self, in_rc, *args, **kwargs):
@@ -242,9 +256,5 @@ class InputChannelTask(OutputChannelTask):
# and call it
return OutputChannelTask.process(self, count)
-
- def set_pool(self, pool):
- """Set our pool to the given one, it will be weakref'd"""
- self._pool_ref = weakref.ref(pool)
#{ Configuration