summaryrefslogtreecommitdiff
path: root/lib/git/async/channel.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-10 14:39:57 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-10 14:39:57 +0200
commit55e757928e493ce93056822d510482e4ffcaac2d (patch)
tree292bc4fac0e4daef4c39109c752b241684d8c48e /lib/git/async/channel.py
parent01eac1a959c1fa5894a86bf11e6b92f96762bdd8 (diff)
downloadgitpython-55e757928e493ce93056822d510482e4ffcaac2d.tar.gz
channel: Changed design to be more logical - a channel now has any amount of readers and writers, a ready is not connected to its writer anymore. This changes the refcounting of course, which is why the auto-cleanup for the pool is currently broken.
The benefit of this are faster writes to the channel, reading didn't improve, refcounts should be clearer now
Diffstat (limited to 'lib/git/async/channel.py')
-rw-r--r--lib/git/async/channel.py102
1 files changed, 47 insertions, 55 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index abb31035..9b019707 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -21,61 +21,57 @@ class Channel(object):
If the channel is closed, any read operation will result in an exception
This base class is not instantiated directly, but instead serves as constructor
- for RWChannel pairs.
+ for Rwriter pairs.
Create a new channel """
- __slots__ = tuple()
-
-
-class WChannel(Channel):
- """The write end of a channel - it is thread-safe"""
- __slots__ = ('_queue')
+ __slots__ = 'queue'
# The queue to use to store the actual data
QueueCls = AsyncQueue
def __init__(self):
- """initialize this instance, able to hold max_items at once
- Write calls will block if the channel is full, until someone reads from it"""
- self._queue = self.QueueCls()
-
- #{ Interface
- def write(self, item, block=True, timeout=None):
- """Send an item into the channel, it can be read from the read end of the
- channel accordingly
- :param item: Item to send
- :param block: If True, the call will block until there is free space in the
- channel
- :param timeout: timeout in seconds for blocking calls.
- :raise ReadOnly: when writing into closed channel"""
- # let the queue handle the 'closed' attribute, we write much more often
- # to an open channel than to a closed one, saving a few cycles
- self._queue.put(item, block, timeout)
-
+ """initialize this instance with a queue holding the channel contents"""
+ self.queue = self.QueueCls()
+
+
+class SerialChannel(Channel):
+ """A slightly faster version of a Channel, which sacrificed thead-safety for performance"""
+ QueueCls = SyncQueue
+
+
+class Writer(object):
+ """The write end of a channel, a file-like interface for a channel"""
+ __slots__ = ('write', 'channel')
+
+ def __init__(self, channel):
+ """Initialize the writer to use the given channel"""
+ self.channel = channel
+ self.write = channel.queue.put
+
+ #{ Interface
def size(self):
- """:return: approximate number of items that could be read from the read-ends
- of this channel"""
- return self._queue.qsize()
+ return self.channel.queue.qsize()
def close(self):
"""Close the channel. Multiple close calls on a closed channel are no
an error"""
- self._queue.set_writable(False)
+ self.channel.queue.set_writable(False)
def closed(self):
""":return: True if the channel was closed"""
- return not self._queue.writable()
+ return not self.channel.queue.writable()
#} END interface
-class CallbackWChannel(WChannel):
+class CallbackWriter(Writer):
"""The write end of a channel which allows you to setup a callback to be
called after an item was written to the channel"""
__slots__ = ('_pre_cb')
- def __init__(self):
- WChannel.__init__(self)
+ def __init__(self, channel):
+ Writer.__init__(self, channel)
self._pre_cb = None
+ self.write = self._write
def set_pre_cb(self, fun = lambda item: item):
"""Install a callback to be called before the given item is written.
@@ -88,25 +84,19 @@ class CallbackWChannel(WChannel):
self._pre_cb = fun
return prev
- def write(self, item, block=True, timeout=None):
+ def _write(self, item, block=True, timeout=None):
if self._pre_cb:
item = self._pre_cb(item)
- WChannel.write(self, item, block, timeout)
+ self.channel.queue.put(item, block, timeout)
-
-class SerialWChannel(WChannel):
- """A slightly faster version of a WChannel, which sacrificed thead-safety for
- performance"""
- QueueCls = SyncQueue
-
-class RChannel(Channel):
- """The read-end of a corresponding write channel"""
- __slots__ = '_wc'
+class Reader(object):
+ """Allows reading from a channel"""
+ __slots__ = 'channel'
- def __init__(self, wchannel):
+ def __init__(self, channel):
"""Initialize this instance from its parent write channel"""
- self._wc = wchannel
+ self.channel = channel
#{ Interface
@@ -135,7 +125,7 @@ class RChannel(Channel):
# in non-blocking mode, its all not a problem
out = list()
- queue = self._wc._queue
+ queue = self.channel.queue
if not block:
# be as fast as possible in non-blocking mode, hence
# its a bit 'unrolled'
@@ -198,12 +188,12 @@ class RChannel(Channel):
#} END interface
-class CallbackRChannel(RChannel):
+class CallbackReader(Reader):
"""A channel which sends a callback before items are read from the channel"""
__slots__ = "_pre_cb"
- def __init__(self, wc):
- RChannel.__init__(self, wc)
+ def __init__(self, channel):
+ Reader.__init__(self, channel)
self._pre_cb = None
def set_pre_cb(self, fun = lambda count: None):
@@ -220,18 +210,20 @@ class CallbackRChannel(RChannel):
def read(self, count=0, block=True, timeout=None):
if self._pre_cb:
self._pre_cb(count)
- return RChannel.read(self, count, block, timeout)
+ return Reader.read(self, count, block, timeout)
#} END classes
#{ Constructors
-def mkchannel(wctype = WChannel, rctype = RChannel):
- """Create a channel, which consists of one write end and one read end
- :return: tuple(write_channel, read_channel)
+def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader):
+ """Create a channel, with a reader and a writer
+ :return: tuple(reader, writer)
+ :param ctype: Channel to instantiate
:param wctype: The type of the write channel to instantiate
:param rctype: The type of the read channel to instantiate"""
- wc = wctype()
- rc = rctype(wc)
+ c = ctype()
+ wc = wtype(c)
+ rc = rtype(c)
return wc, rc
#} END constructors