summaryrefslogtreecommitdiff
path: root/python/qpid/queue.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-05-09 18:40:13 +0000
committerRafael H. Schloming <rhs@apache.org>2008-05-09 18:40:13 +0000
commit2ebccc4f3ab6e7813ac2179c8318163ffdd22cff (patch)
tree423197bb243ca909e90637fdc24c3a5a9565ad81 /python/qpid/queue.py
parent7f0c95b0e94c964a92c77c7c8c59035ffff35f34 (diff)
downloadqpid-python-2ebccc4f3ab6e7813ac2179c8318163ffdd22cff.tar.gz
QPID-1045: always notify incoming message queues of session closure and provide API for notifying listeners of closure; also preserve connection close code and report in errors
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@654907 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/queue.py')
-rw-r--r--python/qpid/queue.py21
1 files changed, 14 insertions, 7 deletions
diff --git a/python/qpid/queue.py b/python/qpid/queue.py
index ea8f00d091..a8a5c0d9ad 100644
--- a/python/qpid/queue.py
+++ b/python/qpid/queue.py
@@ -25,8 +25,7 @@ content of a queue can be notified if the queue is no longer in use.
from Queue import Queue as BaseQueue, Empty, Full
from threading import Thread
-
-class Closed(Exception): pass
+from exceptions import Closed
class Queue(BaseQueue):
@@ -37,6 +36,7 @@ class Queue(BaseQueue):
BaseQueue.__init__(self, *args, **kwargs)
self.error = None
self.listener = None
+ self.exc_listener = None
self.thread = None
def close(self, error = None):
@@ -53,15 +53,20 @@ class Queue(BaseQueue):
else:
return result
- def listen(self, listener):
+ def listen(self, listener, exc_listener = None):
+ if listener is None and exc_listener is not None:
+ raise ValueError("cannot set exception listener without setting listener")
+
self.listener = listener
- if listener == None:
- if self.thread != None:
+ self.exc_listener = exc_listener
+
+ if listener is None:
+ if self.thread is not None:
self.put(Queue.STOP)
self.thread.join()
self.thread = None
else:
- if self.thread == None:
+ if self.thread is None:
self.thread = Thread(target = self.run)
self.thread.setDaemon(True)
self.thread.start()
@@ -72,5 +77,7 @@ class Queue(BaseQueue):
o = self.get()
if o == Queue.STOP: break
self.listener(o)
- except Closed:
+ except Closed, e:
+ if self.exc_listener is not None:
+ self.exc_listener(e)
break