diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-06 23:08:06 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-06 23:08:06 +0200 |
commit | 867129e2950458ab75523b920a5e227e3efa8bbc (patch) | |
tree | 714a0ec16d915d04c69e91a3e222a79cdc9532be /lib/git/async/channel.py | |
parent | 1b27292936c81637f6b9a7141dafaad1126f268e (diff) | |
download | gitpython-867129e2950458ab75523b920a5e227e3efa8bbc.tar.gz |
channel.read: enhanced to be sure we don't run into non-atomicity issues related to our channel closed flag, which is the only way not to block forever on read(0) channels which were closed by a thread 'in the meanwhile'
Diffstat (limited to 'lib/git/async/channel.py')
-rw-r--r-- | lib/git/async/channel.py | 89 |
1 files changed, 72 insertions, 17 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 70daed24..0a1db26b 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -5,6 +5,9 @@ from Queue import ( Full ) +from time import time +import sys + #{ Classes class Channel(object): """A channel is similar to a file like object. It has a write end as well as one or @@ -106,26 +109,78 @@ class RChannel(Channel): If count was < 1, a list with all items that could be read will be returned.""" # if the channel is closed for writing, we never block - if self._wc.closed: + if self._wc.closed or timeout == 0: block = False - + + # in non-blocking mode, its all not a problem out = list() - try: - if count == 1: - out.append(self._wc._queue.get(block, timeout)) - elif count < 1: - while True: - out.append(self._wc._queue.get(block, timeout)) - # END for each item - return out - else: - for i in xrange(count): - out.append(self._wc._queue.get(block, timeout)) - # END for each item + queue = self._wc._queue + if not block: + # be as fast as possible in non-blocking mode, hence + # its a bit 'unrolled' + try: + if count == 1: + out.append(queue.get(False)) + elif count < 1: + while True: + out.append(queue.get(False)) + # END for each item + else: + for i in xrange(count): + out.append(queue.get(False)) + # END for each item + # END handle count + except Empty: + pass + # END handle exceptions + else: + # if we have really bad timing, the source of the channel + # marks itself closed, but before setting it, the thread + # switches to us. We read it, read False, and try to fetch + # something, and never return. The whole closed channel thing + # is not atomic ( of course ) + # This is why we never block for long, to get a chance to recheck + # for closed channels. + # We blend this into the timeout of the user + ourtimeout = 0.25 # the smaller, the more responsive, but the slower + wc = self._wc + timeout = (timeout is None and sys.maxint) or timeout # make sure we can compute with it + assert timeout != 0.0, "shouldn't block if timeout is 0" # okay safe + if timeout and ourtimeout > timeout: + ourtimeout = timeout + # END setup timeout + + # to get everything into one loop, we set the count accordingly + if count == 0: + count = sys.maxint # END handle count - except Empty: - pass - # END handle exceptions + + for i in xrange(count): + have_timeout = False + st = time() + while True: + try: + if wc.closed: + have_timeout = True + break + # END don't continue on closed channels + + # END abort reading if it was closed ( in the meanwhile ) + out.append(queue.get(block, ourtimeout)) + break # breakout right away + except Empty: + if timeout - (time() - st) <= 0: + # hitting timeout + have_timeout = True + break + # END abort if the user wants no more time spent here + # END handle timeout + # END endless timer loop + if have_timeout: + break + # END stop on timeout + # END for each item + # END handle blocking return out #} END interface |