diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 14:34:09 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 14:34:09 +0200 |
commit | 86ea63504f3e8a74cfb1d533be9d9602d2d17e27 (patch) | |
tree | a2c59af267666a4b44bda748b806585c46faae99 /lib/git/async/channel.py | |
parent | f91495e271597034226f1b9651345091083172c4 (diff) | |
download | gitpython-86ea63504f3e8a74cfb1d533be9d9602d2d17e27.tar.gz |
Removed async from tree
Diffstat (limited to 'lib/git/async/channel.py')
-rw-r--r-- | lib/git/async/channel.py | 338 |
1 files changed, 0 insertions, 338 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py deleted file mode 100644 index a29ff17c..00000000 --- a/lib/git/async/channel.py +++ /dev/null @@ -1,338 +0,0 @@ -"""Contains a queue based channel implementation""" -from Queue import ( - Empty, - Full - ) - -from util import ( - AsyncQueue, - SyncQueue, - ReadOnly - ) - -from time import time -import threading -import sys - -__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter', - 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly', - 'IteratorReader') - -#{ Classes -class Channel(object): - """A channel is similar to a file like object. It has a write end as well as one or - more read ends. If Data is in the channel, it can be read, if not the read operation - will block until data becomes available. - If the channel is closed, any read operation will result in an exception - - This base class is not instantiated directly, but instead serves as constructor - for Rwriter pairs. - - Create a new channel """ - __slots__ = 'queue' - - # The queue to use to store the actual data - QueueCls = AsyncQueue - - def __init__(self): - """initialize this instance with a queue holding the channel contents""" - self.queue = self.QueueCls() - - -class SerialChannel(Channel): - """A slightly faster version of a Channel, which sacrificed thead-safety for performance""" - QueueCls = SyncQueue - - -class Writer(object): - """A writer is an object providing write access to a possibly blocking reading device""" - __slots__ = tuple() - - #{ Interface - - def __init__(self, device): - """Initialize the instance with the device to write to""" - - def write(self, item, block=True, timeout=None): - """Write the given item into the device - :param block: True if the device may block until space for the item is available - :param timeout: The time in seconds to wait for the device to become ready - in blocking mode""" - raise NotImplementedError() - - def size(self): - """:return: number of items already in the device, they could be read with a reader""" - raise NotImplementedError() - - def close(self): - """Close the channel. Multiple close calls on a closed channel are no - an error""" - raise NotImplementedError() - - def closed(self): - """:return: True if the channel was closed""" - raise NotImplementedError() - - #} END interface - - -class ChannelWriter(Writer): - """The write end of a channel, a file-like interface for a channel""" - __slots__ = ('channel', '_put') - - def __init__(self, channel): - """Initialize the writer to use the given channel""" - self.channel = channel - self._put = self.channel.queue.put - - #{ Interface - def write(self, item, block=False, timeout=None): - return self._put(item, block, timeout) - - def size(self): - return self.channel.queue.qsize() - - def close(self): - """Close the channel. Multiple close calls on a closed channel are no - an error""" - self.channel.queue.set_writable(False) - - def closed(self): - """:return: True if the channel was closed""" - return not self.channel.queue.writable() - #} END interface - - -class CallbackWriterMixin(object): - """The write end of a channel which allows you to setup a callback to be - called after an item was written to the channel""" - # slots don't work with mixin's :( - # __slots__ = ('_pre_cb') - - def __init__(self, *args): - super(CallbackWriterMixin, self).__init__(*args) - self._pre_cb = None - - def set_pre_cb(self, fun = lambda item: item): - """Install a callback to be called before the given item is written. - It returns a possibly altered item which will be written to the channel - instead, making it useful for pre-write item conversions. - Providing None uninstalls the current method. - :return: the previously installed function or None - :note: Must be thread-safe if the channel is used in multiple threads""" - prev = self._pre_cb - self._pre_cb = fun - return prev - - def write(self, item, block=True, timeout=None): - if self._pre_cb: - item = self._pre_cb(item) - super(CallbackWriterMixin, self).write(item, block, timeout) - - -class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter): - """Implements a channel writer with callback functionality""" - pass - - -class Reader(object): - """Allows reading from a device""" - __slots__ = tuple() - - #{ Interface - def __init__(self, device): - """Initialize the instance with the device to read from""" - - def read(self, count=0, block=True, timeout=None): - """read a list of items read from the device. The list, as a sequence - of items, is similar to the string of characters returned when reading from - file like objects. - :param count: given amount of items to read. If < 1, all items will be read - :param block: if True, the call will block until an item is available - :param timeout: if positive and block is True, it will block only for the - given amount of seconds, returning the items it received so far. - The timeout is applied to each read item, not for the whole operation. - :return: single item in a list if count is 1, or a list of count items. - If the device was empty and count was 1, an empty list will be returned. - If count was greater 1, a list with less than count items will be - returned. - If count was < 1, a list with all items that could be read will be - returned.""" - raise NotImplementedError() - - -class ChannelReader(Reader): - """Allows reading from a channel. The reader is thread-safe if the channel is as well""" - __slots__ = 'channel' - - def __init__(self, channel): - """Initialize this instance from its parent write channel""" - self.channel = channel - - #{ Interface - - def read(self, count=0, block=True, timeout=None): - # if the channel is closed for writing, we never block - # NOTE: is handled by the queue - # We don't check for a closed state here has it costs time - most of - # the time, it will not be closed, and will bail out automatically once - # it gets closed - - - # in non-blocking mode, its all not a problem - out = list() - queue = self.channel.queue - if not block: - # be as fast as possible in non-blocking mode, hence - # its a bit 'unrolled' - try: - if count == 1: - out.append(queue.get(False)) - elif count < 1: - while True: - out.append(queue.get(False)) - # END for each item - else: - for i in xrange(count): - out.append(queue.get(False)) - # END for each item - # END handle count - except Empty: - pass - # END handle exceptions - else: - # to get everything into one loop, we set the count accordingly - if count == 0: - count = sys.maxint - # END handle count - - i = 0 - while i < count: - try: - out.append(queue.get(block, timeout)) - i += 1 - except Empty: - # here we are only if - # someone woke us up to inform us about the queue that changed - # its writable state - # The following branch checks for closed channels, and pulls - # as many items as we need and as possible, before - # leaving the loop. - if not queue.writable(): - try: - while i < count: - out.append(queue.get(False, None)) - i += 1 - # END count loop - except Empty: - break # out of count loop - # END handle absolutely empty queue - # END handle closed channel - - # if we are here, we woke up and the channel is not closed - # Either the queue became writable again, which currently shouldn't - # be able to happen in the channel, or someone read with a timeout - # that actually timed out. - # As it timed out, which is the only reason we are here, - # we have to abort - break - # END ignore empty - - # END for each item - # END handle blocking - return out - - #} END interface - - -class CallbackReaderMixin(object): - """A channel which sends a callback before items are read from the channel""" - # unfortunately, slots can only use direct inheritance, have to turn it off :( - # __slots__ = "_pre_cb" - - def __init__(self, *args): - super(CallbackReaderMixin, self).__init__(*args) - self._pre_cb = None - - def set_pre_cb(self, fun = lambda count: None): - """Install a callback to call with the item count to be read before any - item is actually read from the channel. - Exceptions will be propagated. - If a function is not provided, the call is effectively uninstalled. - :return: the previously installed callback or None - :note: The callback must be threadsafe if the channel is used by multiple threads.""" - prev = self._pre_cb - self._pre_cb = fun - return prev - - def read(self, count=0, block=True, timeout=None): - if self._pre_cb: - self._pre_cb(count) - return super(CallbackReaderMixin, self).read(count, block, timeout) - - -class CallbackChannelReader(CallbackReaderMixin, ChannelReader): - """Implements a channel reader with callback functionality""" - pass - - -class IteratorReader(Reader): - """A Reader allowing to read items from an iterator, instead of a channel. - Reads will never block. Its thread-safe""" - __slots__ = ("_empty", '_iter', '_lock') - - # the type of the lock to use when reading from the iterator - lock_type = threading.Lock - - def __init__(self, iterator): - self._empty = False - if not hasattr(iterator, 'next'): - raise ValueError("Iterator %r needs a next() function" % iterator) - self._iter = iterator - self._lock = self.lock_type() - - def read(self, count=0, block=True, timeout=None): - """Non-Blocking implementation of read""" - # not threadsafe, but worst thing that could happen is that - # we try to get items one more time - if self._empty: - return list() - # END early abort - - self._lock.acquire() - try: - if count == 0: - self._empty = True - return list(self._iter) - else: - out = list() - it = self._iter - for i in xrange(count): - try: - out.append(it.next()) - except StopIteration: - self._empty = True - break - # END handle empty iterator - # END for each item to take - return out - # END handle count - finally: - self._lock.release() - # END handle locking - - -#} END classes - -#{ Constructors -def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader): - """Create a channel, with a reader and a writer - :return: tuple(reader, writer) - :param ctype: Channel to instantiate - :param wctype: The type of the write channel to instantiate - :param rctype: The type of the read channel to instantiate""" - c = ctype() - wc = wtype(c) - rc = rtype(c) - return wc, rc -#} END constructors |