summaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-08 13:24:44 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-08 13:24:44 +0200
commit3776f7a766851058f6435b9f606b16766425d7ca (patch)
tree8096d6d84090f4abc5aad374c1fe6f64088572a6 /lib/git/async
parent09c3f39ceb545e1198ad7a3f470d4ec896ce1add (diff)
downloadgitpython-3776f7a766851058f6435b9f606b16766425d7ca.tar.gz
The new channeldesign actually works, but it also shows that its located at the wrong spot. The channel is nothing more than an adapter allowing to read multiple items from a thread-safe queue, the queue itself though must be 'closable' for writing, or needs something like a writable flag.
Diffstat (limited to 'lib/git/async')
-rw-r--r--lib/git/async/channel.py108
-rw-r--r--lib/git/async/task.py6
-rw-r--r--lib/git/async/util.py32
3 files changed, 79 insertions, 67 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 2d5ab79c..655024fe 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -4,7 +4,11 @@ from Queue import (
Full
)
-from util import AsyncQueue
+from util import (
+ AsyncQueue,
+ DummyLock
+ )
+
from time import time
import sys
@@ -23,12 +27,9 @@ class Channel(object):
def __new__(cls, *args):
if cls is Channel:
- max_items = 0
- if len(args) == 1:
- max_items = args[0]
- if len(args) > 1:
- raise ValueError("Specify not more than the number of items the channel should take")
- wc = WChannel(max_items)
+ if len(args) > 0:
+ raise ValueError("Cannot take any arguments when creating a new channel")
+ wc = WChannel()
rc = RChannel(wc)
return wc, rc
# END constructor mode
@@ -39,11 +40,11 @@ class WChannel(Channel):
"""The write end of a channel"""
__slots__ = ('_closed', '_queue')
- def __init__(self, max_items=0):
+ def __init__(self):
"""initialize this instance, able to hold max_items at once
Write calls will block if the channel is full, until someone reads from it"""
self._closed = False
- self._queue = AsyncQueue(max_items)
+ self._queue = AsyncQueue()
#{ Interface
@@ -74,7 +75,21 @@ class WChannel(Channel):
def close(self):
"""Close the channel. Multiple close calls on a closed channel are no
an error"""
+ mutex = self._queue.mutex
+ mutex.acquire()
+ # this is atomic already, due to the GIL - no need to get the queue's mutex
+ print "channel.close()"
self._closed = True
+ # now make sure that the people waiting for an item are released now
+ # As we it could be that some readers are already on their way to initiate
+ # a blocking get, we must make sure that locks never block before that happens
+
+ # now we are the only one accessing the queue, so change it
+ self._queue.mutex = DummyLock()
+ print self._queue.not_empty._waiters
+ self._queue.not_empty.notify_all()
+ print self._queue.not_empty._waiters
+ mutex.release()
@property
def closed(self):
@@ -134,58 +149,47 @@ class RChannel(Channel):
pass
# END handle exceptions
else:
- # if we have really bad timing, the source of the channel
- # marks itself closed, but before setting it, the thread
- # switches to us. We read it, read False, and try to fetch
- # something, and never return. The whole closed channel thing
- # is not atomic ( of course )
- # This is why we never block for long, to get a chance to recheck
- # for closed channels.
- # We blend this into the timeout of the user
- ourtimeout = 0.25 # the smaller, the more responsive, but the slower
- wc = self._wc
- timeout = (timeout is None and sys.maxint) or timeout # make sure we can compute with it
- assert timeout != 0.0, "shouldn't block if timeout is 0" # okay safe
- if timeout and ourtimeout > timeout:
- ourtimeout = timeout
- # END setup timeout
-
# to get everything into one loop, we set the count accordingly
if count == 0:
count = sys.maxint
# END handle count
+ endtime = sys.maxint # allows timeout for whole operation
+ if timeout is not None:
+ endtime = time() + timeout
+ # could be improved by a separate: no-endtime branch, saving the time calls
for i in xrange(count):
- have_timeout = False
- st = time()
- while True:
+ try:
+ print "about to read", i, count, block, timeout
+ out.append(queue.get(block, timeout))
+ print "got one"
+ except Empty:
+ pass
+ # END ignore empty
+
+ # if we have been unblocked because the closed state changed
+ # in the meanwhile, stop trying
+ # NOTE: must NOT cache _wc
+ if self._wc.closed:
+ # its racing time - all threads waiting for the queue
+ # are awake now, and we actually can't be sure its empty
+ # Hence we pop it empty without blocking, getting as much
+ # as we can. This effectively lets us race ( with mutexes )
+ # of the other threads.
+ print "stopped because it was closed"
try:
- if wc.closed:
- have_timeout = True
- # its about the 'in the meanwhile' :) - get everything
- # we can in non-blocking mode. This will raise
- try:
- while True:
- out.append(queue.get(False))
- # END until it raises Empty
- except Empty:
- break
- # END finally, out of here
- # END don't continue on closed channels
-
- # END abort reading if it was closed ( in the meanwhile )
- out.append(queue.get(block, ourtimeout))
- break # breakout right away
+ while True:
+ out.append(queue.get(False))
+ # END pop it empty
except Empty:
- if timeout - (time() - st) <= 0:
- # hitting timeout
- have_timeout = True
- break
- # END abort if the user wants no more time spent here
- # END handle timeout
- # END endless timer loop
- if have_timeout:
+ pass
+ # END ignore emptyness, we have all
+
break
+ # END handle cloased
+
+ if time() >= endtime:
+ break
# END stop on timeout
# END for each item
# END handle blocking
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index cf486f48..ce701c86 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -140,10 +140,10 @@ class OutputChannelTask(Node):
# If we appear to be the only one left with our output channel, and are
# closed ( this could have been set in another thread as well ), make
# sure to close the output channel.
- # The count is: 1 = wc itself, 2 = first reader channel, and we have only
- # one, 3 is ours + x for every thread having its copy on the stack
+ # The count is: 1 = wc itself, 2 = first reader channel, + x for every
+ # thread having its copy on the stack
# + 1 for the instance we provide to refcount
- if self.is_done() and sys.getrefcount(self._out_wc) < 5:
+ if self.is_done() and sys.getrefcount(self._out_wc) < 4:
self.close()
# END handle channel closure
#{ Configuration
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index fb63ccaa..01073f6d 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -73,21 +73,22 @@ class SyncQueue(deque):
class HSCondition(object):
"""An attempt to make conditions less blocking, which gains performance
in return by sleeping less"""
- __slots__ = ("acquire", "release", "_lock", '_waiters')
+ # __slots__ = ("acquire", "release", "_lock", '_waiters')
+ __slots__ = ("_lock", '_waiters')
delay = 0.00002 # reduces wait times, but increases overhead
def __init__(self, lock=None):
if lock is None:
lock = Lock()
self._lock = lock
- self.acquire = lock.acquire
- self.release = lock.release
+ #self.acquire = lock.acquire
+ #self.release = lock.release
self._waiters = list()
- def __release(self):
+ def release(self):
return self._lock.release()
- def __acquire(self, block=None):
+ def acquire(self, block=None):
if block is None:
self._lock.acquire()
else:
@@ -156,7 +157,7 @@ class HSCondition(object):
self.notify(len(self._waiters))
-class AsyncQueue(Queue):
+class _AsyncQueue(Queue):
"""A queue using different condition objects to gain multithreading performance"""
def __init__(self, maxsize=0):
Queue.__init__(self, maxsize)
@@ -166,7 +167,7 @@ class AsyncQueue(Queue):
self.all_tasks_done = HSCondition(self.mutex)
-class _AsyncQueue(Queue):
+class AsyncQueue(Queue):
"""A queue using different condition objects to gain multithreading performance"""
__slots__ = ('mutex', 'not_empty', 'queue')
@@ -194,9 +195,9 @@ class _AsyncQueue(Queue):
self.queue.append(item)
self.mutex.release()
self.not_empty.notify()
-
+
def get(self, block=True, timeout=None):
- self.not_empty.acquire()
+ self.not_empty.acquire() # == self.mutex.acquire in that case
q = self.queue
try:
if not block:
@@ -205,16 +206,23 @@ class _AsyncQueue(Queue):
elif timeout is None:
while not len(q):
self.not_empty.wait()
- elif timeout < 0:
- raise ValueError("'timeout' must be a positive number")
else:
+ print "with timeout", timeout
+ import traceback
+ traceback.print_stack()
endtime = _time() + timeout
while not len(q):
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
- return q.popleft()
+ # END handle block
+ # can happen if someone else woke us up
+ try:
+ return q.popleft()
+ except IndexError:
+ raise Empty
+ # END handle unblocking reason
finally:
self.not_empty.release()