summaryrefslogtreecommitdiff
path: root/lib/git/async/channel.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-12 11:19:18 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-12 11:19:29 +0200
commitbe8955a0fbb77d673587974b763f17c214904b57 (patch)
tree3e4cb96ca162c676692f72fc53dc6b1d8dc29e20 /lib/git/async/channel.py
parenta28942bdf01f4ddb9d0b5a0489bd6f4e101dd775 (diff)
downloadgitpython-be8955a0fbb77d673587974b763f17c214904b57.tar.gz
Cleaned up channel design, Reader and Writer bases don't require a channel anymore, but are abstract.
Added IteratorReader, implementing the reader interface from an iterator. The implementation moved from the TaskIterator to the channel
Diffstat (limited to 'lib/git/async/channel.py')
-rw-r--r--lib/git/async/channel.py139
1 files changed, 116 insertions, 23 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index ae476cda..79cb5294 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -11,10 +11,12 @@ from util import (
)
from time import time
+import threading
import sys
-__all__ = ('Channel', 'SerialChannel', 'Writer', 'CallbackWriter', 'Reader',
- 'CallbackReader', 'mkchannel', 'ReadOnly')
+__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter',
+ 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly',
+ 'IteratorReader')
#{ Classes
class Channel(object):
@@ -43,15 +45,50 @@ class SerialChannel(Channel):
class Writer(object):
+ """A writer is an object providing write access to a possibly blocking reading device"""
+ __slots__ = tuple()
+
+ #{ Interface
+
+ def __init__(self, device):
+ """Initialize the instance with the device to write to"""
+
+ def write(self, item, block=True, timeout=None):
+ """Write the given item into the device
+ :param block: True if the device may block until space for the item is available
+ :param timeout: The time in seconds to wait for the device to become ready
+ in blocking mode"""
+ raise NotImplementedError()
+
+ def size(self):
+ """:return: number of items already in the device, they could be read with a reader"""
+ raise NotImplementedError()
+
+ def close(self):
+ """Close the channel. Multiple close calls on a closed channel are no
+ an error"""
+ raise NotImplementedError()
+
+ def closed(self):
+ """:return: True if the channel was closed"""
+ raise NotImplementedError()
+
+ #} END interface
+
+
+class ChannelWriter(Writer):
"""The write end of a channel, a file-like interface for a channel"""
- __slots__ = ('write', 'channel')
+ __slots__ = ('channel', '_put')
def __init__(self, channel):
"""Initialize the writer to use the given channel"""
self.channel = channel
- self.write = channel.queue.put
+ self._put = self.channel.queue.put
#{ Interface
+ def write(self, item, block=False, timeout=None):
+ return self._put(item, block, timeout)
+
def size(self):
return self.channel.queue.qsize()
@@ -66,15 +103,14 @@ class Writer(object):
#} END interface
-class CallbackWriter(Writer):
+class CallbackChannelWriter(ChannelWriter):
"""The write end of a channel which allows you to setup a callback to be
called after an item was written to the channel"""
__slots__ = ('_pre_cb')
def __init__(self, channel):
- Writer.__init__(self, channel)
+ super(CallbackChannelWriter, self).__init__(channel)
self._pre_cb = None
- self.write = self._write
def set_pre_cb(self, fun = lambda item: item):
"""Install a callback to be called before the given item is written.
@@ -87,25 +123,22 @@ class CallbackWriter(Writer):
self._pre_cb = fun
return prev
- def _write(self, item, block=True, timeout=None):
+ def write(self, item, block=True, timeout=None):
if self._pre_cb:
item = self._pre_cb(item)
- self.channel.queue.put(item, block, timeout)
+ super(CallbackChannelWriter, self).write(item, block, timeout)
class Reader(object):
- """Allows reading from a channel"""
- __slots__ = 'channel'
+ """Allows reading from a device"""
+ __slots__ = tuple()
- def __init__(self, channel):
- """Initialize this instance from its parent write channel"""
- self.channel = channel
-
-
#{ Interface
-
+ def __init__(self, device):
+ """Initialize the instance with the device to read from"""
+
def read(self, count=0, block=True, timeout=None):
- """read a list of items read from the channel. The list, as a sequence
+ """read a list of items read from the device. The list, as a sequence
of items, is similar to the string of characters returned when reading from
file like objects.
:param count: given amount of items to read. If < 1, all items will be read
@@ -114,11 +147,25 @@ class Reader(object):
given amount of seconds, returning the items it received so far.
The timeout is applied to each read item, not for the whole operation.
:return: single item in a list if count is 1, or a list of count items.
- If the channel was empty and count was 1, an empty list will be returned.
+ If the device was empty and count was 1, an empty list will be returned.
If count was greater 1, a list with less than count items will be
returned.
If count was < 1, a list with all items that could be read will be
returned."""
+ raise NotImplementedError()
+
+
+class ChannelReader(Reader):
+ """Allows reading from a channel. The reader is thread-safe if the channel is as well"""
+ __slots__ = 'channel'
+
+ def __init__(self, channel):
+ """Initialize this instance from its parent write channel"""
+ self.channel = channel
+
+ #{ Interface
+
+ def read(self, count=0, block=True, timeout=None):
# if the channel is closed for writing, we never block
# NOTE: is handled by the queue
# We don't check for a closed state here has it costs time - most of
@@ -191,12 +238,12 @@ class Reader(object):
#} END interface
-class CallbackReader(Reader):
+class CallbackChannelReader(ChannelReader):
"""A channel which sends a callback before items are read from the channel"""
__slots__ = "_pre_cb"
def __init__(self, channel):
- Reader.__init__(self, channel)
+ super(CallbackChannelReader, self).__init__(channel)
self._pre_cb = None
def set_pre_cb(self, fun = lambda count: None):
@@ -213,13 +260,59 @@ class CallbackReader(Reader):
def read(self, count=0, block=True, timeout=None):
if self._pre_cb:
self._pre_cb(count)
- return Reader.read(self, count, block, timeout)
+ return super(CallbackChannelReader, self).read(count, block, timeout)
+
+class IteratorReader(Reader):
+ """A Reader allowing to read items from an iterator, instead of a channel.
+ Reads will never block. Its thread-safe"""
+ __slots__ = ("_empty", '_iter', '_lock')
+
+ # the type of the lock to use when reading from the iterator
+ lock_type = threading.Lock
+
+ def __init__(self, iterator):
+ self._empty = False
+ if not hasattr(iterator, 'next'):
+ raise ValueError("Iterator %r needs a next() function" % iterator)
+ self._iter = iterator
+ self._lock = self.lock_type()
+
+ def read(self, count=0, block=True, timeout=None):
+ """Non-Blocking implementation of read"""
+ # not threadsafe, but worst thing that could happen is that
+ # we try to get items one more time
+ if self._empty:
+ return list()
+ # END early abort
+
+ self._lock.acquire()
+ try:
+ if count == 0:
+ self._empty = True
+ return list(self._iter)
+ else:
+ out = list()
+ it = self._iter
+ for i in xrange(count):
+ try:
+ out.append(it.next())
+ except StopIteration:
+ self._empty = True
+ break
+ # END handle empty iterator
+ # END for each item to take
+ return out
+ # END handle count
+ finally:
+ self._lock.release()
+ # END handle locking
+
#} END classes
#{ Constructors
-def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader):
+def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader):
"""Create a channel, with a reader and a writer
:return: tuple(reader, writer)
:param ctype: Channel to instantiate