summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/qpid/messaging.py34
-rw-r--r--python/qpid/tests/messaging.py55
2 files changed, 70 insertions, 19 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 931784024e..84331134a9 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -90,6 +90,17 @@ def default(value, default):
AMQP_PORT = 5672
AMQPS_PORT = 5671
+class Constant:
+
+ def __init__(self, name, value=None):
+ self.name = name
+ self.value = value
+
+ def __repr__(self):
+ return self.name
+
+UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
+
class Connection(Lockable):
"""
@@ -622,6 +633,7 @@ class Receiver(Lockable):
self.source = source
self.filter = filter
self.started = started
+ self.capacity = UNLIMITED
self.closed = False
self.listener = None
self._ssn = None
@@ -658,6 +670,14 @@ class Receiver(Lockable):
def pending(self):
return self.session._count(self._pred)
+ def _capacity(self):
+ if not self.started:
+ return 0
+ elif self.capacity is UNLIMITED:
+ return self.capacity.value
+ else:
+ return self.capacity
+
@synchronized
def listen(self, listener=None):
"""
@@ -681,14 +701,14 @@ class Receiver(Lockable):
@type timeout: float
@param timeout: the time to wait for a message to be available
"""
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte,
- 0xFFFFFFFFL)
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1)
+ if self.capacity is not UNLIMITED or not self.started:
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte,
+ UNLIMITED.value)
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1)
msg = self.session._get(self._pred, timeout=timeout)
if msg is None:
self._ssn.message_flush(self.destination)
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte,
- 0xFFFFFFFFL, sync=True)
+ self._start()
self._ssn.sync()
msg = self.session._get(self._pred, timeout=0)
if msg is None:
@@ -696,8 +716,8 @@ class Receiver(Lockable):
return msg
def _start(self):
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL)
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL)
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, UNLIMITED.value)
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, self._capacity())
@synchronized
def start(self):
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py
index ef82d87f13..fea61f766b 100644
--- a/python/qpid/tests/messaging.py
+++ b/python/qpid/tests/messaging.py
@@ -22,7 +22,7 @@
import time
from qpid.tests import Test
-from qpid.messaging import Connection, Disconnected, Empty, Message, uuid4
+from qpid.messaging import Connection, Disconnected, Empty, Message, UNLIMITED, uuid4
from Queue import Queue, Empty as QueueEmpty
class Base(Test):
@@ -69,6 +69,9 @@ class Base(Test):
pass
return msgs
+ def delay(self):
+ time.sleep(2)
+
class SetupTests(Base):
def testOpen(self):
@@ -107,7 +110,6 @@ class ConnectionTests(Base):
ssn = self.conn.session()
self.ping(ssn)
self.conn.disconnect()
- import socket
try:
self.ping(ssn)
assert False, "ping succeeded"
@@ -296,10 +298,10 @@ class ReceiverTests(Base):
def testStart(self):
content = self.send("testStart")
- time.sleep(2)
+ self.delay()
assert self.rcv.pending() == 0
self.rcv.start()
- time.sleep(2)
+ self.delay()
assert self.rcv.pending() == 1
msg = self.rcv.fetch(0)
assert msg.content == content
@@ -309,7 +311,7 @@ class ReceiverTests(Base):
def testStop(self):
self.rcv.start()
one = self.send("testStop", 1)
- time.sleep(2)
+ self.delay()
assert self.rcv.pending() == 1
msg = self.rcv.fetch(0)
assert msg.content == one
@@ -317,7 +319,7 @@ class ReceiverTests(Base):
self.rcv.stop()
two = self.send("testStop", 2)
- time.sleep(2)
+ self.delay()
assert self.rcv.pending() == 0
msg = self.rcv.fetch(0)
assert msg.content == two
@@ -326,27 +328,56 @@ class ReceiverTests(Base):
def testPending(self):
self.rcv.start()
-
assert self.rcv.pending() == 0
for i in range(3):
self.send("testPending", i)
- time.sleep(2)
-
+ self.delay()
assert self.rcv.pending() == 3
for i in range(3, 10):
self.send("testPending", i)
- time.sleep(2)
-
+ self.delay()
assert self.rcv.pending() == 10
self.drain(self.rcv, limit=3)
-
assert self.rcv.pending() == 7
self.drain(self.rcv)
+ assert self.rcv.pending() == 0
+
+ self.ssn.acknowledge()
+ def testCapacity(self):
+ self.rcv.capacity = 5
+ self.rcv.start()
+ assert self.rcv.pending() == 0
+
+ for i in range(15):
+ self.send("testCapacity", i)
+ self.delay()
+ assert self.rcv.pending() == 5
+
+ self.drain(self.rcv, limit = 5)
+ self.delay()
+ assert self.rcv.pending() == 5
+
+ self.drain(self.rcv)
+ assert self.rcv.pending() == 0
+
+ self.ssn.acknowledge()
+
+ def testCapacityUNLIMITED(self):
+ self.rcv.capacity = UNLIMITED
+ self.rcv.start()
+ assert self.rcv.pending() == 0
+
+ for i in range(10):
+ self.send("testCapacityUNLIMITED", i)
+ self.delay()
+ assert self.rcv.pending() == 10
+
+ self.drain(self.rcv)
assert self.rcv.pending() == 0
self.ssn.acknowledge()