Diffstat (limited to 'lib/git/async/task.py')
-rw-r--r--   lib/git/async/task.py   64
1 file changed, 45 insertions(+), 19 deletions(-)
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 5a6c1e95..ae2532d9 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -82,23 +82,36 @@ class OutputChannelTask(Node):
 
     def process(self, count=0):
         """Process count items and send the result individually to the output channel"""
-        # print "%r: reading %i" % (self.id, count)
+        # first thing: increment the writer count
+        self._wlock.acquire()
+        self._num_writers += 1
+        self._wlock.release()
+
+        #print "%r: reading %i" % (self.id, count)
+        #if hasattr(self, 'reader'):
+        #    print "from", self.reader().channel
         items = self._read(count)
-        # print "%r: done reading %i items" % (self.id, len(items))
+        #print "%r: done reading %i items" % (self.id, len(items))
         try:
-            write = self._out_writer.write
-            if self.apply_single:
-                for item in items:
-                    rval = self.fun(item)
-                    write(rval)
-                # END for each item
-            else:
-                # shouldn't apply single be the default anyway ?
-                # The task designers should chunk them up in advance
-                rvals = self.fun(items)
-                for rval in rvals:
-                    write(rval)
-            # END handle single apply
+            try:
+                write = self._out_writer.write
+                if self.apply_single:
+                    for item in items:
+                        rval = self.fun(item)
+                        write(rval)
+                    # END for each item
+                else:
+                    # shouldn't apply single be the default anyway ?
+                    # The task designers should chunk them up in advance
+                    rvals = self.fun(items)
+                    for rval in rvals:
+                        write(rval)
+                # END handle single apply
+            finally:
+                self._wlock.acquire()
+                self._num_writers -= 1
+                self._wlock.release()
+            # END handle writer count
         except Exception, e:
             print >> sys.stderr, "task %s error:" % self.id, type(e), str(e)    # TODO: REMOVE DEBUG, or make it use logging
             # be sure our task is not scheduled again
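
The hunk above wraps the whole write phase in a writer count: process() registers itself under self._wlock before producing any output and deregisters in a finally clause, so an exception raised by self.fun() or by the channel write cannot leave self._num_writers inflated. A minimal, self-contained sketch of that guard pattern follows; it assumes threading.Lock and uses simplified stand-in names, apart from _wlock and _num_writers which do appear in the diff.

import threading

class WriterCounted(object):
    """Toy illustration of the writer-count guard used in OutputChannelTask.process."""

    def __init__(self):
        self._wlock = threading.Lock()
        self._num_writers = 0

    def process(self, items, fun, write):
        # register this call as an active writer before producing any output
        self._wlock.acquire()
        self._num_writers += 1
        self._wlock.release()
        try:
            for item in items:
                write(fun(item))
        finally:
            # always deregister, even if fun() or write() raised
            self._wlock.acquire()
            self._num_writers -= 1
            self._wlock.release()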
@@ -144,8 +157,13 @@ class OutputChannelTask(Node):
         # + 1 for the instance we provide to refcount
         # Soft close, so others can continue writing their results
         if self.is_done():
-            # print "Closing channel of %r" % self.id
-            self.close()
+            self._wlock.acquire()
+            if self._num_writers == 0:
+                #if not self.is_closed():    # DEBUG
+                #    print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel
+                self.close()
+            # END handle writers
+            self._wlock.release()
         # END handle channel closure
 
     #{ Configuration
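
The second hunk makes channel closure conditional on that count: even once the task is done, the output channel is only closed while no process() call is still registered as a writer, so late results cannot land in an already closed channel. A sketch of the check, written against a hypothetical task object using the attributes from the diff rather than the real method body:

def maybe_close(task):
    # close the output channel only once the task is done and nobody is writing
    if task.is_done():
        task._wlock.acquire()
        if task._num_writers == 0:
            task.close()
        # END handle writers
        task._wlock.release()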
@@ -158,7 +176,7 @@ class ThreadTaskBase(object):
 
 class InputIteratorTaskBase(OutputChannelTask):
     """Implements a task which processes items from an iterable in a multi-processing
     safe manner"""
-    __slots__ = ('_iterator', '_lock')
+    __slots__ = ('_iterator', '_lock', '_empty')
     # the type of the lock to use when reading from the iterator
     lock_type = None
@@ -169,12 +187,19 @@ class InputIteratorTaskBase(OutputChannelTask):
         self._iterator = iterator
         self._lock = self.lock_type()
         self._read = self.__read
+        self._empty = False
 
     def __read(self, count=0):
         """Read count items from the iterator, and return them"""
+        # not threadsafe, but worst thing that could happen is that
+        # we try to get items one more time
+        if self._empty:
+            return list()
+        # END early abort
         self._lock.acquire()
         try:
             if count == 0:
+                self._empty = True
                 return list(self._iterator)
             else:
                 out = list()
@@ -183,6 +208,7 @@ class InputIteratorTaskBase(OutputChannelTask):
                     try:
                         out.append(it.next())
                     except StopIteration:
+                        self._empty = True
                         break
                     # END handle empty iterator
                 # END for each item to take
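
The three hunks above add an _empty flag to InputIteratorTaskBase: once a read drains the iterator (either the count == 0 path or a StopIteration), later reads return an empty list without touching the iterator again. The flag is checked outside the lock on purpose; as the diff's comment notes, the worst case is one superfluous locked read attempt. A self-contained sketch of the same guard, assuming threading.Lock and using illustrative class and method names that are not part of the module:

import threading

class IteratorReader(object):
    def __init__(self, iterable):
        self._iterator = iter(iterable)
        self._lock = threading.Lock()
        self._empty = False             # set once the iterator is drained

    def read(self, count=0):
        if self._empty:                 # unlocked fast path; at worst one
            return list()               # extra pass reaches the locked code
        self._lock.acquire()
        try:
            if count == 0:
                self._empty = True
                return list(self._iterator)
            out = list()
            for _ in range(count):
                try:
                    out.append(next(self._iterator))
                except StopIteration:
                    self._empty = True
                    break
                # END handle empty iterator
            # END for each item to take
            return out
        finally:
            self._lock.release()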
@@ -198,7 +224,7 @@ class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):
     lock_type = threading.Lock
 
 
-class InputChannelTask(OutputChannelTask):
+class InputChannelTask(OutputChannelTask, ThreadTaskBase):
     """Uses an input channel as source for reading items
     For instantiation, it takes all arguments of its base, the first one needs
     to be the input channel to read from though."""
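
The last hunk only changes the bases of InputChannelTask: it now also derives from ThreadTaskBase, whose body is not shown in this diff. The directly visible consequence is that instances now satisfy isinstance checks against ThreadTaskBase, just like InputIteratorThreadTask above; anything beyond that depends on what ThreadTaskBase actually provides. A minimal illustration with stand-in class bodies:

class OutputChannelTask(object):
    pass

class ThreadTaskBase(object):
    pass

class InputChannelTask(OutputChannelTask, ThreadTaskBase):
    pass

assert isinstance(InputChannelTask(), ThreadTaskBase)   # true only after this change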