summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-04-09 13:52:49 +0000
committerRafael H. Schloming <rhs@apache.org>2010-04-09 13:52:49 +0000
commitf7b7338f798cb9f1ffcbd6b4bec83fd7b08a3e57 (patch)
tree402ddbb701457890712e9c8e1856a2fef9375804
parent92c8f33b363ac2b55ce0f1498d86cc12a09981a2 (diff)
downloadqpid-python-f7b7338f798cb9f1ffcbd6b4bec83fd7b08a3e57.tar.gz
Sender.pending() -> Sender.unsettled(); Receiver.pending() -> Receiver.available(); added Sender.available() and Receiver.unsettled()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@932415 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/messaging/endpoints.py20
-rw-r--r--qpid/python/qpid/tests/messaging/__init__.py4
-rw-r--r--qpid/python/qpid/tests/messaging/endpoints.py44
3 files changed, 42 insertions, 26 deletions
diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py
index be657173fe..3f5cf3b9bd 100644
--- a/qpid/python/qpid/messaging/endpoints.py
+++ b/qpid/python/qpid/messaging/endpoints.py
@@ -717,7 +717,7 @@ class Sender:
return result
@synchronized
- def pending(self):
+ def unsettled(self):
"""
Returns the number of messages awaiting acknowledgment.
@rtype: int
@@ -726,6 +726,13 @@ class Sender:
return self.queued - self.acked
@synchronized
+ def available(self):
+ if self.capacity is UNLIMITED:
+ return UNLIMITED
+ else:
+ return self.capacity - self.unsettled()
+
+ @synchronized
def send(self, object, sync=True, timeout=None):
"""
Send a message. If the object passed in is of type L{unicode},
@@ -763,7 +770,7 @@ class Sender:
if self.capacity is not UNLIMITED:
if self.capacity <= 0:
raise InsufficientCapacity("capacity = %s" % self.capacity)
- if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout):
+ if not self._ewait(self.available, timeout=timeout):
raise InsufficientCapacity("capacity = %s" % self.capacity)
# XXX: what if we send the same message to multiple senders?
@@ -853,7 +860,14 @@ class Receiver(object):
return result
@synchronized
- def pending(self):
+ def unsettled(self):
+ """
+ Returns the number of acknowledged messages awaiting confirmation.
+ """
+ return len([m for m in self.acked if m._receiver is self])
+
+ @synchronized
+ def available(self):
"""
Returns the number of messages available to be fetched by the
application.
diff --git a/qpid/python/qpid/tests/messaging/__init__.py b/qpid/python/qpid/tests/messaging/__init__.py
index 42e88dfd72..147dbb8de5 100644
--- a/qpid/python/qpid/tests/messaging/__init__.py
+++ b/qpid/python/qpid/tests/messaging/__init__.py
@@ -124,8 +124,8 @@ class Base(Test):
contents = self.drain(rcv)
assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
- def assertPending(self, rcv, expected):
- p = rcv.pending()
+ def assertAvailable(self, rcv, expected):
+ p = rcv.available()
assert p == expected, "expected %s, got %s" % (expected, p)
def sleep(self):
diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py
index 320f88aa3b..c46a3f6fa9 100644
--- a/qpid/python/qpid/tests/messaging/endpoints.py
+++ b/qpid/python/qpid/tests/messaging/endpoints.py
@@ -271,7 +271,7 @@ class SessionTests(Base):
while True:
rcv = self.ssn.next_receiver(timeout=self.delay())
assert rcv in (rcv1, rcv2, rcv3)
- assert rcv.pending() > 0
+ assert rcv.available() > 0
fetched.append(rcv.fetch().content)
except Empty:
pass
@@ -540,20 +540,20 @@ class ReceiverTests(Base):
def testCapacityIncrease(self):
content = self.send("testCapacityIncrease")
self.sleep()
- assert self.rcv.pending() == 0
+ assert self.rcv.available() == 0
self.rcv.capacity = UNLIMITED
self.sleep()
- assert self.rcv.pending() == 1
+ assert self.rcv.available() == 1
msg = self.rcv.fetch(0)
assert msg.content == content
- assert self.rcv.pending() == 0
+ assert self.rcv.available() == 0
self.ssn.acknowledge()
def testCapacityDecrease(self):
self.rcv.capacity = UNLIMITED
one = self.send("testCapacityDecrease", 1)
self.sleep()
- assert self.rcv.pending() == 1
+ assert self.rcv.available() == 1
msg = self.rcv.fetch(0)
assert msg.content == one
@@ -561,7 +561,7 @@ class ReceiverTests(Base):
two = self.send("testCapacityDecrease", 2)
self.sleep()
- assert self.rcv.pending() == 0
+ assert self.rcv.available() == 0
msg = self.rcv.fetch(0)
assert msg.content == two
@@ -569,56 +569,56 @@ class ReceiverTests(Base):
def testCapacity(self):
self.rcv.capacity = 5
- self.assertPending(self.rcv, 0)
+ self.assertAvailable(self.rcv, 0)
for i in range(15):
self.send("testCapacity", i)
self.sleep()
- self.assertPending(self.rcv, 5)
+ self.assertAvailable(self.rcv, 5)
self.drain(self.rcv, limit = 5)
self.sleep()
- self.assertPending(self.rcv, 5)
+ self.assertAvailable(self.rcv, 5)
drained = self.drain(self.rcv)
assert len(drained) == 10, "%s, %s" % (len(drained), drained)
- self.assertPending(self.rcv, 0)
+ self.assertAvailable(self.rcv, 0)
self.ssn.acknowledge()
def testCapacityUNLIMITED(self):
self.rcv.capacity = UNLIMITED
- self.assertPending(self.rcv, 0)
+ self.assertAvailable(self.rcv, 0)
for i in range(10):
self.send("testCapacityUNLIMITED", i)
self.sleep()
- self.assertPending(self.rcv, 10)
+ self.assertAvailable(self.rcv, 10)
self.drain(self.rcv)
- self.assertPending(self.rcv, 0)
+ self.assertAvailable(self.rcv, 0)
self.ssn.acknowledge()
- def testPending(self):
+ def testAvailable(self):
self.rcv.capacity = UNLIMITED
- assert self.rcv.pending() == 0
+ assert self.rcv.available() == 0
for i in range(3):
- self.send("testPending", i)
+ self.send("testAvailable", i)
self.sleep()
- assert self.rcv.pending() == 3
+ assert self.rcv.available() == 3
for i in range(3, 10):
- self.send("testPending", i)
+ self.send("testAvailable", i)
self.sleep()
- assert self.rcv.pending() == 10
+ assert self.rcv.available() == 10
self.drain(self.rcv, limit=3)
- assert self.rcv.pending() == 7
+ assert self.rcv.available() == 7
self.drain(self.rcv)
- assert self.rcv.pending() == 0
+ assert self.rcv.available() == 0
self.ssn.acknowledge()
@@ -662,6 +662,8 @@ class ReceiverTests(Base):
self.assertEmpty(rb2)
self.drain(self.rcv, expected=[])
+ # XXX: need testUnsettled()
+
class AddressTests(Base):
def setup_connection(self):