author    Sebastian Thiel <byronimo@gmail.com>  2010-06-09 14:47:41 +0200
committer Sebastian Thiel <byronimo@gmail.com>  2010-06-09 14:50:39 +0200
commit    0974f8737a3c56a7c076f9d0b757c6cb106324fb (patch)
tree      d05394903b8dfc63e34806bac7bccb700628a531 /lib/git
parent    4e6bece08aea01859a232e99a1e1ad8cc1eb7d36 (diff)
download  gitpython-0974f8737a3c56a7c076f9d0b757c6cb106324fb.tar.gz
Channel: Read method revised - now it really doesn't block anymore, and it runs faster as well, at about 2/3 of the performance we see in serial mode
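As a rough illustration of the revised semantics, here is a minimal usage sketch. The import path, the RChannel constructor taking the write end, and the exact write()/read() keyword names are assumptions based on the file location and the docstrings in the diff below; the producer thread is purely illustrative and not part of this commit.

    import threading
    from git.async.channel import WChannel, RChannel   # path assumed from lib/git/async/channel.py

    wc = WChannel()     # write end, backed by an AsyncQueue
    rc = RChannel(wc)   # read end; constructor argument is an assumption

    def produce():
        for i in range(5):
            wc.write(i)
        # closing makes the queue unwritable; pending reads stop blocking
        wc.close()

    threading.Thread(target=produce).start()

    # the timeout now applies to each item read, not to the whole call; once
    # the write end is closed, whatever is left is drained without blocking
    items = rc.read(count=5, block=True, timeout=1.0)
    print(items)        # typically prints [0, 1, 2, 3, 4]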
Diffstat (limited to 'lib/git')
-rw-r--r--  lib/git/async/channel.py  85
-rw-r--r--  lib/git/async/task.py      2
2 files changed, 38 insertions, 49 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 58c35f96..3a277e7e 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -38,12 +38,11 @@ class Channel(object):
class WChannel(Channel):
"""The write end of a channel"""
- __slots__ = ('_closed', '_queue')
+ __slots__ = ('_queue')
def __init__(self):
"""initialize this instance, able to hold max_items at once
Write calls will block if the channel is full, until someone reads from it"""
- self._closed = False
self._queue = AsyncQueue()
@@ -55,15 +54,10 @@ class WChannel(Channel):
:param block: If True, the call will block until there is free space in the
channel
:param timeout: timeout in seconds for blocking calls.
- :raise IOError: when writing into closed file
- :raise EOFError: when writing into a non-blocking full channel"""
+ :raise ReadOnly: when writing into closed channel"""
# let the queue handle the 'closed' attribute, we write much more often
# to an open channel than to a closed one, saving a few cycles
- try:
- self._queue.put(item, block, timeout)
- except ReadOnly:
- raise IOError("Cannot write to a closed channel")
- # END exception handling
+ self._queue.put(item, block, timeout)
def size(self):
""":return: approximate number of items that could be read from the read-ends
@@ -73,15 +67,11 @@ class WChannel(Channel):
def close(self):
"""Close the channel. Multiple close calls on a closed channel are no
an error"""
- # yes, close it a little too early, better than having anyone put
- # additional items
- self._closed = True
self._queue.set_writable(False)
- @property
def closed(self):
""":return: True if the channel was closed"""
- return self._closed
+ return not self._queue.writable()
#} END interface
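Taken together, the write end after this patch looks roughly like the sketch below: the duplicated closed flag is gone and the open/closed state lives entirely in the AsyncQueue's writable flag. This is consolidated from the hunks above; the write() method name and its default arguments are assumptions taken from the surrounding context, not shown verbatim in the diff.

    class WChannel(Channel):
        """Write end of a channel; the closed state is tracked by the queue itself"""
        __slots__ = ('_queue',)

        def __init__(self):
            self._queue = AsyncQueue()

        def write(self, item, block=True, timeout=None):
            # the queue raises ReadOnly once it was made unwritable, hence
            # the hot path needs no extra closed check of its own
            self._queue.put(item, block, timeout)

        def close(self):
            # making the queue read-only is what 'closes' the channel
            self._queue.set_writable(False)

        def closed(self):
            """:return: True if the channel was closed"""
            return not self._queue.writable()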
@@ -104,6 +94,7 @@ class RChannel(Channel):
:param block: if True, the call will block until an item is available
:param timeout: if positive and block is True, it will block only for the
given amount of seconds, returning the items it received so far.
+ The timeout is applied to each read item, not to the whole operation.
:return: single item in a list if count is 1, or a list of count items.
If the channel was empty and count was 1, an empty list will be returned.
If count was greater than 1, a list with fewer than count items will be
@@ -112,9 +103,11 @@ class RChannel(Channel):
returned."""
# if the channel is closed for writing, we never block
# NOTE: is handled by the queue
- if self._wc.closed or timeout == 0:
- block = False
-
+ # We don't check for a closed state here as it costs time - most of
+ # the time, it will not be closed, and we will bail out automatically once
+ # it gets closed
+
+
# in non-blocking mode, it's not a problem at all
out = list()
queue = self._wc._queue
@@ -142,42 +135,38 @@ class RChannel(Channel):
count = sys.maxint
# END handle count
- endtime = sys.maxint # allows timeout for whole operation
- if timeout is not None:
- endtime = time() + timeout
- # could be improved by a separate: no-endtime branch, saving the time calls
- for i in xrange(count):
+ i = 0
+ while i < count:
try:
out.append(queue.get(block, timeout))
+ i += 1
except Empty:
- # here we are only if there is nothing on the queue,
- # and if we are blocking. If we are not blocking, this
- # indicates that the queue was set unwritable in the meanwhile.
- # hence we can abort now to prevent reading (possibly) forever
- # Besides, this is racy as all threads will rip on the channel
- # without waiting until its empty
- if not block:
- break
- # END ignore empty
-
- # if we have been unblocked because the closed state changed
- # in the meanwhile, stop trying
- # NOTE: must NOT cache _wc
- if self._wc.closed:
- # If we were closed, we drop out even if there might still
- # be items. Now its time to get these items, according to
- # our count. Just switch to unblocking mode.
- # If we are to read unlimited items, this would run forever,
- # but the EmptyException handler takes care of this
- block = False
+ # we only get here if someone woke us up to inform us that the
+ # queue's writable state changed
+ # The following branch checks for closed channels, and pulls
+ # as many items as we need and as possible, before
+ # leaving the loop.
+ if not queue.writable():
+ try:
+ while i < count:
+ out.append(queue.get(False, None))
+ i += 1
+ # END count loop
+ except Empty:
+ break # out of count loop
+ # END handle absolutely empty queue
+ # END handle closed channel
- # we don't continue, but let the timer decide whether
- # it wants to abort
- # END handle channel closed
-
- if time() >= endtime:
+ # if we are here, we woke up and the channel is not closed
+ # Either the queue became writable again, which currently shouldn't
+ # be able to happen in the channel, or someone read with a timeout
+ # that actually timed out.
+ # Since a timeout is effectively the only way we can end up here,
+ # we have to abort
break
- # END stop operation on timeout
+ # END ignore empty
+
# END for each item
# END handle blocking
return out
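Consolidated from the added lines above, the blocking part of read() now boils down to the following loop. Variable names are taken from the diff; the surrounding method body and the Empty exception import from the Queue module are assumed.

    i = 0
    while i < count:
        try:
            out.append(queue.get(block, timeout))
            i += 1
        except Empty:
            # we were woken up: either the queue was made unwritable (the
            # channel got closed) or a per-item timeout expired
            if not queue.writable():
                # closed channel: drain whatever is left without blocking
                try:
                    while i < count:
                        out.append(queue.get(False, None))
                        i += 1
                except Empty:
                    pass
            # in either case there is nothing more to wait for
            break
    return out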
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index d18cedca..539b240f 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -66,7 +66,7 @@ class OutputChannelTask(Node):
def is_closed(self):
""":return: True if the task's write channel is closed"""
- return self._out_wc.closed
+ return self._out_wc.closed()
def error(self):
""":return: Exception caught during last processing or None"""