diff options
| author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-06 23:08:06 +0200 | 
|---|---|---|
| committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-06 23:08:06 +0200 | 
| commit | 867129e2950458ab75523b920a5e227e3efa8bbc (patch) | |
| tree | 714a0ec16d915d04c69e91a3e222a79cdc9532be /lib/git/async/channel.py | |
| parent | 1b27292936c81637f6b9a7141dafaad1126f268e (diff) | |
| download | gitpython-867129e2950458ab75523b920a5e227e3efa8bbc.tar.gz | |
channel.read: enhanced to be sure we don't run into non-atomicity issues related to our channel closed flag, which is the only way not to block forever on read(0) channels which were closed by a thread 'in the meanwhile'
Diffstat (limited to 'lib/git/async/channel.py')
| -rw-r--r-- | lib/git/async/channel.py | 89 | 
1 files changed, 72 insertions, 17 deletions
| diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 70daed24..0a1db26b 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -5,6 +5,9 @@ from Queue import (  	Full  	) +from time import time +import sys +  #{ Classes   class Channel(object):  	"""A channel is similar to a file like object. It has a write end as well as one or @@ -106,26 +109,78 @@ class RChannel(Channel):  			If count was < 1, a list with all items that could be read will be   			returned."""  		# if the channel is closed for writing, we never block -		if self._wc.closed: +		if self._wc.closed or timeout == 0:  			block = False -		 +			 +		# in non-blocking mode, its all not a problem  		out = list() -		try: -			if count == 1: -				out.append(self._wc._queue.get(block, timeout)) -			elif count < 1: -				while True: -					out.append(self._wc._queue.get(block, timeout)) -				# END for each item -				return out -			else: -				for i in xrange(count): -					out.append(self._wc._queue.get(block, timeout)) -				# END for each item +		queue = self._wc._queue +		if not block: +			# be as fast as possible in non-blocking mode, hence +			# its a bit 'unrolled' +			try: +				if count == 1: +					out.append(queue.get(False)) +				elif count < 1: +					while True: +						out.append(queue.get(False)) +					# END for each item +				else: +					for i in xrange(count): +						out.append(queue.get(False)) +					# END for each item +				# END handle count +			except Empty: +				pass +			# END handle exceptions +		else: +			# if we have really bad timing, the source of the channel +			# marks itself closed, but before setting it, the thread  +			# switches to us. We read it, read False, and try to fetch +			# something, and never return. The whole closed channel thing +			# is not atomic ( of course ) +			# This is why we never block for long, to get a chance to recheck +			# for closed channels. +			# We blend this into the timeout of the user +			ourtimeout = 0.25				# the smaller, the more responsive, but the slower  +			wc = self._wc +			timeout = (timeout is None and sys.maxint) or timeout		# make sure we can compute with it +			assert timeout != 0.0, "shouldn't block if timeout is 0"	# okay safe  +			if timeout and ourtimeout > timeout: +				ourtimeout = timeout +			# END setup timeout +			 +			# to get everything into one loop, we set the count accordingly +			if count == 0: +				count = sys.maxint  			# END handle count -		except Empty: -			pass -		# END handle exceptions +			 +			for i in xrange(count): +				have_timeout = False +				st = time() +				while True: +					try: +						if wc.closed: +							have_timeout = True +							break +						# END don't continue on closed channels +						 +						# END abort reading if it was closed ( in the meanwhile ) +						out.append(queue.get(block, ourtimeout)) +						break	# breakout right away +					except Empty: +						if timeout - (time() - st) <= 0: +							# hitting timeout +							have_timeout = True +							break +						# END abort if the user wants no more time spent here +					# END handle timeout +				# END endless timer loop +				if have_timeout: +					break +				# END stop on timeout +			# END for each item +		# END handle blocking  		return out  	#} END interface  | 
