summaryrefslogtreecommitdiff
path: root/lib/git
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-06 23:08:06 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-06 23:08:06 +0200
commit867129e2950458ab75523b920a5e227e3efa8bbc (patch)
tree714a0ec16d915d04c69e91a3e222a79cdc9532be /lib/git
parent1b27292936c81637f6b9a7141dafaad1126f268e (diff)
downloadgitpython-867129e2950458ab75523b920a5e227e3efa8bbc.tar.gz
channel.read: enhanced to be sure we don't run into non-atomicity issues related to our channel closed flag, which is the only way not to block forever on read(0) channels which were closed by a thread 'in the meanwhile'
Diffstat (limited to 'lib/git')
-rw-r--r--lib/git/async/channel.py89
1 files changed, 72 insertions, 17 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 70daed24..0a1db26b 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -5,6 +5,9 @@ from Queue import (
Full
)
+from time import time
+import sys
+
#{ Classes
class Channel(object):
"""A channel is similar to a file like object. It has a write end as well as one or
@@ -106,26 +109,78 @@ class RChannel(Channel):
If count was < 1, a list with all items that could be read will be
returned."""
# if the channel is closed for writing, we never block
- if self._wc.closed:
+ if self._wc.closed or timeout == 0:
block = False
-
+
+ # in non-blocking mode, its all not a problem
out = list()
- try:
- if count == 1:
- out.append(self._wc._queue.get(block, timeout))
- elif count < 1:
- while True:
- out.append(self._wc._queue.get(block, timeout))
- # END for each item
- return out
- else:
- for i in xrange(count):
- out.append(self._wc._queue.get(block, timeout))
- # END for each item
+ queue = self._wc._queue
+ if not block:
+ # be as fast as possible in non-blocking mode, hence
+ # its a bit 'unrolled'
+ try:
+ if count == 1:
+ out.append(queue.get(False))
+ elif count < 1:
+ while True:
+ out.append(queue.get(False))
+ # END for each item
+ else:
+ for i in xrange(count):
+ out.append(queue.get(False))
+ # END for each item
+ # END handle count
+ except Empty:
+ pass
+ # END handle exceptions
+ else:
+ # if we have really bad timing, the source of the channel
+ # marks itself closed, but before setting it, the thread
+ # switches to us. We read it, read False, and try to fetch
+ # something, and never return. The whole closed channel thing
+ # is not atomic ( of course )
+ # This is why we never block for long, to get a chance to recheck
+ # for closed channels.
+ # We blend this into the timeout of the user
+ ourtimeout = 0.25 # the smaller, the more responsive, but the slower
+ wc = self._wc
+ timeout = (timeout is None and sys.maxint) or timeout # make sure we can compute with it
+ assert timeout != 0.0, "shouldn't block if timeout is 0" # okay safe
+ if timeout and ourtimeout > timeout:
+ ourtimeout = timeout
+ # END setup timeout
+
+ # to get everything into one loop, we set the count accordingly
+ if count == 0:
+ count = sys.maxint
# END handle count
- except Empty:
- pass
- # END handle exceptions
+
+ for i in xrange(count):
+ have_timeout = False
+ st = time()
+ while True:
+ try:
+ if wc.closed:
+ have_timeout = True
+ break
+ # END don't continue on closed channels
+
+ # END abort reading if it was closed ( in the meanwhile )
+ out.append(queue.get(block, ourtimeout))
+ break # breakout right away
+ except Empty:
+ if timeout - (time() - st) <= 0:
+ # hitting timeout
+ have_timeout = True
+ break
+ # END abort if the user wants no more time spent here
+ # END handle timeout
+ # END endless timer loop
+ if have_timeout:
+ break
+ # END stop on timeout
+ # END for each item
+ # END handle blocking
return out
#} END interface