summaryrefslogtreecommitdiff
path: root/lib/git/async/channel.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git/async/channel.py')
-rw-r--r--lib/git/async/channel.py118
1 files changed, 55 insertions, 63 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 2d5ab79c..c05f7383 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -4,7 +4,11 @@ from Queue import (
Full
)
-from util import AsyncQueue
+from util import (
+ AsyncQueue,
+ ReadOnly
+ )
+
from time import time
import sys
@@ -23,12 +27,9 @@ class Channel(object):
def __new__(cls, *args):
if cls is Channel:
- max_items = 0
- if len(args) == 1:
- max_items = args[0]
- if len(args) > 1:
- raise ValueError("Specify not more than the number of items the channel should take")
- wc = WChannel(max_items)
+ if len(args) > 0:
+ raise ValueError("Cannot take any arguments when creating a new channel")
+ wc = WChannel()
rc = RChannel(wc)
return wc, rc
# END constructor mode
@@ -39,11 +40,11 @@ class WChannel(Channel):
"""The write end of a channel"""
__slots__ = ('_closed', '_queue')
- def __init__(self, max_items=0):
+ def __init__(self):
"""initialize this instance, able to hold max_items at once
Write calls will block if the channel is full, until someone reads from it"""
self._closed = False
- self._queue = AsyncQueue(max_items)
+ self._queue = AsyncQueue()
#{ Interface
@@ -55,15 +56,13 @@ class WChannel(Channel):
channel
:param timeout: timeout in seconds for blocking calls.
:raise IOError: when writing into closed file
- :raise EOFError: when writing into a non-blocking full channel
- :note: may block if the channel has a limited capacity"""
- if self._closed:
- raise IOError("Cannot write to a closed channel")
-
+ :raise EOFError: when writing into a non-blocking full channel"""
+ # let the queue handle the 'closed' attribute, we write much more often
+ # to an open channel than to a closed one, saving a few cycles
try:
self._queue.put(item, block, timeout)
- except Full:
- raise EOFError("Capacity of the channel was exeeded")
+ except ReadOnly:
+ raise IOError("Cannot write to a closed channel")
# END exception handling
def size(self):
@@ -74,7 +73,11 @@ class WChannel(Channel):
def close(self):
"""Close the channel. Multiple close calls on a closed channel are no
an error"""
+ # yes, close it a little too early, better than having anyone put
+ # additional items
+ # print "closing channel", self
self._closed = True
+ self._queue.set_writable(False)
@property
def closed(self):
@@ -101,7 +104,7 @@ class RChannel(Channel):
:param count: given amount of items to read. If < 1, all items will be read
:param block: if True, the call will block until an item is available
:param timeout: if positive and block is True, it will block only for the
- given amount of seconds.
+ given amount of seconds, returning the items it received so far.
:return: single item in a list if count is 1, or a list of count items.
If the channel was empty and count was 1, an empty list will be returned.
If count was greater 1, a list with less than count items will be
@@ -109,6 +112,7 @@ class RChannel(Channel):
If count was < 1, a list with all items that could be read will be
returned."""
# if the channel is closed for writing, we never block
+ # NOTE: is handled by the queue
if self._wc.closed or timeout == 0:
block = False
@@ -134,59 +138,47 @@ class RChannel(Channel):
pass
# END handle exceptions
else:
- # if we have really bad timing, the source of the channel
- # marks itself closed, but before setting it, the thread
- # switches to us. We read it, read False, and try to fetch
- # something, and never return. The whole closed channel thing
- # is not atomic ( of course )
- # This is why we never block for long, to get a chance to recheck
- # for closed channels.
- # We blend this into the timeout of the user
- ourtimeout = 0.25 # the smaller, the more responsive, but the slower
- wc = self._wc
- timeout = (timeout is None and sys.maxint) or timeout # make sure we can compute with it
- assert timeout != 0.0, "shouldn't block if timeout is 0" # okay safe
- if timeout and ourtimeout > timeout:
- ourtimeout = timeout
- # END setup timeout
-
# to get everything into one loop, we set the count accordingly
if count == 0:
count = sys.maxint
# END handle count
+ endtime = sys.maxint # allows timeout for whole operation
+ if timeout is not None:
+ endtime = time() + timeout
+ # could be improved by a separate: no-endtime branch, saving the time calls
for i in xrange(count):
- have_timeout = False
- st = time()
- while True:
- try:
- if wc.closed:
- have_timeout = True
- # its about the 'in the meanwhile' :) - get everything
- # we can in non-blocking mode. This will raise
- try:
- while True:
- out.append(queue.get(False))
- # END until it raises Empty
- except Empty:
- break
- # END finally, out of here
- # END don't continue on closed channels
-
- # END abort reading if it was closed ( in the meanwhile )
- out.append(queue.get(block, ourtimeout))
- break # breakout right away
- except Empty:
- if timeout - (time() - st) <= 0:
- # hitting timeout
- have_timeout = True
- break
- # END abort if the user wants no more time spent here
- # END handle timeout
- # END endless timer loop
- if have_timeout:
+ try:
+ out.append(queue.get(block, timeout))
+ except Empty:
+ # here we are only if there is nothing on the queue,
+ # and if we are blocking. If we are not blocking, this
+ # indiccates that the queue was set unwritable in the meanwhile.
+ # hence we can abort now to prevent reading (possibly) forever
+ # Besides, this is racy as all threads will rip on the channel
+ # without waiting until its empty
+ if not block:
+ break
+ # END ignore empty
+
+ # if we have been unblocked because the closed state changed
+ # in the meanwhile, stop trying
+ # NOTE: must NOT cache _wc
+ if self._wc.closed:
+ # If we were closed, we drop out even if there might still
+ # be items. Now its time to get these items, according to
+ # our count. Just switch to unblocking mode.
+ # If we are to read unlimited items, this would run forever,
+ # but the EmptyException handler takes care of this
+ block = False
+
+ # we don't continue, but let the timer decide whether
+ # it wants to abort
+ # END handle channel cloased
+
+ if time() >= endtime:
break
- # END stop on timeout
+ # END stop operation on timeout
# END for each item
# END handle blocking
return out