diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-08 14:23:58 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-08 14:23:58 +0200 |
commit | 53152a824f5186452504f0b68306d10ebebee416 (patch) | |
tree | 9861c0337b4aa9a9600ce840910f7aac2de7425c /lib/git | |
parent | 3776f7a766851058f6435b9f606b16766425d7ca (diff) | |
download | gitpython-53152a824f5186452504f0b68306d10ebebee416.tar.gz |
queue: adjusted queue to be closable ( without own testing yet, except for the pool which runs it ) - its not yet stable, but should be solvable.
Diffstat (limited to 'lib/git')
-rw-r--r-- | lib/git/async/channel.py | 44 | ||||
-rw-r--r-- | lib/git/async/util.py | 68 |
2 files changed, 62 insertions, 50 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 655024fe..08323582 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -6,7 +6,6 @@ from Queue import ( from util import ( AsyncQueue, - DummyLock ) from time import time @@ -56,15 +55,13 @@ class WChannel(Channel): channel :param timeout: timeout in seconds for blocking calls. :raise IOError: when writing into closed file - :raise EOFError: when writing into a non-blocking full channel - :note: may block if the channel has a limited capacity""" - if self._closed: - raise IOError("Cannot write to a closed channel") - + :raise EOFError: when writing into a non-blocking full channel""" + # let the queue handle the 'closed' attribute, we write much more often + # to an open channel than to a closed one, saving a few cycles try: self._queue.put(item, block, timeout) - except Full: - raise EOFError("Capacity of the channel was exeeded") + except ReadOnly: + raise IOError("Cannot write to a closed channel") # END exception handling def size(self): @@ -75,21 +72,10 @@ class WChannel(Channel): def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" - mutex = self._queue.mutex - mutex.acquire() - # this is atomic already, due to the GIL - no need to get the queue's mutex - print "channel.close()" + # yes, close it a little too early, better than having anyone put + # additional items self._closed = True - # now make sure that the people waiting for an item are released now - # As we it could be that some readers are already on their way to initiate - # a blocking get, we must make sure that locks never block before that happens - - # now we are the only one accessing the queue, so change it - self._queue.mutex = DummyLock() - print self._queue.not_empty._waiters - self._queue.not_empty.notify_all() - print self._queue.not_empty._waiters - mutex.release() + self._queue.set_writable(False) @property def closed(self): @@ -124,6 +110,7 @@ class RChannel(Channel): If count was < 1, a list with all items that could be read will be returned.""" # if the channel is closed for writing, we never block + # NOTE: is handled by the queue if self._wc.closed or timeout == 0: block = False @@ -160,9 +147,7 @@ class RChannel(Channel): # could be improved by a separate: no-endtime branch, saving the time calls for i in xrange(count): try: - print "about to read", i, count, block, timeout out.append(queue.get(block, timeout)) - print "got one" except Empty: pass # END ignore empty @@ -176,7 +161,6 @@ class RChannel(Channel): # Hence we pop it empty without blocking, getting as much # as we can. This effectively lets us race ( with mutexes ) # of the other threads. - print "stopped because it was closed" try: while True: out.append(queue.get(False)) @@ -186,11 +170,11 @@ class RChannel(Channel): # END ignore emptyness, we have all break - # END handle cloased - - if time() >= endtime: - break - # END stop on timeout + # END handle channel cloased + + if time() >= endtime: + break + # END stop operation on timeout # END for each item # END handle blocking return out diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 01073f6d..51219cc4 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -166,15 +166,21 @@ class _AsyncQueue(Queue): self.not_full = HSCondition(self.mutex) self.all_tasks_done = HSCondition(self.mutex) - + +class ReadOnly(Exception): + """Thrown when trying to write to a read-only queue""" + class AsyncQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" - __slots__ = ('mutex', 'not_empty', 'queue') + """A queue using different condition objects to gain multithreading performance. + Additionally it has a threadsafe writable flag, which will alert all readers + that there is nothing more to get here.""" + __slots__ = ('mutex', 'not_empty', 'queue', '_writable') def __init__(self, maxsize=0): self.queue = deque() self.mutex = Lock() self.not_empty = HSCondition(self.mutex) + self._writable = True def qsize(self): self.mutex.acquire() @@ -183,6 +189,29 @@ class AsyncQueue(Queue): finally: self.mutex.release() + def writable(self): + self.mutex.acquire() + try: + return self._writable + finally: + self.mutex.release() + + def set_writable(self, state): + """Set the writable flag of this queue to True or False + :return: The previous state""" + self.mutex.acquire() + try: + old = self._writable + self._writable = state + return old + finally: + # if we won't receive anymore items, inform the getters + if not state: + self.not_empty.notify_all() + # END tell everyone + self.mutex.release() + # END handle locking + def empty(self): self.mutex.acquire() try: @@ -192,6 +221,9 @@ class AsyncQueue(Queue): def put(self, item, block=True, timeout=None): self.mutex.acquire() + if not self._writable: + raise ReadOnly + # END handle read-only self.queue.append(item) self.mutex.release() self.not_empty.notify() @@ -200,24 +232,20 @@ class AsyncQueue(Queue): self.not_empty.acquire() # == self.mutex.acquire in that case q = self.queue try: - if not block: - if not len(q): - raise Empty - elif timeout is None: - while not len(q): - self.not_empty.wait() - else: - print "with timeout", timeout - import traceback - traceback.print_stack() - endtime = _time() + timeout - while not len(q): - remaining = endtime - _time() - if remaining <= 0.0: - raise Empty - self.not_empty.wait(remaining) + if block: + if timeout is None: + while not len(q) and self._writable: + self.not_empty.wait() + else: + endtime = _time() + timeout + while not len(q) and self._writable: + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + # END handle timeout mode # END handle block - # can happen if someone else woke us up + # can happen if we woke up because we are not writable anymore try: return q.popleft() except IndexError: |