summaryrefslogtreecommitdiff
path: root/lib/git/async/channel.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git/async/channel.py')
-rw-r--r--lib/git/async/channel.py338
1 files changed, 338 insertions, 0 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
new file mode 100644
index 00000000..a29ff17c
--- /dev/null
+++ b/lib/git/async/channel.py
@@ -0,0 +1,338 @@
+"""Contains a queue based channel implementation"""
+from Queue import (
+ Empty,
+ Full
+ )
+
+from util import (
+ AsyncQueue,
+ SyncQueue,
+ ReadOnly
+ )
+
+from time import time
+import threading
+import sys
+
+__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter',
+ 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly',
+ 'IteratorReader')
+
+#{ Classes
+class Channel(object):
+ """A channel is similar to a file like object. It has a write end as well as one or
+ more read ends. If Data is in the channel, it can be read, if not the read operation
+ will block until data becomes available.
+ If the channel is closed, any read operation will result in an exception
+
+ This base class is not instantiated directly, but instead serves as constructor
+ for Rwriter pairs.
+
+ Create a new channel """
+ __slots__ = 'queue'
+
+ # The queue to use to store the actual data
+ QueueCls = AsyncQueue
+
+ def __init__(self):
+ """initialize this instance with a queue holding the channel contents"""
+ self.queue = self.QueueCls()
+
+
+class SerialChannel(Channel):
+ """A slightly faster version of a Channel, which sacrificed thead-safety for performance"""
+ QueueCls = SyncQueue
+
+
+class Writer(object):
+ """A writer is an object providing write access to a possibly blocking reading device"""
+ __slots__ = tuple()
+
+ #{ Interface
+
+ def __init__(self, device):
+ """Initialize the instance with the device to write to"""
+
+ def write(self, item, block=True, timeout=None):
+ """Write the given item into the device
+ :param block: True if the device may block until space for the item is available
+ :param timeout: The time in seconds to wait for the device to become ready
+ in blocking mode"""
+ raise NotImplementedError()
+
+ def size(self):
+ """:return: number of items already in the device, they could be read with a reader"""
+ raise NotImplementedError()
+
+ def close(self):
+ """Close the channel. Multiple close calls on a closed channel are no
+ an error"""
+ raise NotImplementedError()
+
+ def closed(self):
+ """:return: True if the channel was closed"""
+ raise NotImplementedError()
+
+ #} END interface
+
+
+class ChannelWriter(Writer):
+ """The write end of a channel, a file-like interface for a channel"""
+ __slots__ = ('channel', '_put')
+
+ def __init__(self, channel):
+ """Initialize the writer to use the given channel"""
+ self.channel = channel
+ self._put = self.channel.queue.put
+
+ #{ Interface
+ def write(self, item, block=False, timeout=None):
+ return self._put(item, block, timeout)
+
+ def size(self):
+ return self.channel.queue.qsize()
+
+ def close(self):
+ """Close the channel. Multiple close calls on a closed channel are no
+ an error"""
+ self.channel.queue.set_writable(False)
+
+ def closed(self):
+ """:return: True if the channel was closed"""
+ return not self.channel.queue.writable()
+ #} END interface
+
+
+class CallbackWriterMixin(object):
+ """The write end of a channel which allows you to setup a callback to be
+ called after an item was written to the channel"""
+ # slots don't work with mixin's :(
+ # __slots__ = ('_pre_cb')
+
+ def __init__(self, *args):
+ super(CallbackWriterMixin, self).__init__(*args)
+ self._pre_cb = None
+
+ def set_pre_cb(self, fun = lambda item: item):
+ """Install a callback to be called before the given item is written.
+ It returns a possibly altered item which will be written to the channel
+ instead, making it useful for pre-write item conversions.
+ Providing None uninstalls the current method.
+ :return: the previously installed function or None
+ :note: Must be thread-safe if the channel is used in multiple threads"""
+ prev = self._pre_cb
+ self._pre_cb = fun
+ return prev
+
+ def write(self, item, block=True, timeout=None):
+ if self._pre_cb:
+ item = self._pre_cb(item)
+ super(CallbackWriterMixin, self).write(item, block, timeout)
+
+
+class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter):
+ """Implements a channel writer with callback functionality"""
+ pass
+
+
+class Reader(object):
+ """Allows reading from a device"""
+ __slots__ = tuple()
+
+ #{ Interface
+ def __init__(self, device):
+ """Initialize the instance with the device to read from"""
+
+ def read(self, count=0, block=True, timeout=None):
+ """read a list of items read from the device. The list, as a sequence
+ of items, is similar to the string of characters returned when reading from
+ file like objects.
+ :param count: given amount of items to read. If < 1, all items will be read
+ :param block: if True, the call will block until an item is available
+ :param timeout: if positive and block is True, it will block only for the
+ given amount of seconds, returning the items it received so far.
+ The timeout is applied to each read item, not for the whole operation.
+ :return: single item in a list if count is 1, or a list of count items.
+ If the device was empty and count was 1, an empty list will be returned.
+ If count was greater 1, a list with less than count items will be
+ returned.
+ If count was < 1, a list with all items that could be read will be
+ returned."""
+ raise NotImplementedError()
+
+
+class ChannelReader(Reader):
+ """Allows reading from a channel. The reader is thread-safe if the channel is as well"""
+ __slots__ = 'channel'
+
+ def __init__(self, channel):
+ """Initialize this instance from its parent write channel"""
+ self.channel = channel
+
+ #{ Interface
+
+ def read(self, count=0, block=True, timeout=None):
+ # if the channel is closed for writing, we never block
+ # NOTE: is handled by the queue
+ # We don't check for a closed state here has it costs time - most of
+ # the time, it will not be closed, and will bail out automatically once
+ # it gets closed
+
+
+ # in non-blocking mode, its all not a problem
+ out = list()
+ queue = self.channel.queue
+ if not block:
+ # be as fast as possible in non-blocking mode, hence
+ # its a bit 'unrolled'
+ try:
+ if count == 1:
+ out.append(queue.get(False))
+ elif count < 1:
+ while True:
+ out.append(queue.get(False))
+ # END for each item
+ else:
+ for i in xrange(count):
+ out.append(queue.get(False))
+ # END for each item
+ # END handle count
+ except Empty:
+ pass
+ # END handle exceptions
+ else:
+ # to get everything into one loop, we set the count accordingly
+ if count == 0:
+ count = sys.maxint
+ # END handle count
+
+ i = 0
+ while i < count:
+ try:
+ out.append(queue.get(block, timeout))
+ i += 1
+ except Empty:
+ # here we are only if
+ # someone woke us up to inform us about the queue that changed
+ # its writable state
+ # The following branch checks for closed channels, and pulls
+ # as many items as we need and as possible, before
+ # leaving the loop.
+ if not queue.writable():
+ try:
+ while i < count:
+ out.append(queue.get(False, None))
+ i += 1
+ # END count loop
+ except Empty:
+ break # out of count loop
+ # END handle absolutely empty queue
+ # END handle closed channel
+
+ # if we are here, we woke up and the channel is not closed
+ # Either the queue became writable again, which currently shouldn't
+ # be able to happen in the channel, or someone read with a timeout
+ # that actually timed out.
+ # As it timed out, which is the only reason we are here,
+ # we have to abort
+ break
+ # END ignore empty
+
+ # END for each item
+ # END handle blocking
+ return out
+
+ #} END interface
+
+
+class CallbackReaderMixin(object):
+ """A channel which sends a callback before items are read from the channel"""
+ # unfortunately, slots can only use direct inheritance, have to turn it off :(
+ # __slots__ = "_pre_cb"
+
+ def __init__(self, *args):
+ super(CallbackReaderMixin, self).__init__(*args)
+ self._pre_cb = None
+
+ def set_pre_cb(self, fun = lambda count: None):
+ """Install a callback to call with the item count to be read before any
+ item is actually read from the channel.
+ Exceptions will be propagated.
+ If a function is not provided, the call is effectively uninstalled.
+ :return: the previously installed callback or None
+ :note: The callback must be threadsafe if the channel is used by multiple threads."""
+ prev = self._pre_cb
+ self._pre_cb = fun
+ return prev
+
+ def read(self, count=0, block=True, timeout=None):
+ if self._pre_cb:
+ self._pre_cb(count)
+ return super(CallbackReaderMixin, self).read(count, block, timeout)
+
+
+class CallbackChannelReader(CallbackReaderMixin, ChannelReader):
+ """Implements a channel reader with callback functionality"""
+ pass
+
+
+class IteratorReader(Reader):
+ """A Reader allowing to read items from an iterator, instead of a channel.
+ Reads will never block. Its thread-safe"""
+ __slots__ = ("_empty", '_iter', '_lock')
+
+ # the type of the lock to use when reading from the iterator
+ lock_type = threading.Lock
+
+ def __init__(self, iterator):
+ self._empty = False
+ if not hasattr(iterator, 'next'):
+ raise ValueError("Iterator %r needs a next() function" % iterator)
+ self._iter = iterator
+ self._lock = self.lock_type()
+
+ def read(self, count=0, block=True, timeout=None):
+ """Non-Blocking implementation of read"""
+ # not threadsafe, but worst thing that could happen is that
+ # we try to get items one more time
+ if self._empty:
+ return list()
+ # END early abort
+
+ self._lock.acquire()
+ try:
+ if count == 0:
+ self._empty = True
+ return list(self._iter)
+ else:
+ out = list()
+ it = self._iter
+ for i in xrange(count):
+ try:
+ out.append(it.next())
+ except StopIteration:
+ self._empty = True
+ break
+ # END handle empty iterator
+ # END for each item to take
+ return out
+ # END handle count
+ finally:
+ self._lock.release()
+ # END handle locking
+
+
+#} END classes
+
+#{ Constructors
+def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader):
+ """Create a channel, with a reader and a writer
+ :return: tuple(reader, writer)
+ :param ctype: Channel to instantiate
+ :param wctype: The type of the write channel to instantiate
+ :param rctype: The type of the read channel to instantiate"""
+ c = ctype()
+ wc = wtype(c)
+ rc = rtype(c)
+ return wc, rc
+#} END constructors