diff options
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/task.py | 43 |
1 files changed, 24 insertions, 19 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py index ae2532d9..a8ba5ac6 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -92,21 +92,24 @@ class OutputChannelTask(Node): # print "from", self.reader().channel items = self._read(count) #print "%r: done reading %i items" % (self.id, len(items)) + try: try: - write = self._out_writer.write - if self.apply_single: - for item in items: - rval = self.fun(item) - write(rval) - # END for each item - else: - # shouldn't apply single be the default anyway ? - # The task designers should chunk them up in advance - rvals = self.fun(items) - for rval in rvals: - write(rval) - # END handle single apply + if items: + write = self._out_writer.write + if self.apply_single: + for item in items: + rval = self.fun(item) + write(rval) + # END for each item + else: + # shouldn't apply single be the default anyway ? + # The task designers should chunk them up in advance + rvals = self.fun(items) + for rval in rvals: + write(rval) + # END handle single apply + # END if there is anything to do finally: self._wlock.acquire() self._num_writers -= 1 @@ -158,12 +161,14 @@ class OutputChannelTask(Node): # Soft close, so others can continue writing their results if self.is_done(): self._wlock.acquire() - if self._num_writers == 0: - #if not self.is_closed(): # DEBUG - # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel - self.close() - # END handle writers - self._wlock.release() + try: + if self._num_writers == 0: + # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel + self.close() + # END handle writers + finally: + self._wlock.release() + # END assure lock release # END handle channel closure #{ Configuration |