diff options
Diffstat (limited to 'lib/git/async/task.py')
| -rw-r--r-- | lib/git/async/task.py | 47 | 
1 files changed, 45 insertions, 2 deletions
| diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 4e8aef54..cf486f48 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -21,6 +21,8 @@ class OutputChannelTask(Node):  					'_out_wc', 			# output write channel  					'_exc',				# exception caught  					'_done',			# True if we are done +					'_scheduled_items', # amount of scheduled items that will be processed in total +					'_slock',			# lock for scheduled items  					'fun',				# function to call with items read  					'min_count', 		# minimum amount of items to produce, None means no override  					'max_chunksize',	# maximium amount of items to process per process call @@ -33,6 +35,8 @@ class OutputChannelTask(Node):  		self._out_wc = None					# to be set later  		self._exc = None  		self._done = False +		self._scheduled_items = 0 +		self._slock = threading.Lock()  		self.fun = fun  		self.min_count = None  		self.max_chunksize = 0				# note set @@ -50,6 +54,7 @@ class OutputChannelTask(Node):  		"""Set the write channel to the given one  		:note: resets it done state in order to allow proper queue handling"""  		self._done = False +		self._scheduled_items = 0  		self._out_wc = wc  	def close(self): @@ -65,6 +70,21 @@ class OutputChannelTask(Node):  		""":return: Exception caught during last processing or None"""  		return self._exc +	def add_scheduled_items(self, count): +		"""Add the given amount of scheduled items to this task""" +		self._slock.acquire() +		self._scheduled_items += count  +		self._slock.release() +		 +	def scheduled_item_count(self): +		""":return: amount of scheduled items for this task""" +		self._slock.acquire() +		try: +			return self._scheduled_items +		finally: +			self._slock.release() +		# END threadsafe return +  	def process(self, count=0):  		"""Process count items and send the result individually to the output channel"""  		items = self._read(count) @@ -78,14 +98,33 @@ class OutputChannelTask(Node):  			wc = self._out_wc  			if self.apply_single:  				for item in items: -					wc.write(self.fun(item)) +					rval = self.fun(item) +					# decrement afterwards, the its unscheduled once its produced   +					self._slock.acquire() +					self._scheduled_items -= 1 +					self._slock.release() +					wc.write(rval)  				# END for each item  			else: -				wc.write(self.fun(items)) +				# shouldn't apply single be the default anyway ?  +				# The task designers should chunk them up in advance +				rvals = self.fun(items) +				self._slock.acquire() +				self._scheduled_items -= len(items) +				self._slock.release() +				for rval in rvals: +					wc.write(rval)  			# END handle single apply  		except Exception, e:  			self._exc = e  			self.set_done() +			# unschedule all, we don't know how many have been produced actually +			# but only if we don't apply single please  +			if not self.apply_single: +				self._slock.acquire() +				self._scheduled_items -= len(items) +				self._slock.release() +			# END unschedule all  		# END exception handling  		del(wc) @@ -189,6 +228,10 @@ class InputChannelTask(OutputChannelTask):  		#  for each task, which would allow to precisely determine whether   		#  the pool as to be triggered, and bail out early. Problem would   		#	be the  +		# * Perhaps one shouldn't seek the perfect solution , but instead +		#  document whats working and what not, or under which conditions. +		#  The whole system is simple, but gets more complicated the +		#  smarter it wants to be.  		if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref():  			self._read = self._in_rc._read | 
