diff options
Diffstat (limited to 'lib/git/async/channel.py')
-rw-r--r-- | lib/git/async/channel.py | 35 |
1 files changed, 20 insertions, 15 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 08323582..5c52b1dc 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -6,6 +6,7 @@ from Queue import ( from util import ( AsyncQueue, + ReadOnly ) from time import time @@ -59,6 +60,7 @@ class WChannel(Channel): # let the queue handle the 'closed' attribute, we write much more often # to an open channel than to a closed one, saving a few cycles try: + print "putting item", item, id(self._queue.queue) self._queue.put(item, block, timeout) except ReadOnly: raise IOError("Cannot write to a closed channel") @@ -74,6 +76,7 @@ class WChannel(Channel): an error""" # yes, close it a little too early, better than having anyone put # additional items + print "closing channel", self self._closed = True self._queue.set_writable(False) @@ -102,7 +105,7 @@ class RChannel(Channel): :param count: given amount of items to read. If < 1, all items will be read :param block: if True, the call will block until an item is available :param timeout: if positive and block is True, it will block only for the - given amount of seconds. + given amount of seconds, returning the items it received so far. :return: single item in a list if count is 1, or a list of count items. If the channel was empty and count was 1, an empty list will be returned. If count was greater 1, a list with less than count items will be @@ -149,27 +152,29 @@ class RChannel(Channel): try: out.append(queue.get(block, timeout)) except Empty: - pass + # here we are only if there is nothing on the queue, + # and if we are blocking. If we are not blocking, this + # indiccates that the queue was set unwritable in the meanwhile. + # hence we can abort now to prevent reading (possibly) forever + # Besides, this is racy as all threads will rip on the channel + # without waiting until its empty + if not block: + break # END ignore empty # if we have been unblocked because the closed state changed # in the meanwhile, stop trying # NOTE: must NOT cache _wc if self._wc.closed: - # its racing time - all threads waiting for the queue - # are awake now, and we actually can't be sure its empty - # Hence we pop it empty without blocking, getting as much - # as we can. This effectively lets us race ( with mutexes ) - # of the other threads. - try: - while True: - out.append(queue.get(False)) - # END pop it empty - except Empty: - pass - # END ignore emptyness, we have all + # If we were closed, we drop out even if there might still + # be items. Now its time to get these items, according to + # our count. Just switch to unblocking mode. + # If we are to read unlimited items, this would run forever, + # but the EmptyException handler takes care of this + block = False - break + # we don't continue, but let the timer decide whether + # it wants to abort # END handle channel cloased if time() >= endtime: |