Diffstat (limited to 'lib/git/async')
-rw-r--r--	lib/git/async/channel.py	108
-rw-r--r--	lib/git/async/task.py	6
-rw-r--r--	lib/git/async/util.py	32
3 files changed, 79 insertions, 67 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 2d5ab79c..655024fe 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -4,7 +4,11 @@ from Queue import (
 	Full
 	)
 
-from util import AsyncQueue
+from util import (
+	AsyncQueue,
+	DummyLock
+	)
+
 from time import time
 import sys
 
@@ -23,12 +27,9 @@ class Channel(object):
 	def __new__(cls, *args):
 		if cls is Channel:
-			max_items = 0
-			if len(args) == 1:
-				max_items = args[0]
-			if len(args) > 1:
-				raise ValueError("Specify not more than the number of items the channel should take")
-			wc = WChannel(max_items)
+			if len(args) > 0:
+				raise ValueError("Cannot take any arguments when creating a new channel")
+			wc = WChannel()
 			rc = RChannel(wc)
 			return wc, rc
 		# END constructor mode
 
@@ -39,11 +40,11 @@ class WChannel(Channel):
 	"""The write end of a channel"""
 	__slots__ = ('_closed', '_queue')
 	
-	def __init__(self, max_items=0):
+	def __init__(self):
 		"""initialize this instance, able to hold max_items at once
 		Write calls will block if the channel is full, until someone reads from it"""
 		self._closed = False
-		self._queue = AsyncQueue(max_items)
+		self._queue = AsyncQueue()
 		
 	
 	#{ Interface
@@ -74,7 +75,21 @@ class WChannel(Channel):
 	def close(self):
 		"""Close the channel. Multiple close calls on a closed channel are no 
 		an error"""
+		mutex = self._queue.mutex
+		mutex.acquire()
+		# this is atomic already, due to the GIL - no need to get the queue's mutex
+		print "channel.close()"
 		self._closed = True
+		# now make sure that the people waiting for an item are released now
+		# As we it could be that some readers are already on their way to initiate
+		# a blocking get, we must make sure that locks never block before that happens
+		
+		# now we are the only one accessing the queue, so change it
+		self._queue.mutex = DummyLock()
+		print self._queue.not_empty._waiters
+		self._queue.not_empty.notify_all()
+		print self._queue.not_empty._waiters
+		mutex.release()
 		
 	@property
 	def closed(self):
@@ -134,58 +149,47 @@ class RChannel(Channel):
 				pass
 			# END handle exceptions
 		else:
-			# if we have really bad timing, the source of the channel
-			# marks itself closed, but before setting it, the thread
-			# switches to us. We read it, read False, and try to fetch
-			# something, and never return. The whole closed channel thing
-			# is not atomic ( of course )
-			# This is why we never block for long, to get a chance to recheck
-			# for closed channels.
-			# We blend this into the timeout of the user
-			ourtimeout = 0.25				# the smaller, the more responsive, but the slower
-			wc = self._wc
-			timeout = (timeout is None and sys.maxint) or timeout		# make sure we can compute with it
-			assert timeout != 0.0, "shouldn't block if timeout is 0"	# okay safe
-			if timeout and ourtimeout > timeout:
-				ourtimeout = timeout
-			# END setup timeout
-			
 			# to get everything into one loop, we set the count accordingly
 			if count == 0:
 				count = sys.maxint
 			# END handle count
 			
+			endtime = sys.maxint		# allows timeout for whole operation
+			if timeout is not None:
+				endtime = time() + timeout
+			# could be improved by a separate: no-endtime branch, saving the time calls
 			for i in xrange(count):
-				have_timeout = False
-				st = time()
-				while True:
+				try:
+					print "about to read", i, count, block, timeout
+					out.append(queue.get(block, timeout))
+					print "got one"
+				except Empty:
+					pass
+				# END ignore empty
+				
+				# if we have been unblocked because the closed state changed
+				# in the meanwhile, stop trying
+				# NOTE: must NOT cache _wc
+				if self._wc.closed:
+					# its racing time - all threads waiting for the queue
+					# are awake now, and we actually can't be sure its empty
+					# Hence we pop it empty without blocking, getting as much
+					# as we can. This effectively lets us race ( with mutexes )
+					# of the other threads.
+					print "stopped because it was closed"
 					try:
-						if wc.closed:
-							have_timeout = True
-							# its about the 'in the meanwhile' :) - get everything
-							# we can in non-blocking mode. This will raise
-							try:
-								while True:
-									out.append(queue.get(False))
-								# END until it raises Empty
-							except Empty:
-								break
-							# END finally, out of here
-						# END don't continue on closed channels
-						
-						# END abort reading if it was closed ( in the meanwhile )
-						out.append(queue.get(block, ourtimeout))
-						break	# breakout right away
+						while True:
+							out.append(queue.get(False))
+						# END pop it empty
 					except Empty:
-						if timeout - (time() - st) <= 0:
-							# hitting timeout
-							have_timeout = True
-							break
-						# END abort if the user wants no more time spent here
-					# END handle timeout
-				# END endless timer loop
-				if have_timeout:
+						pass
+					# END ignore emptyness, we have all
+					
 					break
+				# END handle cloased
+				
+				if time() >= endtime:
+					break
 				# END stop on timeout
 			# END for each item
 		# END handle blocking
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index cf486f48..ce701c86 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -140,10 +140,10 @@ class OutputChannelTask(Node):
 			# If we appear to be the only one left with our output channel, and are
 			# closed ( this could have been set in another thread as well ), make
 			# sure to close the output channel.
-			# The count is: 1 = wc itself, 2 = first reader channel, and we have only
-			# one, 3 is ours + x for every thread having its copy on the stack
+			# The count is: 1 = wc itself, 2 = first reader channel, + x for every
+			# thread having its copy on the stack
 			# + 1 for the instance we provide to refcount
-			if self.is_done() and sys.getrefcount(self._out_wc) < 5:
+			if self.is_done() and sys.getrefcount(self._out_wc) < 4:
 				self.close()
 			# END handle channel closure
 	#{ Configuration
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index fb63ccaa..01073f6d 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -73,21 +73,22 @@ class SyncQueue(deque):
 class HSCondition(object):
 	"""An attempt to make conditions less blocking, which gains performance
 	in return by sleeping less"""
-	__slots__ = ("acquire", "release", "_lock", '_waiters')
+	# __slots__ = ("acquire", "release", "_lock", '_waiters')
+	__slots__ = ("_lock", '_waiters')
 	delay = 0.00002		# reduces wait times, but increases overhead
 	
 	def __init__(self, lock=None):
 		if lock is None:
 			lock = Lock()
 		self._lock = lock
-		self.acquire = lock.acquire
-		self.release = lock.release
+		#self.acquire = lock.acquire
+		#self.release = lock.release
 		self._waiters = list()
 
-	def __release(self):
+	def release(self):
 		return self._lock.release()
 		
-	def __acquire(self, block=None):
+	def acquire(self, block=None):
 		if block is None:
 			self._lock.acquire()
 		else:
@@ -156,7 +157,7 @@ class HSCondition(object):
 		self.notify(len(self._waiters))
 	
 
-class AsyncQueue(Queue):
+class _AsyncQueue(Queue):
 	"""A queue using different condition objects to gain multithreading performance"""
 	def __init__(self, maxsize=0):
 		Queue.__init__(self, maxsize)
@@ -166,7 +167,7 @@ class AsyncQueue(Queue):
 		self.all_tasks_done = HSCondition(self.mutex)
 	
 
-class _AsyncQueue(Queue):
+class AsyncQueue(Queue):
 	"""A queue using different condition objects to gain multithreading performance"""
 	__slots__ = ('mutex', 'not_empty', 'queue')
 	
@@ -194,9 +195,9 @@ class _AsyncQueue(Queue):
 			self.queue.append(item)
 			self.mutex.release()
 			self.not_empty.notify()
-		
+	
 	def get(self, block=True, timeout=None):
-		self.not_empty.acquire()
+		self.not_empty.acquire()		# == self.mutex.acquire in that case
 		q = self.queue
 		try:
 			if not block:
@@ -205,16 +206,23 @@ class _AsyncQueue(Queue):
 			elif timeout is None:
 				while not len(q):
 					self.not_empty.wait()
-			elif timeout < 0:
-				raise ValueError("'timeout' must be a positive number")
 			else:
+				print "with timeout", timeout
+				import traceback
+				traceback.print_stack()
 				endtime = _time() + timeout
 				while not len(q):
 					remaining = endtime - _time()
					if remaining <= 0.0:
 						raise Empty
 					self.not_empty.wait(remaining)
-			return q.popleft()
+			# END handle block
+			# can happen if someone else woke us up
+			try:
+				return q.popleft()
+			except IndexError:
+				raise Empty
+			# END handle unblocking reason
 		finally:
 			self.not_empty.release()
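For context, a minimal usage sketch (Python 2, not part of the commit) of the channel API as it stands after this change: Channel() no longer accepts a max_items argument, and close() is what unblocks a reader waiting inside RChannel.read(). The producer thread and the integer payloads below are made up for illustration; the import path and the WChannel.write() call are assumptions based on the diffstat layout and the write-end docstring shown above.

import threading

from git.async.channel import Channel	# module path assumed from lib/git/async/channel.py

def producer(wc):
	# write end: push a few items, then close. Closing is what wakes readers
	# blocked inside queue.get() - the new close() swaps the queue mutex for a
	# DummyLock and calls not_empty.notify_all().
	for i in xrange(3):
		wc.write(i)
	wc.close()

wc, rc = Channel()			# no max_items argument anymore; passing one raises ValueError
t = threading.Thread(target=producer, args=(wc,))
t.start()

items = rc.read()			# count=0 reads everything, returning once the channel is closed and drained
t.join()
print items					# expected: [0, 1, 2]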