diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-11-12 18:33:25 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-11-12 18:33:25 +0000 |
| commit | 48572ba0ee68195b39b932909fcb99a57c9cf826 (patch) | |
| tree | 8545375b67e8e43cd4db1b2badc7de04f7c7c3e9 /python/qpid/messaging.py | |
| parent | 2cb5195f485b3d2e312c8cc9cf8c9dab4cd980ed (diff) | |
| download | qpid-python-48572ba0ee68195b39b932909fcb99a57c9cf826.tar.gz | |
removed listeners in favor of next_receiver
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@835488 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
| -rw-r--r-- | python/qpid/messaging.py | 55 |
1 files changed, 11 insertions, 44 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 2fe7d33ca9..ed0bd14f9c 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -295,10 +295,6 @@ class Session: self.closed = False self._lock = connection._lock - self.running = True - self.thread = Thread(target = self.run) - self.thread.setDaemon(True) - self.thread.start() def __repr__(self): return "<Session %s>" % self.name @@ -342,8 +338,8 @@ class Session: @synchronized def receiver(self, source, **options): """ - Creates a receiver that may be used to actively fetch or to listen - for the arrival of L{Messages<Message>} from the specified source. + Creates a receiver that may be used to fetch L{Messages<Message>} + from the specified source. @type source: str @param source: the source of L{Messages<Message>} @@ -392,6 +388,13 @@ class Session: return None @synchronized + def next_receiver(self, timeout=None): + if self._ewait(lambda: self.incoming, timeout): + return self.incoming[0]._receiver + else: + raise Empty + + @synchronized def acknowledge(self, message=None, sync=True): """ Acknowledge the given L{Message}. If message is None, then all @@ -465,29 +468,8 @@ class Session: """ for rcv in self.receivers: rcv.stop() - # TODO: think about stopping individual receivers in listen mode - self._wait(lambda: self._peek(self._pred) is None) self.started = False - def _pred(self, m): - return m._receiver.listener is not None - - @synchronized - def run(self): - self.running = True - try: - while True: - msg = self._get(self._pred) - if msg is None: - break; - else: - msg._receiver.listener(msg) - if self._peek(self._pred) is None: - self.connection._waiter.notifyAll() - finally: - self.running = False - self.connection._waiter.notifyAll() - @synchronized def close(self): """ @@ -498,10 +480,7 @@ class Session: self.closing = True self._wakeup() - self._ewait(lambda: self.closed and not self.running) - while self.thread.isAlive(): - self.thread.join(3) - self.thread = None + self._ewait(lambda: self.closed) # XXX: should be able to express this condition through API calls self._ewait(lambda: not self.outgoing and not self.acked) self.connection._remove_session(self) @@ -636,8 +615,7 @@ class Receiver: """ Receives incoming messages from a remote source. Messages may be - actively fetched with L{fetch} or a listener may be installed with - L{listen}. + fetched with L{fetch}. """ def __init__(self, session, index, source, options, started): @@ -659,7 +637,6 @@ class Receiver: self.linked = False self.closing = False self.closed = False - self.listener = None self._lock = self.session._lock def _wakeup(self): @@ -694,16 +671,6 @@ class Receiver: else: return self.capacity - @synchronized - def listen(self, listener=None): - """ - Sets the message listener for this receiver. - - @type listener: callable - @param listener: a callable object to be notified on message arrival - """ - self.listener = listener - def _pred(self, msg): return msg._receiver == self |
