diff options
| author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-09 14:47:41 +0200 | 
|---|---|---|
| committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-09 14:50:39 +0200 | 
| commit | 0974f8737a3c56a7c076f9d0b757c6cb106324fb (patch) | |
| tree | d05394903b8dfc63e34806bac7bccb700628a531 /lib/git/async/channel.py | |
| parent | 4e6bece08aea01859a232e99a1e1ad8cc1eb7d36 (diff) | |
| download | gitpython-0974f8737a3c56a7c076f9d0b757c6cb106324fb.tar.gz | |
Channel: Read method revised - now it really really doesn't block anymore, and it runs faster as well, about 2/3 of the performance we have when being in serial mode
Diffstat (limited to 'lib/git/async/channel.py')
| -rw-r--r-- | lib/git/async/channel.py | 85 | 
1 files changed, 37 insertions, 48 deletions
| diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 58c35f96..3a277e7e 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -38,12 +38,11 @@ class Channel(object):  class WChannel(Channel):  	"""The write end of a channel""" -	__slots__ = ('_closed', '_queue') +	__slots__ = ('_queue')  	def __init__(self):  		"""initialize this instance, able to hold max_items at once  		Write calls will block if the channel is full, until someone reads from it""" -		self._closed = False  		self._queue = AsyncQueue() @@ -55,15 +54,10 @@ class WChannel(Channel):  		:param block: If True, the call will block until there is free space in the   			channel  		:param timeout: timeout in seconds for blocking calls. -		:raise IOError: when writing into closed file -		:raise EOFError: when writing into a non-blocking full channel""" +		:raise ReadOnly: when writing into closed channel"""  		# let the queue handle the 'closed' attribute, we write much more often   		# to an open channel than to a closed one, saving a few cycles -		try: -			self._queue.put(item, block, timeout) -		except ReadOnly: -			raise IOError("Cannot write to a closed channel") -		# END exception handling +		self._queue.put(item, block, timeout)  	def size(self):  		""":return: approximate number of items that could be read from the read-ends @@ -73,15 +67,11 @@ class WChannel(Channel):  	def close(self):  		"""Close the channel. Multiple close calls on a closed channel are no   		an error""" -		# yes, close it a little too early, better than having anyone put  -		# additional items -		self._closed = True  		self._queue.set_writable(False) -	@property  	def closed(self):  		""":return: True if the channel was closed""" -		return self._closed +		return not self._queue.writable()  	#} END interface  @@ -104,6 +94,7 @@ class RChannel(Channel):  		:param block: if True, the call will block until an item is available  		:param timeout: if positive and block is True, it will block only for the   			given amount of seconds, returning the items it received so far. +			The timeout is applied to each read item, not for the whole operation.  		:return: single item in a list if count is 1, or a list of count items.   			If the channel was empty and count was 1, an empty list will be returned.  			If count was greater 1, a list with less than count items will be  @@ -112,9 +103,11 @@ class RChannel(Channel):  			returned."""  		# if the channel is closed for writing, we never block  		# NOTE: is handled by the queue -		if self._wc.closed or timeout == 0: -			block = False -			 +		# We don't check for a closed state here has it costs time - most of  +		# the time, it will not be closed, and will bail out automatically once +		# it gets closed +		 +		  		# in non-blocking mode, its all not a problem  		out = list()  		queue = self._wc._queue @@ -142,42 +135,38 @@ class RChannel(Channel):  				count = sys.maxint  			# END handle count -			endtime = sys.maxint		# allows timeout for whole operation -			if timeout is not None: -				endtime = time() + timeout -			# could be improved by a separate: no-endtime branch, saving the time calls -			for i in xrange(count): +			i = 0 +			while i < count:  				try:  					out.append(queue.get(block, timeout)) +					i += 1  				except Empty: -					# here we are only if there is nothing on the queue,  -					# and if we are blocking. If we are not blocking, this  -					# indiccates that the queue was set unwritable in the meanwhile. -					# hence we can abort now to prevent reading (possibly) forever -					# Besides, this is racy as all threads will rip on the channel -					# without waiting until its empty -					if not block: -						break -				# END ignore empty -				 -				# if we have been unblocked because the closed state changed  -				# in the meanwhile, stop trying -				# NOTE: must NOT cache _wc -				if self._wc.closed: -					# If we were closed, we drop out even if there might still  -					# be items. Now its time to get these items, according to  -					# our count. Just switch to unblocking mode. -					# If we are to read unlimited items, this would run forever,  -					# but the EmptyException handler takes care of this -					block = False +					# here we are only if  +					# someone woke us up to inform us about the queue that changed +					# its writable state +					# The following branch checks for closed channels, and pulls +					# as many items as we need and as possible, before  +					# leaving the loop. +					if not queue.writable(): +						try: +							while i < count: +								out.append(queue.get(False, None)) +								i += 1 +							# END count loop +						except Empty: +							break	# out of count loop  +						# END handle absolutely empty queue +					# END handle closed channel  -					# we don't continue, but let the timer decide whether -					# it wants to abort -				# END handle channel cloased -				 -				if time() >= endtime: +					# if we are here, we woke up and the channel is not closed +					# Either the queue became writable again, which currently shouldn't +					# be able to happen in the channel, or someone read with a timeout +					# that actually timed out. +					# As it timed out, which is the only reason we are here,  +					# we have to abort  					break -				# END stop operation on timeout +				# END ignore empty +				  			# END for each item  		# END handle blocking  		return out | 
