diff options
Diffstat (limited to 'lib/git/async/task.py')
-rw-r--r-- | lib/git/async/task.py | 26 |
1 files changed, 25 insertions, 1 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 3137746c..f106c381 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,5 +1,7 @@ from graph import Node + import threading +import weakref import new class OutputChannelTask(Node): @@ -17,6 +19,7 @@ class OutputChannelTask(Node): __slots__ = ( '_read', # method to yield items to process '_out_wc', # output write channel '_exc', # exception caught + '_done', # True if we are done 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -28,6 +31,7 @@ class OutputChannelTask(Node): self._read = None # to be set by subclasss self._out_wc = None # to be set later self._exc = None + self._done = False self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -35,12 +39,28 @@ class OutputChannelTask(Node): def is_done(self): """:return: True if we are finished processing""" - return self._out_wc.closed + return self._done def set_done(self): """Set ourselves to being done, has we have completed the processing""" + self._done = True + self.close() + + def set_wc(self, wc): + """Set the write channel to the given one + :note: resets it done state in order to allow proper queue handling""" + self._done = False + self._out_wc = wc + + def close(self): + """A closed task will close its channel to assure the readers will wake up + :note: its safe to call this method multiple times""" self._out_wc.close() + def is_closed(self): + """:return: True if the task's write channel is closed""" + return self._out_wc.closed + def error(self): """:return: Exception caught during last processing or None""" return self._exc @@ -148,5 +168,9 @@ class InputChannelTask(OutputChannelTask): # and call it return OutputChannelTask.process(self, count) + + def set_pool(self, pool): + """Set our pool to the given one, it will be weakref'd""" + self._pool_ref = weakref.ref(pool) #{ Configuration |