diff options
| -rw-r--r-- | lib/git/async/pool.py | 13 | ||||
| -rw-r--r-- | lib/git/async/task.py | 37 | 
2 files changed, 30 insertions, 20 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 7d4e96d1..f7c1cfe0 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -388,18 +388,21 @@ class Pool(object):  			self._taskorder_cache.clear()  			self._tasks.add_node(task) -			# fix locks - in serial mode, the task does not need real locks -			# Additionally, use a non-threadsafe queue +			# Use a non-threadsafe queue  			# This brings about 15% more performance, but sacrifices thread-safety  			# when reading from multiple threads.  			if self.size() == 0:  				wctype = SerialWChannel  			# END improve locks -			# setup the tasks channel -			wc = wctype() +			# setup the tasks channel - respect the task creators choice though +			# if it is set. +			wc = task.wchannel() +			if wc is None: +				wc = wctype() +			# END create write channel ifunset  			rc = RPoolChannel(wc, task, self) -			task.set_wc(wc) +			task.set_wchannel(wc)  		finally:  			self._taskgraph_lock.release()  		# END sync task addition diff --git a/lib/git/async/task.py b/lib/git/async/task.py index be02cfe8..f98336b2 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,4 +1,5 @@  from graph import Node +from channel import WChannel  from util import ReadOnly  import threading @@ -11,8 +12,8 @@ class OutputChannelTask(Node):  	"""Abstracts a named task as part of a set of interdependent tasks, which contains   	additional information on how the task should be queued and processed. -	Results of the item processing are sent to an output channel, which is to be  -	set by the creator +	Results of the item processing are sent to a write channel, which is to be  +	set by the creator using the ``set_wchannel`` method.  	* **min_count** assures that not less than min_count items will be processed per call.  	* **max_chunksize** assures that multi-threading is happening in smaller chunks. If  @@ -29,10 +30,11 @@ class OutputChannelTask(Node):  					'apply_single'		# apply single items even if multiple where read  					) -	def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0): +	def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0,  +					wchannel=None):  		Node.__init__(self, id)  		self._read = None					# to be set by subclasss  -		self._out_wc = None					# to be set later +		self._out_wc = wchannel				# to be set later  		self._exc = None  		self._done = False  		self.fun = fun @@ -48,13 +50,21 @@ class OutputChannelTask(Node):  		"""Set ourselves to being done, has we have completed the processing"""  		self._done = True -	def set_wc(self, wc): -		"""Set the write channel to the given one -		:note: resets it done state in order to allow proper queue handling""" -		self._done = False		# TODO : fix this, this is a side-effect -		self._scheduled_items = 0 +	def set_wchannel(self, wc): +		"""Set the write channel to the given one"""  		self._out_wc = wc +	def wchannel(self): +		""":return: a proxy to our write channel or None if non is set +		:note: you must not hold a reference to our write channel when the  +			task is being processed. This would cause the write channel never  +			to be closed as the task will think there is still another instance +			being processed which can close the channel once it is done. +			In the worst case, this will block your reads.""" +		if self._out_wc is None: +			return None +		return self._out_wc +		  	def close(self):  		"""A closed task will close its channel to assure the readers will wake up  		:note: its safe to call this method multiple times""" @@ -128,8 +138,10 @@ class OutputChannelTask(Node):  		# END handle done state  		# If we appear to be the only one left with our output channel, and are  -		# closed ( this could have been set in another thread as well ), make  +		# done ( this could have been set in another thread as well ), make   		# sure to close the output channel. +		# Waiting with this to be the last one helps to keep the  +		# write-channel writable longer  		# The count is: 1 = wc itself, 2 = first reader channel, + x for every   		# thread having its copy on the stack   		# + 1 for the instance we provide to refcount @@ -196,10 +208,5 @@ class InputChannelTask(OutputChannelTask):  		OutputChannelTask.__init__(self, *args, **kwargs)  		self._read = in_rc.read -	def process(self, count=1): -		# for now, just blindly read our input, could trigger a pool, even  -		# ours, but why not ? It should be able to handle this -		# TODO: remove this method -		super(InputChannelTask, self).process(count)  	#{ Configuration  | 
