diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/qpid/messaging.py | 34 | ||||
-rw-r--r-- | python/qpid/tests/messaging.py | 55 |
2 files changed, 70 insertions, 19 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 931784024e..84331134a9 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -90,6 +90,17 @@ def default(value, default): AMQP_PORT = 5672 AMQPS_PORT = 5671 +class Constant: + + def __init__(self, name, value=None): + self.name = name + self.value = value + + def __repr__(self): + return self.name + +UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) + class Connection(Lockable): """ @@ -622,6 +633,7 @@ class Receiver(Lockable): self.source = source self.filter = filter self.started = started + self.capacity = UNLIMITED self.closed = False self.listener = None self._ssn = None @@ -658,6 +670,14 @@ class Receiver(Lockable): def pending(self): return self.session._count(self._pred) + def _capacity(self): + if not self.started: + return 0 + elif self.capacity is UNLIMITED: + return self.capacity.value + else: + return self.capacity + @synchronized def listen(self, listener=None): """ @@ -681,14 +701,14 @@ class Receiver(Lockable): @type timeout: float @param timeout: the time to wait for a message to be available """ - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, - 0xFFFFFFFFL) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1) + if self.capacity is not UNLIMITED or not self.started: + self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, + UNLIMITED.value) + self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1) msg = self.session._get(self._pred, timeout=timeout) if msg is None: self._ssn.message_flush(self.destination) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, - 0xFFFFFFFFL, sync=True) + self._start() self._ssn.sync() msg = self.session._get(self._pred, timeout=0) if msg is None: @@ -696,8 +716,8 @@ class Receiver(Lockable): return msg def _start(self): - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL) + self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, UNLIMITED.value) + self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, self._capacity()) @synchronized def start(self): diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py index ef82d87f13..fea61f766b 100644 --- a/python/qpid/tests/messaging.py +++ b/python/qpid/tests/messaging.py @@ -22,7 +22,7 @@ import time from qpid.tests import Test -from qpid.messaging import Connection, Disconnected, Empty, Message, uuid4 +from qpid.messaging import Connection, Disconnected, Empty, Message, UNLIMITED, uuid4 from Queue import Queue, Empty as QueueEmpty class Base(Test): @@ -69,6 +69,9 @@ class Base(Test): pass return msgs + def delay(self): + time.sleep(2) + class SetupTests(Base): def testOpen(self): @@ -107,7 +110,6 @@ class ConnectionTests(Base): ssn = self.conn.session() self.ping(ssn) self.conn.disconnect() - import socket try: self.ping(ssn) assert False, "ping succeeded" @@ -296,10 +298,10 @@ class ReceiverTests(Base): def testStart(self): content = self.send("testStart") - time.sleep(2) + self.delay() assert self.rcv.pending() == 0 self.rcv.start() - time.sleep(2) + self.delay() assert self.rcv.pending() == 1 msg = self.rcv.fetch(0) assert msg.content == content @@ -309,7 +311,7 @@ class ReceiverTests(Base): def testStop(self): self.rcv.start() one = self.send("testStop", 1) - time.sleep(2) + self.delay() assert self.rcv.pending() == 1 msg = self.rcv.fetch(0) assert msg.content == one @@ -317,7 +319,7 @@ class ReceiverTests(Base): self.rcv.stop() two = self.send("testStop", 2) - time.sleep(2) + self.delay() assert self.rcv.pending() == 0 msg = self.rcv.fetch(0) assert msg.content == two @@ -326,27 +328,56 @@ class ReceiverTests(Base): def testPending(self): self.rcv.start() - assert self.rcv.pending() == 0 for i in range(3): self.send("testPending", i) - time.sleep(2) - + self.delay() assert self.rcv.pending() == 3 for i in range(3, 10): self.send("testPending", i) - time.sleep(2) - + self.delay() assert self.rcv.pending() == 10 self.drain(self.rcv, limit=3) - assert self.rcv.pending() == 7 self.drain(self.rcv) + assert self.rcv.pending() == 0 + + self.ssn.acknowledge() + def testCapacity(self): + self.rcv.capacity = 5 + self.rcv.start() + assert self.rcv.pending() == 0 + + for i in range(15): + self.send("testCapacity", i) + self.delay() + assert self.rcv.pending() == 5 + + self.drain(self.rcv, limit = 5) + self.delay() + assert self.rcv.pending() == 5 + + self.drain(self.rcv) + assert self.rcv.pending() == 0 + + self.ssn.acknowledge() + + def testCapacityUNLIMITED(self): + self.rcv.capacity = UNLIMITED + self.rcv.start() + assert self.rcv.pending() == 0 + + for i in range(10): + self.send("testCapacityUNLIMITED", i) + self.delay() + assert self.rcv.pending() == 10 + + self.drain(self.rcv) assert self.rcv.pending() == 0 self.ssn.acknowledge() |