From bc25543ddcb44c4389025d8470751c0c790a9bbe Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Tue, 2 Jun 2009 19:25:57 +0000 Subject: modified start and stop to function independently of fetch vs listen, added Receiver.pending() and added tests for Receiver.start(), Receiver.stop(), and Receiver.pending() git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@781132 13f79535-47bb-0310-9956-ffa450edef68 --- python/qpid/messaging.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) (limited to 'python/qpid/messaging.py') diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 7063dfe684..931784024e 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -365,6 +365,14 @@ class Session(Lockable): receiver._link() return receiver + @synchronized + def _count(self, predicate): + result = 0 + for msg in self.incoming: + if predicate(msg): + result += 1 + return result + def _peek(self, predicate): for msg in self.incoming: if predicate(msg): @@ -646,6 +654,10 @@ class Receiver(Lockable): def _disconnected(self): self._ssn = None + @synchronized + def pending(self): + return self.session._count(self._pred) + @synchronized def listen(self, listener=None): """ @@ -655,13 +667,6 @@ class Receiver(Lockable): @param listener: a callable object to be notified on message arrival """ self.listener = listener - if self.listener is None: - self._ssn.message_stop(self.destination) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL, - sync=True) - self._ssn.sync() - else: - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL) def _pred(self, msg): return msg._receiver == self @@ -692,8 +697,7 @@ class Receiver(Lockable): def _start(self): self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL) - if self.listener is not None: - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL) + self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL) @synchronized def start(self): -- cgit v1.2.1