diff options
Diffstat (limited to 'lib/git/async/channel.py')
| -rw-r--r-- | lib/git/async/channel.py | 102 | 
1 files changed, 47 insertions, 55 deletions
| diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index abb31035..9b019707 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -21,61 +21,57 @@ class Channel(object):  	If the channel is closed, any read operation will result in an exception  	This base class is not instantiated directly, but instead serves as constructor -	for RWChannel pairs. +	for Rwriter pairs.  	Create a new channel """ -	__slots__ = tuple() - - -class WChannel(Channel): -	"""The write end of a channel - it is thread-safe""" -	__slots__ = ('_queue') +	__slots__ = 'queue'  	# The queue to use to store the actual data  	QueueCls = AsyncQueue  	def __init__(self): -		"""initialize this instance, able to hold max_items at once -		Write calls will block if the channel is full, until someone reads from it""" -		self._queue = self.QueueCls() -	 -	#{ Interface  -	def write(self, item, block=True, timeout=None): -		"""Send an item into the channel, it can be read from the read end of the  -		channel accordingly -		:param item: Item to send -		:param block: If True, the call will block until there is free space in the  -			channel -		:param timeout: timeout in seconds for blocking calls. -		:raise ReadOnly: when writing into closed channel""" -		# let the queue handle the 'closed' attribute, we write much more often  -		# to an open channel than to a closed one, saving a few cycles -		self._queue.put(item, block, timeout) -		 +		"""initialize this instance with a queue holding the channel contents"""  +		self.queue = self.QueueCls() + + +class SerialChannel(Channel): +	"""A slightly faster version of a Channel, which sacrificed thead-safety for performance""" +	QueueCls = SyncQueue + + +class Writer(object): +	"""The write end of a channel, a file-like interface for a channel""" +	__slots__ = ('write', 'channel') +	 +	def __init__(self, channel): +		"""Initialize the writer to use the given channel""" +		self.channel = channel +		self.write = channel.queue.put +	 +	#{ Interface  	def size(self): -		""":return: approximate number of items that could be read from the read-ends -			of this channel""" -		return self._queue.qsize() +		return self.channel.queue.qsize()  	def close(self):  		"""Close the channel. Multiple close calls on a closed channel are no   		an error""" -		self._queue.set_writable(False) +		self.channel.queue.set_writable(False)  	def closed(self):  		""":return: True if the channel was closed""" -		return not self._queue.writable() +		return not self.channel.queue.writable()  	#} END interface  -class CallbackWChannel(WChannel): +class CallbackWriter(Writer):  	"""The write end of a channel which allows you to setup a callback to be   	called after an item was written to the channel"""  	__slots__ = ('_pre_cb') -	def __init__(self): -		WChannel.__init__(self) +	def __init__(self, channel): +		Writer.__init__(self, channel)  		self._pre_cb = None +		self.write = self._write  	def set_pre_cb(self, fun = lambda item: item):  		"""Install a callback to be called before the given item is written. @@ -88,25 +84,19 @@ class CallbackWChannel(WChannel):  		self._pre_cb = fun  		return prev -	def write(self, item, block=True, timeout=None): +	def _write(self, item, block=True, timeout=None):  		if self._pre_cb:  			item = self._pre_cb(item) -		WChannel.write(self, item, block, timeout) +		self.channel.queue.put(item, block, timeout) -	 -class SerialWChannel(WChannel): -	"""A slightly faster version of a WChannel, which sacrificed thead-safety for -	performance""" -	QueueCls = SyncQueue - -class RChannel(Channel): -	"""The read-end of a corresponding write channel""" -	__slots__ = '_wc' +class Reader(object): +	"""Allows reading from a channel""" +	__slots__ = 'channel' -	def __init__(self, wchannel): +	def __init__(self, channel):  		"""Initialize this instance from its parent write channel""" -		self._wc = wchannel +		self.channel = channel  	#{ Interface @@ -135,7 +125,7 @@ class RChannel(Channel):  		# in non-blocking mode, its all not a problem  		out = list() -		queue = self._wc._queue +		queue = self.channel.queue  		if not block:  			# be as fast as possible in non-blocking mode, hence  			# its a bit 'unrolled' @@ -198,12 +188,12 @@ class RChannel(Channel):  	#} END interface  -class CallbackRChannel(RChannel): +class CallbackReader(Reader):  	"""A channel which sends a callback before items are read from the channel"""  	__slots__ = "_pre_cb" -	def __init__(self, wc): -		RChannel.__init__(self, wc) +	def __init__(self, channel): +		Reader.__init__(self, channel)  		self._pre_cb = None  	def set_pre_cb(self, fun = lambda count: None): @@ -220,18 +210,20 @@ class CallbackRChannel(RChannel):  	def read(self, count=0, block=True, timeout=None):  		if self._pre_cb:  			self._pre_cb(count) -		return RChannel.read(self, count, block, timeout) +		return Reader.read(self, count, block, timeout)  #} END classes  #{ Constructors -def mkchannel(wctype = WChannel, rctype = RChannel): -	"""Create a channel, which consists of one write end and one read end -	:return: tuple(write_channel, read_channel) +def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader): +	"""Create a channel, with a reader and a writer +	:return: tuple(reader, writer) +	:param ctype: Channel to instantiate  	:param wctype: The type of the write channel to instantiate  	:param rctype: The type of the read channel to instantiate""" -	wc = wctype() -	rc = rctype(wc) +	c = ctype() +	wc = wtype(c) +	rc = rtype(c)  	return wc, rc  #} END constructors | 
