diff options
Diffstat (limited to 'lib/git/async/task.py')
-rw-r--r-- | lib/git/async/task.py | 22 |
1 files changed, 16 insertions, 6 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 97521cae..dc207c33 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -23,6 +23,7 @@ class OutputChannelTask(Node): '_done', # True if we are done '_scheduled_items', # amount of scheduled items that will be processed in total '_slock', # lock for scheduled items + '_pool_ref', # to be set by Pool 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -84,6 +85,10 @@ class OutputChannelTask(Node): finally: self._slock.release() # END threadsafe return + + def set_pool(self, pool): + """Set our pool to the given one, it will be weakref'd""" + self._pool_ref = weakref.ref(pool) def process(self, count=0): """Process count items and send the result individually to the output channel""" @@ -147,6 +152,16 @@ class OutputChannelTask(Node): # + 1 for the instance we provide to refcount if self.is_done() and sys.getrefcount(self._out_wc) < 4: self.close() + # additionally, remove ourselves from the pool, this is thread-safe + # Previously the pool collected done tasks and removed them, + # but this could happen after a read finished, potentially + # leaving them on the queue until the read-handle was dropped. + # This should assure its more in-time. + # I don't like this back-ref. + pool = self._pool_ref() + if pool: + pool.del_task(self) + # END remove ourselves from the pool # END handle channel closure #{ Configuration @@ -204,8 +219,7 @@ class InputChannelTask(OutputChannelTask): For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" __slots__ = ( - 'in_rc', # channel to read items from - '_pool_ref' # to be set by Pool + 'in_rc' # channel to read items from ) def __init__(self, in_rc, *args, **kwargs): @@ -242,9 +256,5 @@ class InputChannelTask(OutputChannelTask): # and call it return OutputChannelTask.process(self, count) - - def set_pool(self, pool): - """Set our pool to the given one, it will be weakref'd""" - self._pool_ref = weakref.ref(pool) #{ Configuration |