From 75cd78bc5af13d6ddddd07cc1ff7f36d0807eed4 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Tue, 2 Jun 2009 20:49:42 +0000 Subject: added receiver capacity and tests git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@781161 13f79535-47bb-0310-9956-ffa450edef68 --- python/qpid/messaging.py | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) (limited to 'python/qpid/messaging.py') diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 931784024e..84331134a9 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -90,6 +90,17 @@ def default(value, default): AMQP_PORT = 5672 AMQPS_PORT = 5671 +class Constant: + + def __init__(self, name, value=None): + self.name = name + self.value = value + + def __repr__(self): + return self.name + +UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) + class Connection(Lockable): """ @@ -622,6 +633,7 @@ class Receiver(Lockable): self.source = source self.filter = filter self.started = started + self.capacity = UNLIMITED self.closed = False self.listener = None self._ssn = None @@ -658,6 +670,14 @@ class Receiver(Lockable): def pending(self): return self.session._count(self._pred) + def _capacity(self): + if not self.started: + return 0 + elif self.capacity is UNLIMITED: + return self.capacity.value + else: + return self.capacity + @synchronized def listen(self, listener=None): """ @@ -681,14 +701,14 @@ class Receiver(Lockable): @type timeout: float @param timeout: the time to wait for a message to be available """ - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, - 0xFFFFFFFFL) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1) + if self.capacity is not UNLIMITED or not self.started: + self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, + UNLIMITED.value) + self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1) msg = self.session._get(self._pred, timeout=timeout) if msg is None: self._ssn.message_flush(self.destination) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, - 0xFFFFFFFFL, sync=True) + self._start() self._ssn.sync() msg = self.session._get(self._pred, timeout=0) if msg is None: @@ -696,8 +716,8 @@ class Receiver(Lockable): return msg def _start(self): - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL) + self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, UNLIMITED.value) + self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, self._capacity()) @synchronized def start(self): -- cgit v1.2.1