summaryrefslogtreecommitdiff
path: root/python/qpid/queue.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-02-25 21:29:55 +0000
committerRafael H. Schloming <rhs@apache.org>2008-02-25 21:29:55 +0000
commit07eb942a521e951581e7a0d4558e08e4b29b6057 (patch)
treeb20027151b0be8ae6833416133ddd5ed7107af3c /python/qpid/queue.py
parent773cc35a38cd34095f8800259ee7a2165a817053 (diff)
downloadqpid-python-07eb942a521e951581e7a0d4558e08e4b29b6057.tar.gz
put queue listeners in their own thread
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@631002 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/queue.py')
-rw-r--r--python/qpid/queue.py47
1 files changed, 31 insertions, 16 deletions
diff --git a/python/qpid/queue.py b/python/qpid/queue.py
index af0565b6cc..00946a9156 100644
--- a/python/qpid/queue.py
+++ b/python/qpid/queue.py
@@ -24,36 +24,51 @@ content of a queue can be notified if the queue is no longer in use.
"""
from Queue import Queue as BaseQueue, Empty, Full
+from threading import Thread
class Closed(Exception): pass
class Queue(BaseQueue):
END = object()
+ STOP = object()
def __init__(self, *args, **kwargs):
BaseQueue.__init__(self, *args, **kwargs)
- self._real_put = self.put
- self.listener = self._real_put
+ self.listener = None
+ self.thread = None
def close(self):
self.put(Queue.END)
def get(self, block = True, timeout = None):
- self.put = self._real_put
- try:
- result = BaseQueue.get(self, block, timeout)
- if result == Queue.END:
- # this guarantees that any other waiting threads or any future
- # calls to get will also result in a Closed exception
- self.put(Queue.END)
- raise Closed()
- else:
- return result
- finally:
- self.put = self.listener
- pass
+ result = BaseQueue.get(self, block, timeout)
+ if result == Queue.END:
+ # this guarantees that any other waiting threads or any future
+ # calls to get will also result in a Closed exception
+ self.put(Queue.END)
+ raise Closed()
+ else:
+ return result
def listen(self, listener):
self.listener = listener
- self.put = self.listener
+ if listener == None:
+ if self.thread != None:
+ self.put(Queue.STOP)
+ self.thread.join()
+ self.thread = None
+ else:
+ if self.thread == None:
+ self.thread = Thread(target = self.run)
+ self.thread.setDaemon(True)
+ self.thread.start()
+
+ def run(self):
+ while True:
+ try:
+ o = self.get()
+ if o == Queue.STOP: break
+ self.listener(o)
+ except Closed:
+ break