summaryrefslogtreecommitdiff
path: root/lib/git/async/task.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git/async/task.py')
-rw-r--r--lib/git/async/task.py51
1 files changed, 6 insertions, 45 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 0eb4527c..b7b5e699 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -1,5 +1,7 @@
from graph import Node
from util import ReadOnly
+from channel import IteratorReader
+
import threading
import weakref
@@ -179,56 +181,15 @@ class ThreadTaskBase(object):
class InputIteratorTaskBase(OutputChannelTask):
"""Implements a task which processes items from an iterable in a multi-processing
safe manner"""
- __slots__ = ('_iterator', '_lock', '_empty')
- # the type of the lock to use when reading from the iterator
- lock_type = None
+ __slots__ = tuple()
+
def __init__(self, iterator, *args, **kwargs):
OutputChannelTask.__init__(self, *args, **kwargs)
- if not hasattr(iterator, 'next'):
- raise ValueError("Iterator %r needs a next() function" % iterator)
- self._iterator = iterator
- self._lock = self.lock_type()
-
- # this is necessary to prevent a cyclic ref, preventing us from
- # getting deleted ( and collected )
- weakself = weakref.ref(self)
- self._read = lambda count: weakself().__read(count)
- self._empty = False
-
+ self._read = IteratorReader(iterator).read
# defaults to returning our items unchanged
self.fun = lambda item: item
-
- def __read(self, count=0):
- """Read count items from the iterator, and return them"""
- # not threadsafe, but worst thing that could happen is that
- # we try to get items one more time
- if self._empty:
- return list()
- # END early abort
-
- self._lock.acquire()
- try:
- if count == 0:
- self._empty = True
- return list(self._iterator)
- else:
- out = list()
- it = self._iterator
- for i in xrange(count):
- try:
- out.append(it.next())
- except StopIteration:
- self._empty = True
- break
- # END handle empty iterator
- # END for each item to take
- return out
- # END handle count
- finally:
- self._lock.release()
- # END handle locking
-
+
class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):
"""An input iterator for threaded pools"""