diff options
Diffstat (limited to 'lib/git/async/channel.py')
| -rw-r--r-- | lib/git/async/channel.py | 139 | 
1 files changed, 116 insertions, 23 deletions
| diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index ae476cda..79cb5294 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -11,10 +11,12 @@ from util import (  		)  from time import time +import threading  import sys -__all__ = ('Channel', 'SerialChannel', 'Writer', 'CallbackWriter', 'Reader', -			'CallbackReader', 'mkchannel', 'ReadOnly') +__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter', +			'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly',  +			'IteratorReader')  #{ Classes   class Channel(object): @@ -43,15 +45,50 @@ class SerialChannel(Channel):  class Writer(object): +	"""A writer is an object providing write access to a possibly blocking reading device""" +	__slots__ = tuple() +	 +	#{ Interface +	 +	def __init__(self, device): +		"""Initialize the instance with the device to write to""" +	 +	def write(self, item, block=True, timeout=None): +		"""Write the given item into the device +		:param block: True if the device may block until space for the item is available +		:param timeout: The time in seconds to wait for the device to become ready  +		in blocking mode""" +		raise NotImplementedError() +		 +	def size(self): +		""":return: number of items already in the device, they could be read with a reader""" +		raise NotImplementedError() +		 +	def close(self): +		"""Close the channel. Multiple close calls on a closed channel are no  +		an error""" +		raise NotImplementedError() +		 +	def closed(self): +		""":return: True if the channel was closed""" +		raise NotImplementedError() +		 +	#} END interface +	 + +class ChannelWriter(Writer):  	"""The write end of a channel, a file-like interface for a channel""" -	__slots__ = ('write', 'channel') +	__slots__ = ('channel', '_put')  	def __init__(self, channel):  		"""Initialize the writer to use the given channel"""  		self.channel = channel -		self.write = channel.queue.put +		self._put = self.channel.queue.put  	#{ Interface +	def write(self, item, block=False, timeout=None): +		return self._put(item, block, timeout) +		  	def size(self):  		return self.channel.queue.qsize() @@ -66,15 +103,14 @@ class Writer(object):  	#} END interface  -class CallbackWriter(Writer): +class CallbackChannelWriter(ChannelWriter):  	"""The write end of a channel which allows you to setup a callback to be   	called after an item was written to the channel"""  	__slots__ = ('_pre_cb')  	def __init__(self, channel): -		Writer.__init__(self, channel) +		super(CallbackChannelWriter, self).__init__(channel)  		self._pre_cb = None -		self.write = self._write  	def set_pre_cb(self, fun = lambda item: item):  		"""Install a callback to be called before the given item is written. @@ -87,25 +123,22 @@ class CallbackWriter(Writer):  		self._pre_cb = fun  		return prev -	def _write(self, item, block=True, timeout=None): +	def write(self, item, block=True, timeout=None):  		if self._pre_cb:  			item = self._pre_cb(item) -		self.channel.queue.put(item, block, timeout) +		super(CallbackChannelWriter, self).write(item, block, timeout)  class Reader(object): -	"""Allows reading from a channel""" -	__slots__ = 'channel' +	"""Allows reading from a device""" +	__slots__ = tuple() -	def __init__(self, channel): -		"""Initialize this instance from its parent write channel""" -		self.channel = channel -		 -		  	#{ Interface -	 +	def __init__(self, device): +		"""Initialize the instance with the device to read from""" +		  	def read(self, count=0, block=True, timeout=None): -		"""read a list of items read from the channel. The list, as a sequence +		"""read a list of items read from the device. The list, as a sequence  		of items, is similar to the string of characters returned when reading from   		file like objects.  		:param count: given amount of items to read. If < 1, all items will be read @@ -114,11 +147,25 @@ class Reader(object):  			given amount of seconds, returning the items it received so far.  			The timeout is applied to each read item, not for the whole operation.  		:return: single item in a list if count is 1, or a list of count items.  -			If the channel was empty and count was 1, an empty list will be returned. +			If the device was empty and count was 1, an empty list will be returned.  			If count was greater 1, a list with less than count items will be   			returned.  			If count was < 1, a list with all items that could be read will be   			returned.""" +		raise NotImplementedError() +		 + +class ChannelReader(Reader): +	"""Allows reading from a channel. The reader is thread-safe if the channel is as well""" +	__slots__ = 'channel' +	 +	def __init__(self, channel): +		"""Initialize this instance from its parent write channel""" +		self.channel = channel +		 +	#{ Interface +	 +	def read(self, count=0, block=True, timeout=None):  		# if the channel is closed for writing, we never block  		# NOTE: is handled by the queue  		# We don't check for a closed state here has it costs time - most of  @@ -191,12 +238,12 @@ class Reader(object):  	#} END interface  -class CallbackReader(Reader): +class CallbackChannelReader(ChannelReader):  	"""A channel which sends a callback before items are read from the channel"""  	__slots__ = "_pre_cb"  	def __init__(self, channel): -		Reader.__init__(self, channel) +		super(CallbackChannelReader, self).__init__(channel)  		self._pre_cb = None  	def set_pre_cb(self, fun = lambda count: None): @@ -213,13 +260,59 @@ class CallbackReader(Reader):  	def read(self, count=0, block=True, timeout=None):  		if self._pre_cb:  			self._pre_cb(count) -		return Reader.read(self, count, block, timeout) +		return super(CallbackChannelReader, self).read(count, block, timeout) + +class IteratorReader(Reader): +	"""A Reader allowing to read items from an iterator, instead of a channel. +	Reads will never block. Its thread-safe""" +	__slots__ = ("_empty", '_iter', '_lock') +	 +	# the type of the lock to use when reading from the iterator +	lock_type = threading.Lock +	 +	def __init__(self, iterator): +		self._empty = False +		if not hasattr(iterator, 'next'): +			raise ValueError("Iterator %r needs a next() function" % iterator) +		self._iter = iterator +		self._lock = self.lock_type() +		 +	def read(self, count=0, block=True, timeout=None): +		"""Non-Blocking implementation of read""" +		# not threadsafe, but worst thing that could happen is that  +		# we try to get items one more time +		if self._empty: +			return list() +		# END early abort +		 +		self._lock.acquire() +		try: +			if count == 0: +				self._empty = True +				return list(self._iter) +			else: +				out = list() +				it = self._iter +				for i in xrange(count): +					try: +						out.append(it.next()) +					except StopIteration: +						self._empty = True +						break +					# END handle empty iterator +				# END for each item to take +				return out +			# END handle count +		finally: +			self._lock.release() +		# END handle locking +		  #} END classes  #{ Constructors -def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader): +def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader):  	"""Create a channel, with a reader and a writer  	:return: tuple(reader, writer)  	:param ctype: Channel to instantiate | 
