summaryrefslogtreecommitdiff
path: root/python/qpid/messaging.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-11-12 18:33:25 +0000
committerRafael H. Schloming <rhs@apache.org>2009-11-12 18:33:25 +0000
commit48572ba0ee68195b39b932909fcb99a57c9cf826 (patch)
tree8545375b67e8e43cd4db1b2badc7de04f7c7c3e9 /python/qpid/messaging.py
parent2cb5195f485b3d2e312c8cc9cf8c9dab4cd980ed (diff)
downloadqpid-python-48572ba0ee68195b39b932909fcb99a57c9cf826.tar.gz
removed listeners in favor of next_receiver
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@835488 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r--python/qpid/messaging.py55
1 files changed, 11 insertions, 44 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 2fe7d33ca9..ed0bd14f9c 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -295,10 +295,6 @@ class Session:
self.closed = False
self._lock = connection._lock
- self.running = True
- self.thread = Thread(target = self.run)
- self.thread.setDaemon(True)
- self.thread.start()
def __repr__(self):
return "<Session %s>" % self.name
@@ -342,8 +338,8 @@ class Session:
@synchronized
def receiver(self, source, **options):
"""
- Creates a receiver that may be used to actively fetch or to listen
- for the arrival of L{Messages<Message>} from the specified source.
+ Creates a receiver that may be used to fetch L{Messages<Message>}
+ from the specified source.
@type source: str
@param source: the source of L{Messages<Message>}
@@ -392,6 +388,13 @@ class Session:
return None
@synchronized
+ def next_receiver(self, timeout=None):
+ if self._ewait(lambda: self.incoming, timeout):
+ return self.incoming[0]._receiver
+ else:
+ raise Empty
+
+ @synchronized
def acknowledge(self, message=None, sync=True):
"""
Acknowledge the given L{Message}. If message is None, then all
@@ -465,29 +468,8 @@ class Session:
"""
for rcv in self.receivers:
rcv.stop()
- # TODO: think about stopping individual receivers in listen mode
- self._wait(lambda: self._peek(self._pred) is None)
self.started = False
- def _pred(self, m):
- return m._receiver.listener is not None
-
- @synchronized
- def run(self):
- self.running = True
- try:
- while True:
- msg = self._get(self._pred)
- if msg is None:
- break;
- else:
- msg._receiver.listener(msg)
- if self._peek(self._pred) is None:
- self.connection._waiter.notifyAll()
- finally:
- self.running = False
- self.connection._waiter.notifyAll()
-
@synchronized
def close(self):
"""
@@ -498,10 +480,7 @@ class Session:
self.closing = True
self._wakeup()
- self._ewait(lambda: self.closed and not self.running)
- while self.thread.isAlive():
- self.thread.join(3)
- self.thread = None
+ self._ewait(lambda: self.closed)
# XXX: should be able to express this condition through API calls
self._ewait(lambda: not self.outgoing and not self.acked)
self.connection._remove_session(self)
@@ -636,8 +615,7 @@ class Receiver:
"""
Receives incoming messages from a remote source. Messages may be
- actively fetched with L{fetch} or a listener may be installed with
- L{listen}.
+ fetched with L{fetch}.
"""
def __init__(self, session, index, source, options, started):
@@ -659,7 +637,6 @@ class Receiver:
self.linked = False
self.closing = False
self.closed = False
- self.listener = None
self._lock = self.session._lock
def _wakeup(self):
@@ -694,16 +671,6 @@ class Receiver:
else:
return self.capacity
- @synchronized
- def listen(self, listener=None):
- """
- Sets the message listener for this receiver.
-
- @type listener: callable
- @param listener: a callable object to be notified on message arrival
- """
- self.listener = listener
-
def _pred(self, msg):
return msg._receiver == self