From be8955a0fbb77d673587974b763f17c214904b57 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 12 Jun 2010 11:19:18 +0200 Subject: Cleaned up channel design, Reader and Writer bases don't require a channel anymore, but are abstract. Added IteratorReader, implementing the reader interface from an iterator. The implementation moved from the TaskIterator to the channel --- lib/git/async/channel.py | 139 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 116 insertions(+), 23 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index ae476cda..79cb5294 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -11,10 +11,12 @@ from util import ( ) from time import time +import threading import sys -__all__ = ('Channel', 'SerialChannel', 'Writer', 'CallbackWriter', 'Reader', - 'CallbackReader', 'mkchannel', 'ReadOnly') +__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter', + 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly', + 'IteratorReader') #{ Classes class Channel(object): @@ -43,15 +45,50 @@ class SerialChannel(Channel): class Writer(object): + """A writer is an object providing write access to a possibly blocking reading device""" + __slots__ = tuple() + + #{ Interface + + def __init__(self, device): + """Initialize the instance with the device to write to""" + + def write(self, item, block=True, timeout=None): + """Write the given item into the device + :param block: True if the device may block until space for the item is available + :param timeout: The time in seconds to wait for the device to become ready + in blocking mode""" + raise NotImplementedError() + + def size(self): + """:return: number of items already in the device, they could be read with a reader""" + raise NotImplementedError() + + def close(self): + """Close the channel. Multiple close calls on a closed channel are no + an error""" + raise NotImplementedError() + + def closed(self): + """:return: True if the channel was closed""" + raise NotImplementedError() + + #} END interface + + +class ChannelWriter(Writer): """The write end of a channel, a file-like interface for a channel""" - __slots__ = ('write', 'channel') + __slots__ = ('channel', '_put') def __init__(self, channel): """Initialize the writer to use the given channel""" self.channel = channel - self.write = channel.queue.put + self._put = self.channel.queue.put #{ Interface + def write(self, item, block=False, timeout=None): + return self._put(item, block, timeout) + def size(self): return self.channel.queue.qsize() @@ -66,15 +103,14 @@ class Writer(object): #} END interface -class CallbackWriter(Writer): +class CallbackChannelWriter(ChannelWriter): """The write end of a channel which allows you to setup a callback to be called after an item was written to the channel""" __slots__ = ('_pre_cb') def __init__(self, channel): - Writer.__init__(self, channel) + super(CallbackChannelWriter, self).__init__(channel) self._pre_cb = None - self.write = self._write def set_pre_cb(self, fun = lambda item: item): """Install a callback to be called before the given item is written. @@ -87,25 +123,22 @@ class CallbackWriter(Writer): self._pre_cb = fun return prev - def _write(self, item, block=True, timeout=None): + def write(self, item, block=True, timeout=None): if self._pre_cb: item = self._pre_cb(item) - self.channel.queue.put(item, block, timeout) + super(CallbackChannelWriter, self).write(item, block, timeout) class Reader(object): - """Allows reading from a channel""" - __slots__ = 'channel' + """Allows reading from a device""" + __slots__ = tuple() - def __init__(self, channel): - """Initialize this instance from its parent write channel""" - self.channel = channel - - #{ Interface - + def __init__(self, device): + """Initialize the instance with the device to read from""" + def read(self, count=0, block=True, timeout=None): - """read a list of items read from the channel. The list, as a sequence + """read a list of items read from the device. The list, as a sequence of items, is similar to the string of characters returned when reading from file like objects. :param count: given amount of items to read. If < 1, all items will be read @@ -114,11 +147,25 @@ class Reader(object): given amount of seconds, returning the items it received so far. The timeout is applied to each read item, not for the whole operation. :return: single item in a list if count is 1, or a list of count items. - If the channel was empty and count was 1, an empty list will be returned. + If the device was empty and count was 1, an empty list will be returned. If count was greater 1, a list with less than count items will be returned. If count was < 1, a list with all items that could be read will be returned.""" + raise NotImplementedError() + + +class ChannelReader(Reader): + """Allows reading from a channel. The reader is thread-safe if the channel is as well""" + __slots__ = 'channel' + + def __init__(self, channel): + """Initialize this instance from its parent write channel""" + self.channel = channel + + #{ Interface + + def read(self, count=0, block=True, timeout=None): # if the channel is closed for writing, we never block # NOTE: is handled by the queue # We don't check for a closed state here has it costs time - most of @@ -191,12 +238,12 @@ class Reader(object): #} END interface -class CallbackReader(Reader): +class CallbackChannelReader(ChannelReader): """A channel which sends a callback before items are read from the channel""" __slots__ = "_pre_cb" def __init__(self, channel): - Reader.__init__(self, channel) + super(CallbackChannelReader, self).__init__(channel) self._pre_cb = None def set_pre_cb(self, fun = lambda count: None): @@ -213,13 +260,59 @@ class CallbackReader(Reader): def read(self, count=0, block=True, timeout=None): if self._pre_cb: self._pre_cb(count) - return Reader.read(self, count, block, timeout) + return super(CallbackChannelReader, self).read(count, block, timeout) + +class IteratorReader(Reader): + """A Reader allowing to read items from an iterator, instead of a channel. + Reads will never block. Its thread-safe""" + __slots__ = ("_empty", '_iter', '_lock') + + # the type of the lock to use when reading from the iterator + lock_type = threading.Lock + + def __init__(self, iterator): + self._empty = False + if not hasattr(iterator, 'next'): + raise ValueError("Iterator %r needs a next() function" % iterator) + self._iter = iterator + self._lock = self.lock_type() + + def read(self, count=0, block=True, timeout=None): + """Non-Blocking implementation of read""" + # not threadsafe, but worst thing that could happen is that + # we try to get items one more time + if self._empty: + return list() + # END early abort + + self._lock.acquire() + try: + if count == 0: + self._empty = True + return list(self._iter) + else: + out = list() + it = self._iter + for i in xrange(count): + try: + out.append(it.next()) + except StopIteration: + self._empty = True + break + # END handle empty iterator + # END for each item to take + return out + # END handle count + finally: + self._lock.release() + # END handle locking + #} END classes #{ Constructors -def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader): +def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader): """Create a channel, with a reader and a writer :return: tuple(reader, writer) :param ctype: Channel to instantiate -- cgit v1.2.1