Diffstat (limited to 'lib/git/async/task.py')
-rw-r--r-- | lib/git/async/task.py | 26
1 file changed, 16 insertions, 10 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index b7b5e699..ac948dc0 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -8,21 +8,27 @@
 import weakref
 import sys
 import new
 
-__all__ = ('OutputChannelTask', 'ThreadTaskBase', 'InputIteratorTaskBase',
-           'InputIteratorThreadTask', 'InputChannelTask')
+__all__ = ('Task', 'ThreadTaskBase', 'IteratorTaskBase',
+           'IteratorThreadTask', 'ChannelThreadTask')
 
-class OutputChannelTask(Node):
+class Task(Node):
     """Abstracts a named task, which contains additional information on how the
     task should be queued and processed.
 
-    Results of the item processing are sent to a write channel, which is to be
+    Results of the item processing are sent to a writer, which is to be
     set by the creator using the ``set_writer`` method.
 
+    Items are read using the internal ``_read`` callable, subclasses are meant to
+    set this to a callable that supports the Reader interface's read function.
+
     * **min_count** assures that not less than min_count items will be processed per call.
     * **max_chunksize** assures that multi-threading is happening in smaller chunks.
       If someone wants all items to be processed, using read(0), the whole task would
       go to one worker, as well as dependent tasks. If you want finer granularity, you can
-      specify this here, causing chunks to be no larger than max_chunksize"""
+      specify this here, causing chunks to be no larger than max_chunksize
+    * **apply_single** if True, default True, individual items will be given to the
+      worker function. If False, a list of possibly multiple items will be passed
+      instead."""
     __slots__ = ('_read',        # method to yield items to process
                  '_out_writer',  # output write channel
                  '_exc',         # exception caught
@@ -178,32 +184,32 @@ class ThreadTaskBase(object):
     pass
 
 
-class InputIteratorTaskBase(OutputChannelTask):
+class IteratorTaskBase(Task):
     """Implements a task which processes items from an iterable in a
     multi-processing safe manner"""
     __slots__ = tuple()
 
     def __init__(self, iterator, *args, **kwargs):
-        OutputChannelTask.__init__(self, *args, **kwargs)
+        Task.__init__(self, *args, **kwargs)
         self._read = IteratorReader(iterator).read
         # defaults to returning our items unchanged
         self.fun = lambda item: item
 
 
-class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):
+class IteratorThreadTask(IteratorTaskBase, ThreadTaskBase):
     """An input iterator for threaded pools"""
     lock_type = threading.Lock
 
 
-class InputChannelTask(OutputChannelTask, ThreadTaskBase):
+class ChannelThreadTask(Task, ThreadTaskBase):
     """Uses an input channel as source for reading items
     For instantiation, it takes all arguments of its base, the first one needs
     to be the input channel to read from though."""
     __slots__ = "_pool_ref"
 
     def __init__(self, in_reader, *args, **kwargs):
-        OutputChannelTask.__init__(self, *args, **kwargs)
+        Task.__init__(self, *args, **kwargs)
        self._read = in_reader.read
        self._pool_ref = None
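For orientation, the renames keep the same reader/task/writer pattern: a Task pulls items through its internal ``_read`` callable, applies ``fun``, and pushes results to a writer installed via ``set_writer``, with ``apply_single`` choosing item-wise versus batched calls to the worker function. The standalone Python sketch below illustrates that pattern under stated assumptions; the ``process()`` method and all constructor signatures here are invented for illustration and are not the library's actual implementation.

# Minimal standalone sketch of the reader/task/writer pattern the updated
# docstring describes. Names (IteratorReader, Task, set_writer, _read, fun,
# apply_single) mirror the diff; process() and the constructor signatures
# are illustrative assumptions, not the library's real API.

class IteratorReader(object):
    """Wraps an iterable behind a read(count) interface, the shape the
    Task docstring expects of its internal _read callable."""

    def __init__(self, iterator):
        self._iter = iter(iterator)

    def read(self, count=0):
        # read(0) means "read everything", matching the behaviour the
        # max_chunksize note in the docstring refers to
        if count == 0:
            return list(self._iter)
        items = []
        for _ in range(count):
            try:
                items.append(next(self._iter))
            except StopIteration:
                break
        return items


class Task(object):
    """Sketch: items come from self._read, results go to the writer
    installed via set_writer(); apply_single decides whether fun sees
    one item at a time or a whole chunk."""

    def __init__(self, fun, apply_single=True):
        self._read = None        # subclasses install a Reader's read here
        self._out_writer = None
        self.fun = fun
        self.apply_single = apply_single

    def set_writer(self, writer):
        self._out_writer = writer

    def process(self, count=0):
        items = self._read(count)
        if self.apply_single:
            # one call per item
            results = [self.fun(item) for item in items]
        else:
            # one call with the whole chunk as a list
            results = self.fun(items)
        for result in results:
            self._out_writer(result)


class IteratorTaskBase(Task):
    """Feeds a Task from a plain iterable, as the renamed class does."""

    def __init__(self, iterator, *args, **kwargs):
        Task.__init__(self, *args, **kwargs)
        self._read = IteratorReader(iterator).read


# Usage: double every item from an iterable and collect the results.
results = []
task = IteratorTaskBase(range(4), fun=lambda item: item * 2)
task.set_writer(results.append)
task.process(0)
print(results)   # prints [0, 2, 4, 6]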