diff options
| -rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/message.py | 41 |
1 files changed, 41 insertions, 0 deletions
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/message.py b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py index b46c446833..89ba936b05 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/message.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py @@ -508,6 +508,47 @@ class MessageTests(TestBase010): msgB = q.get(timeout=10) + def test_window_stop(self): + """ + Ensure window based flow control reacts to stop correctly + """ + session = self.session + #setup subscriber on a test queue + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session.message_subscribe(queue = "q", destination = "c") + session.message_set_flow_mode(flow_mode = 1, destination = "c") + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") + + + #send batch of messages to queue + for i in range(0, 10): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % (i+1))) + + #retrieve all delivered messages + q = session.incoming("c") + for i in range(0, 5): + msg = q.get(timeout = 1) + session.receiver._completed.add(msg.id)#TODO: this may be done automatically + self.assertDataEquals(session, msg, "Message %d" % (i+1)) + + session.message_stop(destination = "c") + + #now send completions, normally used to move window forward, + #but after a stop should not do so + session.channel.session_completed(session.receiver._completed) + + #check no more messages are sent + self.assertEmpty(q) + + #re-establish window and check remaining messages + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") + for i in range(0, 5): + msg = q.get(timeout = 1) + self.assertDataEquals(session, msg, "Message %d" % (i+6)) + + def test_subscribe_not_acquired(self): """ Test the not-acquired modes works as expected for a simple case |
