diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/message.py | 78 |
1 files changed, 78 insertions, 0 deletions
diff --git a/tests/src/py/qpid_tests/broker_0_10/message.py b/tests/src/py/qpid_tests/broker_0_10/message.py index 6c864bcd13..204b6ebd23 100644 --- a/tests/src/py/qpid_tests/broker_0_10/message.py +++ b/tests/src/py/qpid_tests/broker_0_10/message.py @@ -611,6 +611,84 @@ class MessageTests(TestBase010): msg = q.get(timeout = 1) self.assertDataEquals(session, msg, "Message %d" % (i+6)) + def test_credit_window_after_messagestop(self): + """ + Tests that the broker's credit window size doesnt exceed the requested value when completing + previous messageTransfer commands after a message_stop and message_flow. + """ + + session = self.session + + #create queue + session.queue_declare(queue = self.test_queue_name, exclusive=True, auto_delete=True) + + #send 11 messages + for i in range(1, 12): + session.message_transfer(message=Message(session.delivery_properties(routing_key=self.test_queue_name), "message-%d" % (i))) + + + #subscribe: + session.message_subscribe(queue=self.test_queue_name, destination="a") + a = session.incoming("a") + session.message_set_flow_mode(flow_mode = 1, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + # issue 5 message credits + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a") + + # get 5 messages + ids = RangedSet() + for i in range(1, 6): + msg = a.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + ids.add(msg.id) + + # now try and read a 6th message. we expect this to fail due to exhausted message credit. + try: + extra = a.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + session.message_stop(destination = "a") + + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a") + + # complete earlier messages after setting the window to 5 message credits + session.channel.session_completed(ids) + + # Now continue to read the next 5 messages + for i in range(6, 11): + msg = a.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + + # now try and read the 11th message. we expect this to fail due to exhausted message credit. If we receive an + # 11th this indicates the broker is not respecting the client's requested window size. + try: + extra = a.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + def test_no_credit_wrap(self): + """ + Ensure that adding credit does not result in wrapround, lowering the balance. + """ + session = self.session + + session.queue_declare(queue = self.test_queue_name, exclusive=True, auto_delete=True) + session.message_subscribe(queue=self.test_queue_name, destination="a") + a = session.incoming("a") + session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFAL, destination = "a") + #test wraparound of credit balance does not occur + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + for i in range(1, 50): + session.message_transfer(message=Message(session.delivery_properties(routing_key=self.test_queue_name), "message-%d" % (i))) + session.message_flush(destination = "a") + for i in range(1, 50): + msg = a.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + def test_subscribe_not_acquired(self): """ |