diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 12:41:20 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 12:41:20 +0200 |
commit | f91495e271597034226f1b9651345091083172c4 (patch) | |
tree | e0e2aa63b7dc649083858366eaedb6ac4cc5739b /lib/git/async/channel.py | |
parent | 7c1169f6ea406fec1e26e99821e18e66437e65eb (diff) | |
parent | 7a0b79ee574999ecbc76696506352e4a5a0d7159 (diff) | |
download | gitpython-f91495e271597034226f1b9651345091083172c4.tar.gz |
Merge branch 'async'
Diffstat (limited to 'lib/git/async/channel.py')
-rw-r--r-- | lib/git/async/channel.py | 338 |
1 files changed, 338 insertions, 0 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py new file mode 100644 index 00000000..a29ff17c --- /dev/null +++ b/lib/git/async/channel.py @@ -0,0 +1,338 @@ +"""Contains a queue based channel implementation""" +from Queue import ( + Empty, + Full + ) + +from util import ( + AsyncQueue, + SyncQueue, + ReadOnly + ) + +from time import time +import threading +import sys + +__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter', + 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly', + 'IteratorReader') + +#{ Classes +class Channel(object): + """A channel is similar to a file like object. It has a write end as well as one or + more read ends. If Data is in the channel, it can be read, if not the read operation + will block until data becomes available. + If the channel is closed, any read operation will result in an exception + + This base class is not instantiated directly, but instead serves as constructor + for Rwriter pairs. + + Create a new channel """ + __slots__ = 'queue' + + # The queue to use to store the actual data + QueueCls = AsyncQueue + + def __init__(self): + """initialize this instance with a queue holding the channel contents""" + self.queue = self.QueueCls() + + +class SerialChannel(Channel): + """A slightly faster version of a Channel, which sacrificed thead-safety for performance""" + QueueCls = SyncQueue + + +class Writer(object): + """A writer is an object providing write access to a possibly blocking reading device""" + __slots__ = tuple() + + #{ Interface + + def __init__(self, device): + """Initialize the instance with the device to write to""" + + def write(self, item, block=True, timeout=None): + """Write the given item into the device + :param block: True if the device may block until space for the item is available + :param timeout: The time in seconds to wait for the device to become ready + in blocking mode""" + raise NotImplementedError() + + def size(self): + """:return: number of items already in the device, they could be read with a reader""" + raise NotImplementedError() + + def close(self): + """Close the channel. Multiple close calls on a closed channel are no + an error""" + raise NotImplementedError() + + def closed(self): + """:return: True if the channel was closed""" + raise NotImplementedError() + + #} END interface + + +class ChannelWriter(Writer): + """The write end of a channel, a file-like interface for a channel""" + __slots__ = ('channel', '_put') + + def __init__(self, channel): + """Initialize the writer to use the given channel""" + self.channel = channel + self._put = self.channel.queue.put + + #{ Interface + def write(self, item, block=False, timeout=None): + return self._put(item, block, timeout) + + def size(self): + return self.channel.queue.qsize() + + def close(self): + """Close the channel. Multiple close calls on a closed channel are no + an error""" + self.channel.queue.set_writable(False) + + def closed(self): + """:return: True if the channel was closed""" + return not self.channel.queue.writable() + #} END interface + + +class CallbackWriterMixin(object): + """The write end of a channel which allows you to setup a callback to be + called after an item was written to the channel""" + # slots don't work with mixin's :( + # __slots__ = ('_pre_cb') + + def __init__(self, *args): + super(CallbackWriterMixin, self).__init__(*args) + self._pre_cb = None + + def set_pre_cb(self, fun = lambda item: item): + """Install a callback to be called before the given item is written. + It returns a possibly altered item which will be written to the channel + instead, making it useful for pre-write item conversions. + Providing None uninstalls the current method. + :return: the previously installed function or None + :note: Must be thread-safe if the channel is used in multiple threads""" + prev = self._pre_cb + self._pre_cb = fun + return prev + + def write(self, item, block=True, timeout=None): + if self._pre_cb: + item = self._pre_cb(item) + super(CallbackWriterMixin, self).write(item, block, timeout) + + +class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter): + """Implements a channel writer with callback functionality""" + pass + + +class Reader(object): + """Allows reading from a device""" + __slots__ = tuple() + + #{ Interface + def __init__(self, device): + """Initialize the instance with the device to read from""" + + def read(self, count=0, block=True, timeout=None): + """read a list of items read from the device. The list, as a sequence + of items, is similar to the string of characters returned when reading from + file like objects. + :param count: given amount of items to read. If < 1, all items will be read + :param block: if True, the call will block until an item is available + :param timeout: if positive and block is True, it will block only for the + given amount of seconds, returning the items it received so far. + The timeout is applied to each read item, not for the whole operation. + :return: single item in a list if count is 1, or a list of count items. + If the device was empty and count was 1, an empty list will be returned. + If count was greater 1, a list with less than count items will be + returned. + If count was < 1, a list with all items that could be read will be + returned.""" + raise NotImplementedError() + + +class ChannelReader(Reader): + """Allows reading from a channel. The reader is thread-safe if the channel is as well""" + __slots__ = 'channel' + + def __init__(self, channel): + """Initialize this instance from its parent write channel""" + self.channel = channel + + #{ Interface + + def read(self, count=0, block=True, timeout=None): + # if the channel is closed for writing, we never block + # NOTE: is handled by the queue + # We don't check for a closed state here has it costs time - most of + # the time, it will not be closed, and will bail out automatically once + # it gets closed + + + # in non-blocking mode, its all not a problem + out = list() + queue = self.channel.queue + if not block: + # be as fast as possible in non-blocking mode, hence + # its a bit 'unrolled' + try: + if count == 1: + out.append(queue.get(False)) + elif count < 1: + while True: + out.append(queue.get(False)) + # END for each item + else: + for i in xrange(count): + out.append(queue.get(False)) + # END for each item + # END handle count + except Empty: + pass + # END handle exceptions + else: + # to get everything into one loop, we set the count accordingly + if count == 0: + count = sys.maxint + # END handle count + + i = 0 + while i < count: + try: + out.append(queue.get(block, timeout)) + i += 1 + except Empty: + # here we are only if + # someone woke us up to inform us about the queue that changed + # its writable state + # The following branch checks for closed channels, and pulls + # as many items as we need and as possible, before + # leaving the loop. + if not queue.writable(): + try: + while i < count: + out.append(queue.get(False, None)) + i += 1 + # END count loop + except Empty: + break # out of count loop + # END handle absolutely empty queue + # END handle closed channel + + # if we are here, we woke up and the channel is not closed + # Either the queue became writable again, which currently shouldn't + # be able to happen in the channel, or someone read with a timeout + # that actually timed out. + # As it timed out, which is the only reason we are here, + # we have to abort + break + # END ignore empty + + # END for each item + # END handle blocking + return out + + #} END interface + + +class CallbackReaderMixin(object): + """A channel which sends a callback before items are read from the channel""" + # unfortunately, slots can only use direct inheritance, have to turn it off :( + # __slots__ = "_pre_cb" + + def __init__(self, *args): + super(CallbackReaderMixin, self).__init__(*args) + self._pre_cb = None + + def set_pre_cb(self, fun = lambda count: None): + """Install a callback to call with the item count to be read before any + item is actually read from the channel. + Exceptions will be propagated. + If a function is not provided, the call is effectively uninstalled. + :return: the previously installed callback or None + :note: The callback must be threadsafe if the channel is used by multiple threads.""" + prev = self._pre_cb + self._pre_cb = fun + return prev + + def read(self, count=0, block=True, timeout=None): + if self._pre_cb: + self._pre_cb(count) + return super(CallbackReaderMixin, self).read(count, block, timeout) + + +class CallbackChannelReader(CallbackReaderMixin, ChannelReader): + """Implements a channel reader with callback functionality""" + pass + + +class IteratorReader(Reader): + """A Reader allowing to read items from an iterator, instead of a channel. + Reads will never block. Its thread-safe""" + __slots__ = ("_empty", '_iter', '_lock') + + # the type of the lock to use when reading from the iterator + lock_type = threading.Lock + + def __init__(self, iterator): + self._empty = False + if not hasattr(iterator, 'next'): + raise ValueError("Iterator %r needs a next() function" % iterator) + self._iter = iterator + self._lock = self.lock_type() + + def read(self, count=0, block=True, timeout=None): + """Non-Blocking implementation of read""" + # not threadsafe, but worst thing that could happen is that + # we try to get items one more time + if self._empty: + return list() + # END early abort + + self._lock.acquire() + try: + if count == 0: + self._empty = True + return list(self._iter) + else: + out = list() + it = self._iter + for i in xrange(count): + try: + out.append(it.next()) + except StopIteration: + self._empty = True + break + # END handle empty iterator + # END for each item to take + return out + # END handle count + finally: + self._lock.release() + # END handle locking + + +#} END classes + +#{ Constructors +def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader): + """Create a channel, with a reader and a writer + :return: tuple(reader, writer) + :param ctype: Channel to instantiate + :param wctype: The type of the write channel to instantiate + :param rctype: The type of the read channel to instantiate""" + c = ctype() + wc = wtype(c) + rc = rtype(c) + return wc, rc +#} END constructors |