diff options
Diffstat (limited to 'lib/git/async/task.py')
| -rw-r--r-- | lib/git/async/task.py | 24 | 
1 files changed, 21 insertions, 3 deletions
| diff --git a/lib/git/async/task.py b/lib/git/async/task.py index f106c381..b282e371 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -2,6 +2,7 @@ from graph import Node  import threading  import weakref +import sys  import new  class OutputChannelTask(Node): @@ -44,7 +45,6 @@ class OutputChannelTask(Node):  	def set_done(self):  		"""Set ourselves to being done, has we have completed the processing"""  		self._done = True -		self.close()  	def set_wc(self, wc):  		"""Set the write channel to the given one @@ -69,17 +69,25 @@ class OutputChannelTask(Node):  		"""Process count items and send the result individually to the output channel"""  		items = self._read(count)  		try: +			# increase the ref-count - we use this to determine whether anyone else +			# is currently handling our output channel. As this method runs asynchronously,  +			# we have to make sure that the channel is closed by the last finishing task, +			# which is not necessarily the one which determines that he is done +			# as he couldn't read anymore items. +			# The refcount will be dropped in the moment we get out of here. +			wc = self._out_wc  			if self.apply_single:  				for item in items: -					self._out_wc.write(self.fun(item)) +					wc.write(self.fun(item))  				# END for each item  			else: -				self._out_wc.write(self.fun(items)) +				wc.write(self.fun(items))  			# END handle single apply  		except Exception, e:  			self._exc = e  			self.set_done()  		# END exception handling +		del(wc)  		# if we didn't get all demanded items, which is also the case if count is 0  		# we have depleted the input channel and are done @@ -89,6 +97,16 @@ class OutputChannelTask(Node):  		if not items or len(items) != count:  			self.set_done()  		# END handle done state +		 +		# If we appear to be the only one left with our output channel, and are  +		# closed ( this could have been set in another thread as well ), make  +		# sure to close the output channel. +		# The count is: 1 = wc itself, 2 = first reader channel, and we have only  +		# one, 3 is ours + x for every thread having its copy on the stack  +		# + 1 for the instance we provide to refcount +		if self.is_done() and sys.getrefcount(self._out_wc) < 5: +			self.close() +		# END handle channel closure  	#{ Configuration | 
