diff options
| author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-10 15:38:40 +0200 | 
|---|---|---|
| committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-10 15:38:40 +0200 | 
| commit | 7c36f3648e39ace752c67c71867693ce1eee52a3 (patch) | |
| tree | 0e7a63fc98c40dfced395ab73dfc44c5e176ad42 /lib/git/async | |
| parent | 55e757928e493ce93056822d510482e4ffcaac2d (diff) | |
| download | gitpython-7c36f3648e39ace752c67c71867693ce1eee52a3.tar.gz | |
Now tracking the amount of concurrent writers to assure the channel is closed only when there is no one else writing to it. This assures that all tasks can continue working, and put their results accordingly. Shutdown is still not working correctly, but that should be solvable as well. Its still not perfect though ...
Diffstat (limited to 'lib/git/async')
| -rw-r--r-- | lib/git/async/pool.py | 24 | ||||
| -rw-r--r-- | lib/git/async/task.py | 64 | 
2 files changed, 59 insertions, 29 deletions
| diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 68551ea3..3fd99c7b 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -33,13 +33,12 @@ import new  class PoolReader(CallbackReader):  	"""A reader designed to read from channels which take part in pools  	It acts like a handle to the underlying task in the pool.""" -	__slots__ = ('_task_ref', '_pool_ref', '_read') +	__slots__ = ('_task_ref', '_pool_ref')  	def __init__(self, channel, task, pool):  		CallbackReader.__init__(self, channel)  		self._task_ref = weakref.ref(task)  		self._pool_ref = weakref.ref(pool) -		self._read = new.instancemethod(CallbackReader.__dict__['read'], self, CallbackReader)  	def __del__(self):  		"""Assures that our task will be deleted if we were the last reader""" @@ -62,11 +61,16 @@ class PoolReader(CallbackReader):  		# it has no way of knowing that the write channel is about to diminsh.  		# which is why we pass the info as a private kwarg  - not nice, but   		# okay for now -		# TODO: Fix this - private/public method  		if sys.getrefcount(self) < 6: -			pool.remove_task(task) +			pool.remove_task(task, _from_destructor_ = True)  		# END handle refcount based removal of task +	#{ Internal +	def _read(self, count=0, block=True, timeout=None): +		return CallbackReader.read(self, count, block, timeout) +	 +	#} END internal +  	#{ Interface  	def pool_ref(self): @@ -261,7 +265,7 @@ class Pool(object):  			if self._num_workers:  				# respect the chunk size, and split the task up if we want   				# to process too much. This can be defined per task -				qput = self._queue +				qput = self._queue.put  				if numchunks > 1:  					for i in xrange(numchunks):  						qput((task.process, chunksize)) @@ -290,16 +294,16 @@ class Pool(object):  		# END for each task to process -	def _remove_task_if_orphaned(self, task): +	def _remove_task_if_orphaned(self, task, from_destructor):  		"""Check the task, and delete it if it is orphaned"""  		# 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader  		# If we are getting here from the destructor of an RPool channel,   		# its totally valid to virtually decrement the refcount by 1 as   		# we can expect it to drop once the destructor completes, which is when  		# we finish all recursive calls -		max_ref_count = 3 +		max_ref_count = 3 + from_destructor  		if sys.getrefcount(task.writer().channel) < max_ref_count: -			self.remove_task(task) +			self.remove_task(task, from_destructor)  	#} END internal  	#{ Interface  @@ -370,7 +374,7 @@ class Pool(object):  		finally:  			self._taskgraph_lock.release() -	def remove_task(self, task): +	def remove_task(self, task, _from_destructor_ = False):  		"""Delete the task  		Additionally we will remove orphaned tasks, which can be identified if their   		output channel is only held by themselves, so no one will ever consume  @@ -405,7 +409,7 @@ class Pool(object):  		# END locked deletion  		for t in in_tasks: -			self._remove_task_if_orphaned(t) +			self._remove_task_if_orphaned(t, _from_destructor_)  		# END handle orphans recursively  		return self diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 5a6c1e95..ae2532d9 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -82,23 +82,36 @@ class OutputChannelTask(Node):  	def process(self, count=0):  		"""Process count items and send the result individually to the output channel""" -		# print "%r: reading %i" % (self.id, count) +		# first thing: increment the writer count +		self._wlock.acquire() +		self._num_writers += 1 +		self._wlock.release() +		 +		#print "%r: reading %i" % (self.id, count) +		#if hasattr(self, 'reader'): +		#	print "from", self.reader().channel  		items = self._read(count) -		# print "%r: done reading %i items" % (self.id, len(items)) +		#print "%r: done reading %i items" % (self.id, len(items))  		try: -			write = self._out_writer.write -			if self.apply_single: -				for item in items: -					rval = self.fun(item) -					write(rval) -				# END for each item -			else: -				# shouldn't apply single be the default anyway ?  -				# The task designers should chunk them up in advance -				rvals = self.fun(items) -				for rval in rvals: -					write(rval) -			# END handle single apply +			try: +				write = self._out_writer.write +				if self.apply_single: +					for item in items: +						rval = self.fun(item) +						write(rval) +					# END for each item +				else: +					# shouldn't apply single be the default anyway ?  +					# The task designers should chunk them up in advance +					rvals = self.fun(items) +					for rval in rvals: +						write(rval) +				# END handle single apply +			finally: +				self._wlock.acquire() +				self._num_writers -= 1 +				self._wlock.release() +			# END handle writer count  		except Exception, e:  			print >> sys.stderr, "task %s error:" % self.id, type(e), str(e)	# TODO: REMOVE DEBUG, or make it use logging  			# be sure our task is not scheduled again @@ -144,8 +157,13 @@ class OutputChannelTask(Node):  		# + 1 for the instance we provide to refcount  		# Soft close, so others can continue writing their results  		if self.is_done(): -			# print "Closing channel of %r" % self.id -			self.close() +			self._wlock.acquire() +			if self._num_writers == 0: +				#if not self.is_closed():		# DEBUG  +				#	print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel +				self.close() +			# END handle writers +			self._wlock.release()  		# END handle channel closure  	#{ Configuration @@ -158,7 +176,7 @@ class ThreadTaskBase(object):  class InputIteratorTaskBase(OutputChannelTask):  	"""Implements a task which processes items from an iterable in a multi-processing   	safe manner""" -	__slots__ = ('_iterator', '_lock') +	__slots__ = ('_iterator', '_lock', '_empty')  	# the type of the lock to use when reading from the iterator  	lock_type = None @@ -169,12 +187,19 @@ class InputIteratorTaskBase(OutputChannelTask):  		self._iterator = iterator  		self._lock = self.lock_type()  		self._read = self.__read +		self._empty = False  	def __read(self, count=0):  		"""Read count items from the iterator, and return them""" +		# not threadsafe, but worst thing that could happen is that  +		# we try to get items one more time +		if self._empty: +			return list() +		# END early abort  		self._lock.acquire()  		try:  			if count == 0: +				self._empty = True  				return list(self._iterator)  			else:  				out = list() @@ -183,6 +208,7 @@ class InputIteratorTaskBase(OutputChannelTask):  					try:  						out.append(it.next())  					except StopIteration: +						self._empty = True  						break  					# END handle empty iterator  				# END for each item to take @@ -198,7 +224,7 @@ class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):  	lock_type = threading.Lock -class InputChannelTask(OutputChannelTask): +class InputChannelTask(OutputChannelTask, ThreadTaskBase):  	"""Uses an input channel as source for reading items  	For instantiation, it takes all arguments of its base, the first one needs  	to be the input channel to read from though.""" | 
