diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-09 14:47:41 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-09 14:50:39 +0200 |
commit | 0974f8737a3c56a7c076f9d0b757c6cb106324fb (patch) | |
tree | d05394903b8dfc63e34806bac7bccb700628a531 /lib/git | |
parent | 4e6bece08aea01859a232e99a1e1ad8cc1eb7d36 (diff) | |
download | gitpython-0974f8737a3c56a7c076f9d0b757c6cb106324fb.tar.gz |
Channel: Read method revised - now it really really doesn't block anymore, and it runs faster as well, about 2/3 of the performance we have when being in serial mode
Diffstat (limited to 'lib/git')
-rw-r--r-- | lib/git/async/channel.py | 85 | ||||
-rw-r--r-- | lib/git/async/task.py | 2 |
2 files changed, 38 insertions, 49 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 58c35f96..3a277e7e 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -38,12 +38,11 @@ class Channel(object): class WChannel(Channel): """The write end of a channel""" - __slots__ = ('_closed', '_queue') + __slots__ = ('_queue') def __init__(self): """initialize this instance, able to hold max_items at once Write calls will block if the channel is full, until someone reads from it""" - self._closed = False self._queue = AsyncQueue() @@ -55,15 +54,10 @@ class WChannel(Channel): :param block: If True, the call will block until there is free space in the channel :param timeout: timeout in seconds for blocking calls. - :raise IOError: when writing into closed file - :raise EOFError: when writing into a non-blocking full channel""" + :raise ReadOnly: when writing into closed channel""" # let the queue handle the 'closed' attribute, we write much more often # to an open channel than to a closed one, saving a few cycles - try: - self._queue.put(item, block, timeout) - except ReadOnly: - raise IOError("Cannot write to a closed channel") - # END exception handling + self._queue.put(item, block, timeout) def size(self): """:return: approximate number of items that could be read from the read-ends @@ -73,15 +67,11 @@ class WChannel(Channel): def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" - # yes, close it a little too early, better than having anyone put - # additional items - self._closed = True self._queue.set_writable(False) - @property def closed(self): """:return: True if the channel was closed""" - return self._closed + return not self._queue.writable() #} END interface @@ -104,6 +94,7 @@ class RChannel(Channel): :param block: if True, the call will block until an item is available :param timeout: if positive and block is True, it will block only for the given amount of seconds, returning the items it received so far. + The timeout is applied to each read item, not for the whole operation. :return: single item in a list if count is 1, or a list of count items. If the channel was empty and count was 1, an empty list will be returned. If count was greater 1, a list with less than count items will be @@ -112,9 +103,11 @@ class RChannel(Channel): returned.""" # if the channel is closed for writing, we never block # NOTE: is handled by the queue - if self._wc.closed or timeout == 0: - block = False - + # We don't check for a closed state here has it costs time - most of + # the time, it will not be closed, and will bail out automatically once + # it gets closed + + # in non-blocking mode, its all not a problem out = list() queue = self._wc._queue @@ -142,42 +135,38 @@ class RChannel(Channel): count = sys.maxint # END handle count - endtime = sys.maxint # allows timeout for whole operation - if timeout is not None: - endtime = time() + timeout - # could be improved by a separate: no-endtime branch, saving the time calls - for i in xrange(count): + i = 0 + while i < count: try: out.append(queue.get(block, timeout)) + i += 1 except Empty: - # here we are only if there is nothing on the queue, - # and if we are blocking. If we are not blocking, this - # indiccates that the queue was set unwritable in the meanwhile. - # hence we can abort now to prevent reading (possibly) forever - # Besides, this is racy as all threads will rip on the channel - # without waiting until its empty - if not block: - break - # END ignore empty - - # if we have been unblocked because the closed state changed - # in the meanwhile, stop trying - # NOTE: must NOT cache _wc - if self._wc.closed: - # If we were closed, we drop out even if there might still - # be items. Now its time to get these items, according to - # our count. Just switch to unblocking mode. - # If we are to read unlimited items, this would run forever, - # but the EmptyException handler takes care of this - block = False + # here we are only if + # someone woke us up to inform us about the queue that changed + # its writable state + # The following branch checks for closed channels, and pulls + # as many items as we need and as possible, before + # leaving the loop. + if not queue.writable(): + try: + while i < count: + out.append(queue.get(False, None)) + i += 1 + # END count loop + except Empty: + break # out of count loop + # END handle absolutely empty queue + # END handle closed channel - # we don't continue, but let the timer decide whether - # it wants to abort - # END handle channel cloased - - if time() >= endtime: + # if we are here, we woke up and the channel is not closed + # Either the queue became writable again, which currently shouldn't + # be able to happen in the channel, or someone read with a timeout + # that actually timed out. + # As it timed out, which is the only reason we are here, + # we have to abort break - # END stop operation on timeout + # END ignore empty + # END for each item # END handle blocking return out diff --git a/lib/git/async/task.py b/lib/git/async/task.py index d18cedca..539b240f 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -66,7 +66,7 @@ class OutputChannelTask(Node): def is_closed(self): """:return: True if the task's write channel is closed""" - return self._out_wc.closed + return self._out_wc.closed() def error(self): """:return: Exception caught during last processing or None""" |