diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-02-25 21:29:55 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-02-25 21:29:55 +0000 |
commit | 07eb942a521e951581e7a0d4558e08e4b29b6057 (patch) | |
tree | b20027151b0be8ae6833416133ddd5ed7107af3c /python/qpid/queue.py | |
parent | 773cc35a38cd34095f8800259ee7a2165a817053 (diff) | |
download | qpid-python-07eb942a521e951581e7a0d4558e08e4b29b6057.tar.gz |
put queue listeners in their own thread
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@631002 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/queue.py')
-rw-r--r-- | python/qpid/queue.py | 47 |
1 files changed, 31 insertions, 16 deletions
diff --git a/python/qpid/queue.py b/python/qpid/queue.py index af0565b6cc..00946a9156 100644 --- a/python/qpid/queue.py +++ b/python/qpid/queue.py @@ -24,36 +24,51 @@ content of a queue can be notified if the queue is no longer in use. """ from Queue import Queue as BaseQueue, Empty, Full +from threading import Thread class Closed(Exception): pass class Queue(BaseQueue): END = object() + STOP = object() def __init__(self, *args, **kwargs): BaseQueue.__init__(self, *args, **kwargs) - self._real_put = self.put - self.listener = self._real_put + self.listener = None + self.thread = None def close(self): self.put(Queue.END) def get(self, block = True, timeout = None): - self.put = self._real_put - try: - result = BaseQueue.get(self, block, timeout) - if result == Queue.END: - # this guarantees that any other waiting threads or any future - # calls to get will also result in a Closed exception - self.put(Queue.END) - raise Closed() - else: - return result - finally: - self.put = self.listener - pass + result = BaseQueue.get(self, block, timeout) + if result == Queue.END: + # this guarantees that any other waiting threads or any future + # calls to get will also result in a Closed exception + self.put(Queue.END) + raise Closed() + else: + return result def listen(self, listener): self.listener = listener - self.put = self.listener + if listener == None: + if self.thread != None: + self.put(Queue.STOP) + self.thread.join() + self.thread = None + else: + if self.thread == None: + self.thread = Thread(target = self.run) + self.thread.setDaemon(True) + self.thread.start() + + def run(self): + while True: + try: + o = self.get() + if o == Queue.STOP: break + self.listener(o) + except Closed: + break |