diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 11:19:18 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 11:19:29 +0200 |
commit | be8955a0fbb77d673587974b763f17c214904b57 (patch) | |
tree | 3e4cb96ca162c676692f72fc53dc6b1d8dc29e20 /lib/git | |
parent | a28942bdf01f4ddb9d0b5a0489bd6f4e101dd775 (diff) | |
download | gitpython-be8955a0fbb77d673587974b763f17c214904b57.tar.gz |
Cleaned up channel design, Reader and Writer bases don't require a channel anymore, but are abstract.
Added IteratorReader, implementing the reader interface from an iterator. The implementation moved from the TaskIterator to the channel
Diffstat (limited to 'lib/git')
-rw-r--r-- | lib/git/async/channel.py | 139 | ||||
-rw-r--r-- | lib/git/async/pool.py | 15 | ||||
-rw-r--r-- | lib/git/async/task.py | 51 |
3 files changed, 130 insertions, 75 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index ae476cda..79cb5294 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -11,10 +11,12 @@ from util import ( ) from time import time +import threading import sys -__all__ = ('Channel', 'SerialChannel', 'Writer', 'CallbackWriter', 'Reader', - 'CallbackReader', 'mkchannel', 'ReadOnly') +__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter', + 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly', + 'IteratorReader') #{ Classes class Channel(object): @@ -43,15 +45,50 @@ class SerialChannel(Channel): class Writer(object): + """A writer is an object providing write access to a possibly blocking reading device""" + __slots__ = tuple() + + #{ Interface + + def __init__(self, device): + """Initialize the instance with the device to write to""" + + def write(self, item, block=True, timeout=None): + """Write the given item into the device + :param block: True if the device may block until space for the item is available + :param timeout: The time in seconds to wait for the device to become ready + in blocking mode""" + raise NotImplementedError() + + def size(self): + """:return: number of items already in the device, they could be read with a reader""" + raise NotImplementedError() + + def close(self): + """Close the channel. Multiple close calls on a closed channel are no + an error""" + raise NotImplementedError() + + def closed(self): + """:return: True if the channel was closed""" + raise NotImplementedError() + + #} END interface + + +class ChannelWriter(Writer): """The write end of a channel, a file-like interface for a channel""" - __slots__ = ('write', 'channel') + __slots__ = ('channel', '_put') def __init__(self, channel): """Initialize the writer to use the given channel""" self.channel = channel - self.write = channel.queue.put + self._put = self.channel.queue.put #{ Interface + def write(self, item, block=False, timeout=None): + return self._put(item, block, timeout) + def size(self): return self.channel.queue.qsize() @@ -66,15 +103,14 @@ class Writer(object): #} END interface -class CallbackWriter(Writer): +class CallbackChannelWriter(ChannelWriter): """The write end of a channel which allows you to setup a callback to be called after an item was written to the channel""" __slots__ = ('_pre_cb') def __init__(self, channel): - Writer.__init__(self, channel) + super(CallbackChannelWriter, self).__init__(channel) self._pre_cb = None - self.write = self._write def set_pre_cb(self, fun = lambda item: item): """Install a callback to be called before the given item is written. @@ -87,25 +123,22 @@ class CallbackWriter(Writer): self._pre_cb = fun return prev - def _write(self, item, block=True, timeout=None): + def write(self, item, block=True, timeout=None): if self._pre_cb: item = self._pre_cb(item) - self.channel.queue.put(item, block, timeout) + super(CallbackChannelWriter, self).write(item, block, timeout) class Reader(object): - """Allows reading from a channel""" - __slots__ = 'channel' + """Allows reading from a device""" + __slots__ = tuple() - def __init__(self, channel): - """Initialize this instance from its parent write channel""" - self.channel = channel - - #{ Interface - + def __init__(self, device): + """Initialize the instance with the device to read from""" + def read(self, count=0, block=True, timeout=None): - """read a list of items read from the channel. The list, as a sequence + """read a list of items read from the device. The list, as a sequence of items, is similar to the string of characters returned when reading from file like objects. :param count: given amount of items to read. If < 1, all items will be read @@ -114,11 +147,25 @@ class Reader(object): given amount of seconds, returning the items it received so far. The timeout is applied to each read item, not for the whole operation. :return: single item in a list if count is 1, or a list of count items. - If the channel was empty and count was 1, an empty list will be returned. + If the device was empty and count was 1, an empty list will be returned. If count was greater 1, a list with less than count items will be returned. If count was < 1, a list with all items that could be read will be returned.""" + raise NotImplementedError() + + +class ChannelReader(Reader): + """Allows reading from a channel. The reader is thread-safe if the channel is as well""" + __slots__ = 'channel' + + def __init__(self, channel): + """Initialize this instance from its parent write channel""" + self.channel = channel + + #{ Interface + + def read(self, count=0, block=True, timeout=None): # if the channel is closed for writing, we never block # NOTE: is handled by the queue # We don't check for a closed state here has it costs time - most of @@ -191,12 +238,12 @@ class Reader(object): #} END interface -class CallbackReader(Reader): +class CallbackChannelReader(ChannelReader): """A channel which sends a callback before items are read from the channel""" __slots__ = "_pre_cb" def __init__(self, channel): - Reader.__init__(self, channel) + super(CallbackChannelReader, self).__init__(channel) self._pre_cb = None def set_pre_cb(self, fun = lambda count: None): @@ -213,13 +260,59 @@ class CallbackReader(Reader): def read(self, count=0, block=True, timeout=None): if self._pre_cb: self._pre_cb(count) - return Reader.read(self, count, block, timeout) + return super(CallbackChannelReader, self).read(count, block, timeout) + +class IteratorReader(Reader): + """A Reader allowing to read items from an iterator, instead of a channel. + Reads will never block. Its thread-safe""" + __slots__ = ("_empty", '_iter', '_lock') + + # the type of the lock to use when reading from the iterator + lock_type = threading.Lock + + def __init__(self, iterator): + self._empty = False + if not hasattr(iterator, 'next'): + raise ValueError("Iterator %r needs a next() function" % iterator) + self._iter = iterator + self._lock = self.lock_type() + + def read(self, count=0, block=True, timeout=None): + """Non-Blocking implementation of read""" + # not threadsafe, but worst thing that could happen is that + # we try to get items one more time + if self._empty: + return list() + # END early abort + + self._lock.acquire() + try: + if count == 0: + self._empty = True + return list(self._iter) + else: + out = list() + it = self._iter + for i in xrange(count): + try: + out.append(it.next()) + except StopIteration: + self._empty = True + break + # END handle empty iterator + # END for each item to take + return out + # END handle count + finally: + self._lock.release() + # END handle locking + #} END classes #{ Constructors -def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader): +def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader): """Create a channel, with a reader and a writer :return: tuple(reader, writer) :param ctype: Channel to instantiate diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index cf14e47b..8f33a029 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -18,10 +18,10 @@ from Queue import ( from graph import Graph from channel import ( mkchannel, - Writer, + ChannelWriter, Channel, SerialChannel, - CallbackReader + CallbackChannelReader ) import sys @@ -32,13 +32,14 @@ import new __all__ = ('PoolReader', 'Pool', 'ThreadPool') -class PoolReader(CallbackReader): + +class PoolReader(CallbackChannelReader): """A reader designed to read from channels which take part in pools It acts like a handle to the underlying task in the pool.""" __slots__ = ('_task_ref', '_pool_ref') def __init__(self, channel, task, pool): - CallbackReader.__init__(self, channel) + CallbackChannelReader.__init__(self, channel) self._task_ref = weakref.ref(task) self._pool_ref = weakref.ref(pool) @@ -69,7 +70,7 @@ class PoolReader(CallbackReader): #{ Internal def _read(self, count=0, block=True, timeout=None): - return CallbackReader.read(self, count, block, timeout) + return CallbackChannelReader.read(self, count, block, timeout) #} END internal @@ -115,7 +116,7 @@ class PoolReader(CallbackReader): ####### read data ######## ########################## # read actual items, tasks were setup to put their output into our channel ( as well ) - items = CallbackReader.read(self, count, block, timeout) + items = CallbackChannelReader.read(self, count, block, timeout) ########################## @@ -446,7 +447,7 @@ class Pool(object): ch = None if wc is None: ch = ctype() - wc = Writer(ch) + wc = ChannelWriter(ch) task.set_writer(wc) else: ch = wc.channel diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 0eb4527c..b7b5e699 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,5 +1,7 @@ from graph import Node from util import ReadOnly +from channel import IteratorReader + import threading import weakref @@ -179,56 +181,15 @@ class ThreadTaskBase(object): class InputIteratorTaskBase(OutputChannelTask): """Implements a task which processes items from an iterable in a multi-processing safe manner""" - __slots__ = ('_iterator', '_lock', '_empty') - # the type of the lock to use when reading from the iterator - lock_type = None + __slots__ = tuple() + def __init__(self, iterator, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) - if not hasattr(iterator, 'next'): - raise ValueError("Iterator %r needs a next() function" % iterator) - self._iterator = iterator - self._lock = self.lock_type() - - # this is necessary to prevent a cyclic ref, preventing us from - # getting deleted ( and collected ) - weakself = weakref.ref(self) - self._read = lambda count: weakself().__read(count) - self._empty = False - + self._read = IteratorReader(iterator).read # defaults to returning our items unchanged self.fun = lambda item: item - - def __read(self, count=0): - """Read count items from the iterator, and return them""" - # not threadsafe, but worst thing that could happen is that - # we try to get items one more time - if self._empty: - return list() - # END early abort - - self._lock.acquire() - try: - if count == 0: - self._empty = True - return list(self._iterator) - else: - out = list() - it = self._iterator - for i in xrange(count): - try: - out.append(it.next()) - except StopIteration: - self._empty = True - break - # END handle empty iterator - # END for each item to take - return out - # END handle count - finally: - self._lock.release() - # END handle locking - + class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase): """An input iterator for threaded pools""" |