diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2010-04-09 13:52:49 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2010-04-09 13:52:49 +0000 |
| commit | f7b7338f798cb9f1ffcbd6b4bec83fd7b08a3e57 (patch) | |
| tree | 402ddbb701457890712e9c8e1856a2fef9375804 | |
| parent | 92c8f33b363ac2b55ce0f1498d86cc12a09981a2 (diff) | |
| download | qpid-python-f7b7338f798cb9f1ffcbd6b4bec83fd7b08a3e57.tar.gz | |
Sender.pending() -> Sender.unsettled(); Receiver.pending() -> Receiver.available(); added Sender.available() and Receiver.unsettled()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@932415 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/python/qpid/messaging/endpoints.py | 20 | ||||
| -rw-r--r-- | qpid/python/qpid/tests/messaging/__init__.py | 4 | ||||
| -rw-r--r-- | qpid/python/qpid/tests/messaging/endpoints.py | 44 |
3 files changed, 42 insertions, 26 deletions
diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py index be657173fe..3f5cf3b9bd 100644 --- a/qpid/python/qpid/messaging/endpoints.py +++ b/qpid/python/qpid/messaging/endpoints.py @@ -717,7 +717,7 @@ class Sender: return result @synchronized - def pending(self): + def unsettled(self): """ Returns the number of messages awaiting acknowledgment. @rtype: int @@ -726,6 +726,13 @@ class Sender: return self.queued - self.acked @synchronized + def available(self): + if self.capacity is UNLIMITED: + return UNLIMITED + else: + return self.capacity - self.unsettled() + + @synchronized def send(self, object, sync=True, timeout=None): """ Send a message. If the object passed in is of type L{unicode}, @@ -763,7 +770,7 @@ class Sender: if self.capacity is not UNLIMITED: if self.capacity <= 0: raise InsufficientCapacity("capacity = %s" % self.capacity) - if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout): + if not self._ewait(self.available, timeout=timeout): raise InsufficientCapacity("capacity = %s" % self.capacity) # XXX: what if we send the same message to multiple senders? @@ -853,7 +860,14 @@ class Receiver(object): return result @synchronized - def pending(self): + def unsettled(self): + """ + Returns the number of acknowledged messages awaiting confirmation. + """ + return len([m for m in self.acked if m._receiver is self]) + + @synchronized + def available(self): """ Returns the number of messages available to be fetched by the application. diff --git a/qpid/python/qpid/tests/messaging/__init__.py b/qpid/python/qpid/tests/messaging/__init__.py index 42e88dfd72..147dbb8de5 100644 --- a/qpid/python/qpid/tests/messaging/__init__.py +++ b/qpid/python/qpid/tests/messaging/__init__.py @@ -124,8 +124,8 @@ class Base(Test): contents = self.drain(rcv) assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents) - def assertPending(self, rcv, expected): - p = rcv.pending() + def assertAvailable(self, rcv, expected): + p = rcv.available() assert p == expected, "expected %s, got %s" % (expected, p) def sleep(self): diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py index 320f88aa3b..c46a3f6fa9 100644 --- a/qpid/python/qpid/tests/messaging/endpoints.py +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -271,7 +271,7 @@ class SessionTests(Base): while True: rcv = self.ssn.next_receiver(timeout=self.delay()) assert rcv in (rcv1, rcv2, rcv3) - assert rcv.pending() > 0 + assert rcv.available() > 0 fetched.append(rcv.fetch().content) except Empty: pass @@ -540,20 +540,20 @@ class ReceiverTests(Base): def testCapacityIncrease(self): content = self.send("testCapacityIncrease") self.sleep() - assert self.rcv.pending() == 0 + assert self.rcv.available() == 0 self.rcv.capacity = UNLIMITED self.sleep() - assert self.rcv.pending() == 1 + assert self.rcv.available() == 1 msg = self.rcv.fetch(0) assert msg.content == content - assert self.rcv.pending() == 0 + assert self.rcv.available() == 0 self.ssn.acknowledge() def testCapacityDecrease(self): self.rcv.capacity = UNLIMITED one = self.send("testCapacityDecrease", 1) self.sleep() - assert self.rcv.pending() == 1 + assert self.rcv.available() == 1 msg = self.rcv.fetch(0) assert msg.content == one @@ -561,7 +561,7 @@ class ReceiverTests(Base): two = self.send("testCapacityDecrease", 2) self.sleep() - assert self.rcv.pending() == 0 + assert self.rcv.available() == 0 msg = self.rcv.fetch(0) assert msg.content == two @@ -569,56 +569,56 @@ class ReceiverTests(Base): def testCapacity(self): self.rcv.capacity = 5 - self.assertPending(self.rcv, 0) + self.assertAvailable(self.rcv, 0) for i in range(15): self.send("testCapacity", i) self.sleep() - self.assertPending(self.rcv, 5) + self.assertAvailable(self.rcv, 5) self.drain(self.rcv, limit = 5) self.sleep() - self.assertPending(self.rcv, 5) + self.assertAvailable(self.rcv, 5) drained = self.drain(self.rcv) assert len(drained) == 10, "%s, %s" % (len(drained), drained) - self.assertPending(self.rcv, 0) + self.assertAvailable(self.rcv, 0) self.ssn.acknowledge() def testCapacityUNLIMITED(self): self.rcv.capacity = UNLIMITED - self.assertPending(self.rcv, 0) + self.assertAvailable(self.rcv, 0) for i in range(10): self.send("testCapacityUNLIMITED", i) self.sleep() - self.assertPending(self.rcv, 10) + self.assertAvailable(self.rcv, 10) self.drain(self.rcv) - self.assertPending(self.rcv, 0) + self.assertAvailable(self.rcv, 0) self.ssn.acknowledge() - def testPending(self): + def testAvailable(self): self.rcv.capacity = UNLIMITED - assert self.rcv.pending() == 0 + assert self.rcv.available() == 0 for i in range(3): - self.send("testPending", i) + self.send("testAvailable", i) self.sleep() - assert self.rcv.pending() == 3 + assert self.rcv.available() == 3 for i in range(3, 10): - self.send("testPending", i) + self.send("testAvailable", i) self.sleep() - assert self.rcv.pending() == 10 + assert self.rcv.available() == 10 self.drain(self.rcv, limit=3) - assert self.rcv.pending() == 7 + assert self.rcv.available() == 7 self.drain(self.rcv) - assert self.rcv.pending() == 0 + assert self.rcv.available() == 0 self.ssn.acknowledge() @@ -662,6 +662,8 @@ class ReceiverTests(Base): self.assertEmpty(rb2) self.drain(self.rcv, expected=[]) + # XXX: need testUnsettled() + class AddressTests(Base): def setup_connection(self): |
