diff options
author | Gordon Sim <gsim@apache.org> | 2011-11-23 16:01:25 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-11-23 16:01:25 +0000 |
commit | c8f2e652489ca0fab6b4ce15416b16afdcd6b556 (patch) | |
tree | a434a0aaf73630fafebd5038e112d7e4efd7f5bc /tests | |
parent | ceca56258e7d3b8aabd2eadbd00857eb69e825f5 (diff) | |
download | qpid-python-c8f2e652489ca0fab6b4ce15416b16afdcd6b556.tar.gz |
QPID-3629: Changed management of credit window
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1205467 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'tests')
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/message.py | 78 |
1 files changed, 78 insertions, 0 deletions
diff --git a/tests/src/py/qpid_tests/broker_0_10/message.py b/tests/src/py/qpid_tests/broker_0_10/message.py index 6c864bcd13..204b6ebd23 100644 --- a/tests/src/py/qpid_tests/broker_0_10/message.py +++ b/tests/src/py/qpid_tests/broker_0_10/message.py @@ -611,6 +611,84 @@ class MessageTests(TestBase010): msg = q.get(timeout = 1) self.assertDataEquals(session, msg, "Message %d" % (i+6)) + def test_credit_window_after_messagestop(self): + """ + Tests that the broker's credit window size doesnt exceed the requested value when completing + previous messageTransfer commands after a message_stop and message_flow. + """ + + session = self.session + + #create queue + session.queue_declare(queue = self.test_queue_name, exclusive=True, auto_delete=True) + + #send 11 messages + for i in range(1, 12): + session.message_transfer(message=Message(session.delivery_properties(routing_key=self.test_queue_name), "message-%d" % (i))) + + + #subscribe: + session.message_subscribe(queue=self.test_queue_name, destination="a") + a = session.incoming("a") + session.message_set_flow_mode(flow_mode = 1, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + # issue 5 message credits + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a") + + # get 5 messages + ids = RangedSet() + for i in range(1, 6): + msg = a.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + ids.add(msg.id) + + # now try and read a 6th message. we expect this to fail due to exhausted message credit. + try: + extra = a.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + session.message_stop(destination = "a") + + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a") + + # complete earlier messages after setting the window to 5 message credits + session.channel.session_completed(ids) + + # Now continue to read the next 5 messages + for i in range(6, 11): + msg = a.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + + # now try and read the 11th message. we expect this to fail due to exhausted message credit. If we receive an + # 11th this indicates the broker is not respecting the client's requested window size. + try: + extra = a.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + def test_no_credit_wrap(self): + """ + Ensure that adding credit does not result in wrapround, lowering the balance. + """ + session = self.session + + session.queue_declare(queue = self.test_queue_name, exclusive=True, auto_delete=True) + session.message_subscribe(queue=self.test_queue_name, destination="a") + a = session.incoming("a") + session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFAL, destination = "a") + #test wraparound of credit balance does not occur + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + for i in range(1, 50): + session.message_transfer(message=Message(session.delivery_properties(routing_key=self.test_queue_name), "message-%d" % (i))) + session.message_flush(destination = "a") + for i in range(1, 50): + msg = a.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + def test_subscribe_not_acquired(self): """ |