summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/message.py78
1 files changed, 78 insertions, 0 deletions
diff --git a/tests/src/py/qpid_tests/broker_0_10/message.py b/tests/src/py/qpid_tests/broker_0_10/message.py
index 6c864bcd13..204b6ebd23 100644
--- a/tests/src/py/qpid_tests/broker_0_10/message.py
+++ b/tests/src/py/qpid_tests/broker_0_10/message.py
@@ -611,6 +611,84 @@ class MessageTests(TestBase010):
msg = q.get(timeout = 1)
self.assertDataEquals(session, msg, "Message %d" % (i+6))
+ def test_credit_window_after_messagestop(self):
+ """
+ Tests that the broker's credit window size doesnt exceed the requested value when completing
+ previous messageTransfer commands after a message_stop and message_flow.
+ """
+
+ session = self.session
+
+ #create queue
+ session.queue_declare(queue = self.test_queue_name, exclusive=True, auto_delete=True)
+
+ #send 11 messages
+ for i in range(1, 12):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key=self.test_queue_name), "message-%d" % (i)))
+
+
+ #subscribe:
+ session.message_subscribe(queue=self.test_queue_name, destination="a")
+ a = session.incoming("a")
+ session.message_set_flow_mode(flow_mode = 1, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ # issue 5 message credits
+ session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a")
+
+ # get 5 messages
+ ids = RangedSet()
+ for i in range(1, 6):
+ msg = a.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+ ids.add(msg.id)
+
+ # now try and read a 6th message. we expect this to fail due to exhausted message credit.
+ try:
+ extra = a.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ session.message_stop(destination = "a")
+
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a")
+
+ # complete earlier messages after setting the window to 5 message credits
+ session.channel.session_completed(ids)
+
+ # Now continue to read the next 5 messages
+ for i in range(6, 11):
+ msg = a.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+
+ # now try and read the 11th message. we expect this to fail due to exhausted message credit. If we receive an
+ # 11th this indicates the broker is not respecting the client's requested window size.
+ try:
+ extra = a.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ def test_no_credit_wrap(self):
+ """
+ Ensure that adding credit does not result in wrapround, lowering the balance.
+ """
+ session = self.session
+
+ session.queue_declare(queue = self.test_queue_name, exclusive=True, auto_delete=True)
+ session.message_subscribe(queue=self.test_queue_name, destination="a")
+ a = session.incoming("a")
+ session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFAL, destination = "a")
+ #test wraparound of credit balance does not occur
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+ for i in range(1, 50):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key=self.test_queue_name), "message-%d" % (i)))
+ session.message_flush(destination = "a")
+ for i in range(1, 50):
+ msg = a.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+
def test_subscribe_not_acquired(self):
"""