diff options
Diffstat (limited to 'lib/git/async/task.py')
-rw-r--r-- | lib/git/async/task.py | 51 |
1 files changed, 6 insertions, 45 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 0eb4527c..b7b5e699 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,5 +1,7 @@ from graph import Node from util import ReadOnly +from channel import IteratorReader + import threading import weakref @@ -179,56 +181,15 @@ class ThreadTaskBase(object): class InputIteratorTaskBase(OutputChannelTask): """Implements a task which processes items from an iterable in a multi-processing safe manner""" - __slots__ = ('_iterator', '_lock', '_empty') - # the type of the lock to use when reading from the iterator - lock_type = None + __slots__ = tuple() + def __init__(self, iterator, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) - if not hasattr(iterator, 'next'): - raise ValueError("Iterator %r needs a next() function" % iterator) - self._iterator = iterator - self._lock = self.lock_type() - - # this is necessary to prevent a cyclic ref, preventing us from - # getting deleted ( and collected ) - weakself = weakref.ref(self) - self._read = lambda count: weakself().__read(count) - self._empty = False - + self._read = IteratorReader(iterator).read # defaults to returning our items unchanged self.fun = lambda item: item - - def __read(self, count=0): - """Read count items from the iterator, and return them""" - # not threadsafe, but worst thing that could happen is that - # we try to get items one more time - if self._empty: - return list() - # END early abort - - self._lock.acquire() - try: - if count == 0: - self._empty = True - return list(self._iterator) - else: - out = list() - it = self._iterator - for i in xrange(count): - try: - out.append(it.next()) - except StopIteration: - self._empty = True - break - # END handle empty iterator - # END for each item to take - return out - # END handle count - finally: - self._lock.release() - # END handle locking - + class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase): """An input iterator for threaded pools""" |