summaryrefslogtreecommitdiff
path: root/lib/git/async/task.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-12 12:38:02 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-12 12:40:07 +0200
commit7a0b79ee574999ecbc76696506352e4a5a0d7159 (patch)
treee0e2aa63b7dc649083858366eaedb6ac4cc5739b /lib/git/async/task.py
parent1d8a577ffc6ad7ce1465001ddebdc157aecc1617 (diff)
downloadgitpython-7a0b79ee574999ecbc76696506352e4a5a0d7159.tar.gz
task: improved naming of task types, improved pool test to be less dependent on starting with just the main thread
Diffstat (limited to 'lib/git/async/task.py')
-rw-r--r--lib/git/async/task.py26
1 files changed, 16 insertions, 10 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index b7b5e699..ac948dc0 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -8,21 +8,27 @@ import weakref
import sys
import new
-__all__ = ('OutputChannelTask', 'ThreadTaskBase', 'InputIteratorTaskBase',
- 'InputIteratorThreadTask', 'InputChannelTask')
+__all__ = ('Task', 'ThreadTaskBase', 'IteratorTaskBase',
+ 'IteratorThreadTask', 'ChannelThreadTask')
-class OutputChannelTask(Node):
+class Task(Node):
"""Abstracts a named task, which contains
additional information on how the task should be queued and processed.
- Results of the item processing are sent to a write channel, which is to be
+ Results of the item processing are sent to a writer, which is to be
set by the creator using the ``set_writer`` method.
+ Items are read using the internal ``_read`` callable, subclasses are meant to
+ set this to a callable that supports the Reader interface's read function.
+
* **min_count** assures that not less than min_count items will be processed per call.
* **max_chunksize** assures that multi-threading is happening in smaller chunks. If
someone wants all items to be processed, using read(0), the whole task would go to
one worker, as well as dependent tasks. If you want finer granularity , you can
- specify this here, causing chunks to be no larger than max_chunksize"""
+ specify this here, causing chunks to be no larger than max_chunksize
+ * **apply_single** if True, default True, individual items will be given to the
+ worker function. If False, a list of possibly multiple items will be passed
+ instead."""
__slots__ = ( '_read', # method to yield items to process
'_out_writer', # output write channel
'_exc', # exception caught
@@ -178,32 +184,32 @@ class ThreadTaskBase(object):
pass
-class InputIteratorTaskBase(OutputChannelTask):
+class IteratorTaskBase(Task):
"""Implements a task which processes items from an iterable in a multi-processing
safe manner"""
__slots__ = tuple()
def __init__(self, iterator, *args, **kwargs):
- OutputChannelTask.__init__(self, *args, **kwargs)
+ Task.__init__(self, *args, **kwargs)
self._read = IteratorReader(iterator).read
# defaults to returning our items unchanged
self.fun = lambda item: item
-class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):
+class IteratorThreadTask(IteratorTaskBase, ThreadTaskBase):
"""An input iterator for threaded pools"""
lock_type = threading.Lock
-class InputChannelTask(OutputChannelTask, ThreadTaskBase):
+class ChannelThreadTask(Task, ThreadTaskBase):
"""Uses an input channel as source for reading items
For instantiation, it takes all arguments of its base, the first one needs
to be the input channel to read from though."""
__slots__ = "_pool_ref"
def __init__(self, in_reader, *args, **kwargs):
- OutputChannelTask.__init__(self, *args, **kwargs)
+ Task.__init__(self, *args, **kwargs)
self._read = in_reader.read
self._pool_ref = None