summaryrefslogtreecommitdiff
path: root/lib/git
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-12 11:19:18 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-12 11:19:29 +0200
commitbe8955a0fbb77d673587974b763f17c214904b57 (patch)
tree3e4cb96ca162c676692f72fc53dc6b1d8dc29e20 /lib/git
parenta28942bdf01f4ddb9d0b5a0489bd6f4e101dd775 (diff)
downloadgitpython-be8955a0fbb77d673587974b763f17c214904b57.tar.gz
Cleaned up channel design, Reader and Writer bases don't require a channel anymore, but are abstract.
Added IteratorReader, implementing the reader interface from an iterator. The implementation moved from the TaskIterator to the channel
Diffstat (limited to 'lib/git')
-rw-r--r--lib/git/async/channel.py139
-rw-r--r--lib/git/async/pool.py15
-rw-r--r--lib/git/async/task.py51
3 files changed, 130 insertions, 75 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index ae476cda..79cb5294 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -11,10 +11,12 @@ from util import (
)
from time import time
+import threading
import sys
-__all__ = ('Channel', 'SerialChannel', 'Writer', 'CallbackWriter', 'Reader',
- 'CallbackReader', 'mkchannel', 'ReadOnly')
+__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter',
+ 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly',
+ 'IteratorReader')
#{ Classes
class Channel(object):
@@ -43,15 +45,50 @@ class SerialChannel(Channel):
class Writer(object):
+ """A writer is an object providing write access to a possibly blocking reading device"""
+ __slots__ = tuple()
+
+ #{ Interface
+
+ def __init__(self, device):
+ """Initialize the instance with the device to write to"""
+
+ def write(self, item, block=True, timeout=None):
+ """Write the given item into the device
+ :param block: True if the device may block until space for the item is available
+ :param timeout: The time in seconds to wait for the device to become ready
+ in blocking mode"""
+ raise NotImplementedError()
+
+ def size(self):
+ """:return: number of items already in the device, they could be read with a reader"""
+ raise NotImplementedError()
+
+ def close(self):
+ """Close the channel. Multiple close calls on a closed channel are no
+ an error"""
+ raise NotImplementedError()
+
+ def closed(self):
+ """:return: True if the channel was closed"""
+ raise NotImplementedError()
+
+ #} END interface
+
+
+class ChannelWriter(Writer):
"""The write end of a channel, a file-like interface for a channel"""
- __slots__ = ('write', 'channel')
+ __slots__ = ('channel', '_put')
def __init__(self, channel):
"""Initialize the writer to use the given channel"""
self.channel = channel
- self.write = channel.queue.put
+ self._put = self.channel.queue.put
#{ Interface
+ def write(self, item, block=False, timeout=None):
+ return self._put(item, block, timeout)
+
def size(self):
return self.channel.queue.qsize()
@@ -66,15 +103,14 @@ class Writer(object):
#} END interface
-class CallbackWriter(Writer):
+class CallbackChannelWriter(ChannelWriter):
"""The write end of a channel which allows you to setup a callback to be
called after an item was written to the channel"""
__slots__ = ('_pre_cb')
def __init__(self, channel):
- Writer.__init__(self, channel)
+ super(CallbackChannelWriter, self).__init__(channel)
self._pre_cb = None
- self.write = self._write
def set_pre_cb(self, fun = lambda item: item):
"""Install a callback to be called before the given item is written.
@@ -87,25 +123,22 @@ class CallbackWriter(Writer):
self._pre_cb = fun
return prev
- def _write(self, item, block=True, timeout=None):
+ def write(self, item, block=True, timeout=None):
if self._pre_cb:
item = self._pre_cb(item)
- self.channel.queue.put(item, block, timeout)
+ super(CallbackChannelWriter, self).write(item, block, timeout)
class Reader(object):
- """Allows reading from a channel"""
- __slots__ = 'channel'
+ """Allows reading from a device"""
+ __slots__ = tuple()
- def __init__(self, channel):
- """Initialize this instance from its parent write channel"""
- self.channel = channel
-
-
#{ Interface
-
+ def __init__(self, device):
+ """Initialize the instance with the device to read from"""
+
def read(self, count=0, block=True, timeout=None):
- """read a list of items read from the channel. The list, as a sequence
+ """read a list of items read from the device. The list, as a sequence
of items, is similar to the string of characters returned when reading from
file like objects.
:param count: given amount of items to read. If < 1, all items will be read
@@ -114,11 +147,25 @@ class Reader(object):
given amount of seconds, returning the items it received so far.
The timeout is applied to each read item, not for the whole operation.
:return: single item in a list if count is 1, or a list of count items.
- If the channel was empty and count was 1, an empty list will be returned.
+ If the device was empty and count was 1, an empty list will be returned.
If count was greater 1, a list with less than count items will be
returned.
If count was < 1, a list with all items that could be read will be
returned."""
+ raise NotImplementedError()
+
+
+class ChannelReader(Reader):
+ """Allows reading from a channel. The reader is thread-safe if the channel is as well"""
+ __slots__ = 'channel'
+
+ def __init__(self, channel):
+ """Initialize this instance from its parent write channel"""
+ self.channel = channel
+
+ #{ Interface
+
+ def read(self, count=0, block=True, timeout=None):
# if the channel is closed for writing, we never block
# NOTE: is handled by the queue
# We don't check for a closed state here has it costs time - most of
@@ -191,12 +238,12 @@ class Reader(object):
#} END interface
-class CallbackReader(Reader):
+class CallbackChannelReader(ChannelReader):
"""A channel which sends a callback before items are read from the channel"""
__slots__ = "_pre_cb"
def __init__(self, channel):
- Reader.__init__(self, channel)
+ super(CallbackChannelReader, self).__init__(channel)
self._pre_cb = None
def set_pre_cb(self, fun = lambda count: None):
@@ -213,13 +260,59 @@ class CallbackReader(Reader):
def read(self, count=0, block=True, timeout=None):
if self._pre_cb:
self._pre_cb(count)
- return Reader.read(self, count, block, timeout)
+ return super(CallbackChannelReader, self).read(count, block, timeout)
+
+class IteratorReader(Reader):
+ """A Reader allowing to read items from an iterator, instead of a channel.
+ Reads will never block. Its thread-safe"""
+ __slots__ = ("_empty", '_iter', '_lock')
+
+ # the type of the lock to use when reading from the iterator
+ lock_type = threading.Lock
+
+ def __init__(self, iterator):
+ self._empty = False
+ if not hasattr(iterator, 'next'):
+ raise ValueError("Iterator %r needs a next() function" % iterator)
+ self._iter = iterator
+ self._lock = self.lock_type()
+
+ def read(self, count=0, block=True, timeout=None):
+ """Non-Blocking implementation of read"""
+ # not threadsafe, but worst thing that could happen is that
+ # we try to get items one more time
+ if self._empty:
+ return list()
+ # END early abort
+
+ self._lock.acquire()
+ try:
+ if count == 0:
+ self._empty = True
+ return list(self._iter)
+ else:
+ out = list()
+ it = self._iter
+ for i in xrange(count):
+ try:
+ out.append(it.next())
+ except StopIteration:
+ self._empty = True
+ break
+ # END handle empty iterator
+ # END for each item to take
+ return out
+ # END handle count
+ finally:
+ self._lock.release()
+ # END handle locking
+
#} END classes
#{ Constructors
-def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader):
+def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader):
"""Create a channel, with a reader and a writer
:return: tuple(reader, writer)
:param ctype: Channel to instantiate
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index cf14e47b..8f33a029 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -18,10 +18,10 @@ from Queue import (
from graph import Graph
from channel import (
mkchannel,
- Writer,
+ ChannelWriter,
Channel,
SerialChannel,
- CallbackReader
+ CallbackChannelReader
)
import sys
@@ -32,13 +32,14 @@ import new
__all__ = ('PoolReader', 'Pool', 'ThreadPool')
-class PoolReader(CallbackReader):
+
+class PoolReader(CallbackChannelReader):
"""A reader designed to read from channels which take part in pools
It acts like a handle to the underlying task in the pool."""
__slots__ = ('_task_ref', '_pool_ref')
def __init__(self, channel, task, pool):
- CallbackReader.__init__(self, channel)
+ CallbackChannelReader.__init__(self, channel)
self._task_ref = weakref.ref(task)
self._pool_ref = weakref.ref(pool)
@@ -69,7 +70,7 @@ class PoolReader(CallbackReader):
#{ Internal
def _read(self, count=0, block=True, timeout=None):
- return CallbackReader.read(self, count, block, timeout)
+ return CallbackChannelReader.read(self, count, block, timeout)
#} END internal
@@ -115,7 +116,7 @@ class PoolReader(CallbackReader):
####### read data ########
##########################
# read actual items, tasks were setup to put their output into our channel ( as well )
- items = CallbackReader.read(self, count, block, timeout)
+ items = CallbackChannelReader.read(self, count, block, timeout)
##########################
@@ -446,7 +447,7 @@ class Pool(object):
ch = None
if wc is None:
ch = ctype()
- wc = Writer(ch)
+ wc = ChannelWriter(ch)
task.set_writer(wc)
else:
ch = wc.channel
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 0eb4527c..b7b5e699 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -1,5 +1,7 @@
from graph import Node
from util import ReadOnly
+from channel import IteratorReader
+
import threading
import weakref
@@ -179,56 +181,15 @@ class ThreadTaskBase(object):
class InputIteratorTaskBase(OutputChannelTask):
"""Implements a task which processes items from an iterable in a multi-processing
safe manner"""
- __slots__ = ('_iterator', '_lock', '_empty')
- # the type of the lock to use when reading from the iterator
- lock_type = None
+ __slots__ = tuple()
+
def __init__(self, iterator, *args, **kwargs):
OutputChannelTask.__init__(self, *args, **kwargs)
- if not hasattr(iterator, 'next'):
- raise ValueError("Iterator %r needs a next() function" % iterator)
- self._iterator = iterator
- self._lock = self.lock_type()
-
- # this is necessary to prevent a cyclic ref, preventing us from
- # getting deleted ( and collected )
- weakself = weakref.ref(self)
- self._read = lambda count: weakself().__read(count)
- self._empty = False
-
+ self._read = IteratorReader(iterator).read
# defaults to returning our items unchanged
self.fun = lambda item: item
-
- def __read(self, count=0):
- """Read count items from the iterator, and return them"""
- # not threadsafe, but worst thing that could happen is that
- # we try to get items one more time
- if self._empty:
- return list()
- # END early abort
-
- self._lock.acquire()
- try:
- if count == 0:
- self._empty = True
- return list(self._iterator)
- else:
- out = list()
- it = self._iterator
- for i in xrange(count):
- try:
- out.append(it.next())
- except StopIteration:
- self._empty = True
- break
- # END handle empty iterator
- # END for each item to take
- return out
- # END handle count
- finally:
- self._lock.release()
- # END handle locking
-
+
class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):
"""An input iterator for threaded pools"""