summaryrefslogtreecommitdiff
path: root/lib/git/async/channel.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-05 20:03:09 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-05 20:03:09 +0200
commitab59f78341f1dd188aaf4c30526f6295c63438b1 (patch)
tree697bc8a11201494d560f1ea65fd77b7ef09b238e /lib/git/async/channel.py
parent61138f2ece0cb864b933698174315c34a78835d1 (diff)
downloadgitpython-ab59f78341f1dd188aaf4c30526f6295c63438b1.tar.gz
Renamed mp to async, as this is a much better name for what is actually going on. The default implementation uses threads, which ends up being nothing more than async, as they are all locked down by internal and the global interpreter lock
Diffstat (limited to 'lib/git/async/channel.py')
-rw-r--r--lib/git/async/channel.py108
1 files changed, 108 insertions, 0 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
new file mode 100644
index 00000000..c9cbfb87
--- /dev/null
+++ b/lib/git/async/channel.py
@@ -0,0 +1,108 @@
+"""Contains a queue based channel implementation"""
+from Queue import (
+ Queue,
+ Empty,
+ Full
+ )
+
+#{ Classes
+class Channel(object):
+ """A channel is similar to a system pipe. It has a write end as well as one or
+ more read ends. If Data is in the channel, it can be read, if not the read operation
+ will block until data becomes available.
+ If the channel is closed, any read operation will result in an exception
+
+ This base class is not instantiated directly, but instead serves as constructor
+ for RWChannel pairs.
+
+ Create a new channel """
+ __slots__ = tuple()
+
+ def __new__(cls, *args):
+ if cls is Channel:
+ max_items = 0
+ if len(args) == 1:
+ max_items = args[0]
+ if len(args) > 1:
+ raise ValueError("Specify not more than the number of items the channel should take")
+ wc = WChannel(max_items)
+ rc = RChannel(wc)
+ return wc, rc
+ # END constructor mode
+ return object.__new__(cls)
+
+
+class WChannel(Channel):
+ """The write end of a channel"""
+ __slots__ = ('_closed', '_queue')
+
+ def __init__(self, max_items=0):
+ """initialize this instance, able to hold max_items at once
+ Write calls will block if the channel is full, until someone reads from it"""
+ self._closed = False
+ self._queue = Queue(max_items)
+
+
+ #{ Interface
+ def write(self, item, block=True, timeout=None):
+ """Send an item into the channel, it can be read from the read end of the
+ channel accordingly
+ :param item: Item to send
+ :param block: If True, the call will block until there is free space in the
+ channel
+ :param timeout: timeout in seconds for blocking calls.
+ :raise IOError: when writing into closed file or when writing into a non-blocking
+ full channel
+ :note: may block if the channel has a limited capacity"""
+ if self._closed:
+ raise IOError("Cannot write to a closed channel")
+
+ try:
+ self._queue.put(item, block, timeout)
+ except Full:
+ raise IOError("Capacity of the channel was exeeded")
+ # END exception handling
+
+ def close(self):
+ """Close the channel. Multiple close calls on a closed channel are no
+ an error"""
+ self._closed = True
+
+ @property
+ def closed(self):
+ """:return: True if the channel was closed"""
+ return self._closed
+ #} END interface
+
+
+class RChannel(Channel):
+ """The read-end of a corresponding write channel"""
+ __slots__ = '_wc'
+
+ def __init__(self, wchannel):
+ """Initialize this instance from its parent write channel"""
+ self._wc = wchannel
+
+
+ #{ Interface
+
+ def read(self, block=True, timeout=None):
+ """:return: an item read from the channel
+ :param block: if True, the call will block until an item is available
+ :param timeout: if positive and block is True, it will block only for the
+ given amount of seconds.
+ :raise IOError: When reading from an empty channel ( if non-blocking, or
+ if the channel is still empty after the timeout"""
+ # if the channel is closed for writing, we never block
+ if self._wc.closed:
+ block = False
+
+ try:
+ return self._wc._queue.get(block, timeout)
+ except Empty:
+ raise IOError("Error reading from an empty channel")
+ # END handle reading
+
+ #} END interface
+
+#} END classes