diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-08 14:23:58 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-08 14:23:58 +0200 |
commit | 53152a824f5186452504f0b68306d10ebebee416 (patch) | |
tree | 9861c0337b4aa9a9600ce840910f7aac2de7425c /lib/git/async/util.py | |
parent | 3776f7a766851058f6435b9f606b16766425d7ca (diff) | |
download | gitpython-53152a824f5186452504f0b68306d10ebebee416.tar.gz |
queue: adjusted queue to be closable ( without own testing yet, except for the pool which runs it ) - its not yet stable, but should be solvable.
Diffstat (limited to 'lib/git/async/util.py')
-rw-r--r-- | lib/git/async/util.py | 68 |
1 files changed, 48 insertions, 20 deletions
diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 01073f6d..51219cc4 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -166,15 +166,21 @@ class _AsyncQueue(Queue): self.not_full = HSCondition(self.mutex) self.all_tasks_done = HSCondition(self.mutex) - + +class ReadOnly(Exception): + """Thrown when trying to write to a read-only queue""" + class AsyncQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" - __slots__ = ('mutex', 'not_empty', 'queue') + """A queue using different condition objects to gain multithreading performance. + Additionally it has a threadsafe writable flag, which will alert all readers + that there is nothing more to get here.""" + __slots__ = ('mutex', 'not_empty', 'queue', '_writable') def __init__(self, maxsize=0): self.queue = deque() self.mutex = Lock() self.not_empty = HSCondition(self.mutex) + self._writable = True def qsize(self): self.mutex.acquire() @@ -183,6 +189,29 @@ class AsyncQueue(Queue): finally: self.mutex.release() + def writable(self): + self.mutex.acquire() + try: + return self._writable + finally: + self.mutex.release() + + def set_writable(self, state): + """Set the writable flag of this queue to True or False + :return: The previous state""" + self.mutex.acquire() + try: + old = self._writable + self._writable = state + return old + finally: + # if we won't receive anymore items, inform the getters + if not state: + self.not_empty.notify_all() + # END tell everyone + self.mutex.release() + # END handle locking + def empty(self): self.mutex.acquire() try: @@ -192,6 +221,9 @@ class AsyncQueue(Queue): def put(self, item, block=True, timeout=None): self.mutex.acquire() + if not self._writable: + raise ReadOnly + # END handle read-only self.queue.append(item) self.mutex.release() self.not_empty.notify() @@ -200,24 +232,20 @@ class AsyncQueue(Queue): self.not_empty.acquire() # == self.mutex.acquire in that case q = self.queue try: - if not block: - if not len(q): - raise Empty - elif timeout is None: - while not len(q): - self.not_empty.wait() - else: - print "with timeout", timeout - import traceback - traceback.print_stack() - endtime = _time() + timeout - while not len(q): - remaining = endtime - _time() - if remaining <= 0.0: - raise Empty - self.not_empty.wait(remaining) + if block: + if timeout is None: + while not len(q) and self._writable: + self.not_empty.wait() + else: + endtime = _time() + timeout + while not len(q) and self._writable: + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + # END handle timeout mode # END handle block - # can happen if someone else woke us up + # can happen if we woke up because we are not writable anymore try: return q.popleft() except IndexError: |