diff options
| author | Gordon Sim <gsim@apache.org> | 2008-03-07 13:20:02 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-03-07 13:20:02 +0000 |
| commit | 5d8a9df4ec3a4f030ed80e143ce6986c19ab800a (patch) | |
| tree | 8417c3abe9dd81e6a73084aa36371981e06f9e27 /python/tests_0-10 | |
| parent | 9fd4909832e16734c47c13eebbe4aca66640b1b0 (diff) | |
| download | qpid-python-5d8a9df4ec3a4f030ed80e143ce6986c19ab800a.tar.gz | |
Altered management of delivery records to support separateion of completion (which drives flow control) and acceptance.
Converted flow control python tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634661 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests_0-10')
| -rw-r--r-- | python/tests_0-10/message.py | 60 | ||||
| -rw-r--r-- | python/tests_0-10/queue.py | 62 |
2 files changed, 63 insertions, 59 deletions
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index 5f97d6c705..64e2bc44c4 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -464,10 +464,10 @@ class MessageTests(TestBase010): session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) session.message_subscribe(queue = "q", destination = "c") - session.message_flow_mode(mode = 0, destination = "c") + session.message_set_flow_mode(flow_mode = 0, destination = "c") #send batch of messages to queue for i in range(1, 11): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) session.message_flow(unit = 0, value = 5, destination = "c") @@ -494,13 +494,13 @@ class MessageTests(TestBase010): session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) session.message_subscribe(queue = "q", destination = "c") - session.message_flow_mode(mode = 0, destination = "c") + session.message_set_flow_mode(flow_mode = 0, destination = "c") #send batch of messages to queue for i in range(10): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) #each message is currently interpreted as requiring msg_size bytes of credit - msg_size = 35 + msg_size = 21 #set byte credit to finite amount (less than enough for all messages) session.message_flow(unit = 1, value = msg_size*5, destination = "c") @@ -527,11 +527,11 @@ class MessageTests(TestBase010): session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) - session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) - session.message_flow_mode(mode = 1, destination = "c") + session.message_subscribe(queue = "q", destination = "c") + session.message_set_flow_mode(flow_mode = 1, destination = "c") #send batch of messages to queue for i in range(1, 11): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) session.message_flow(unit = 0, value = 5, destination = "c") @@ -539,13 +539,16 @@ class MessageTests(TestBase010): session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") #check that expected number were received q = session.incoming("c") - for i in range(1, 6): + for i in range(1, 6): msg = q.get(timeout = 1) + session.receiver._completed.add(msg.id)#TODO: this may be done automatically self.assertDataEquals(session, msg, "Message %d" % i) self.assertEmpty(q) #acknowledge messages and check more are received - msg.complete(cumulative=True) + #TODO: there may be a nicer way of doing this + session.channel.session_completed(session.receiver._completed) + for i in range(6, 11): self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) self.assertEmpty(q) @@ -559,14 +562,14 @@ class MessageTests(TestBase010): session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) - session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) - session.message_flow_mode(mode = 1, destination = "c") + session.message_subscribe(queue = "q", destination = "c") + session.message_set_flow_mode(flow_mode = 1, destination = "c") #send batch of messages to queue for i in range(10): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) #each message is currently interpreted as requiring msg_size bytes of credit - msg_size = 40 + msg_size = 19 #set byte credit to finite amount (less than enough for all messages) session.message_flow(unit = 1, value = msg_size*5, destination = "c") @@ -584,7 +587,9 @@ class MessageTests(TestBase010): #ack each message individually and check more are received for i in range(5): msg = msgs.pop() - msg.complete(cumulative=False) + #TODO: there may be a nicer way of doing this + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) @@ -595,13 +600,17 @@ class MessageTests(TestBase010): session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 6): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) - self.subscribe(queue = "q", destination = "a", acquire_mode = 1) - self.subscribe(queue = "q", destination = "b", acquire_mode = 1) + session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) + session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "a") + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1) + session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "b") + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") for i in range(6, 11): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) #both subscribers should see all messages qA = session.incoming("a") @@ -610,8 +619,9 @@ class MessageTests(TestBase010): for q in [qA, qB]: msg = q.get(timeout = 1) self.assertEquals("Message %s" % i, msg.body) - msg.complete() + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) #messages should still be on the queue: self.assertEquals(10, session.queue_query(queue = "q").message_count) @@ -625,7 +635,7 @@ class MessageTests(TestBase010): #use fanout for now: session.exchange_bind(exchange="amq.fanout", queue="q") session.message_transfer(destination="amq.fanout", message=Message("acquire me")) - #session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me")) + #session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) session.message_flow(destination="a", unit=0, value=0xFFFFFFFF) @@ -724,11 +734,11 @@ class MessageTests(TestBase010): #publish some messages self.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 11): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) #consume some of them session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) - session.message_flow_mode(mode = 0, destination = "a") + session.message_set_flow_mode(flow_mode = 0, destination = "a") session.message_flow(unit = 0, value = 5, destination = "a") session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") @@ -762,7 +772,7 @@ class MessageTests(TestBase010): #publish some messages self.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 11): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) #create a not-acquired subscriber session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) @@ -861,7 +871,7 @@ class MessageTests(TestBase010): def assertEmpty(self, queue): try: extra = queue.get(timeout=1) - self.fail("Queue not empty, contains: " + extra.content.body) + self.fail("Queue not empty, contains: " + extra.body) except Empty: None class SizelessContent(Content): diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py index f192a2af90..b972166325 100644 --- a/python/tests_0-10/queue.py +++ b/python/tests_0-10/queue.py @@ -18,10 +18,10 @@ # from qpid.client import Client, Closed from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase010 +from qpid.datatypes import Message -class QueueTests(TestBase): +class QueueTests(TestBase010): """Tests for 'methods' on the amqp queue 'class'""" def test_purge(self): @@ -31,9 +31,9 @@ class QueueTests(TestBase): session = self.session #setup, declare a queue and add some messages to it: session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) - session.message_transfer(content=Content("one", properties={'routing_key':"test-queue"})) - session.message_transfer(content=Content("two", properties={'routing_key':"test-queue"})) - session.message_transfer(content=Content("three", properties={'routing_key':"test-queue"})) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "one")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "two")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "three")) #check that the queue now reports 3 messages: session.queue_declare(queue="test-queue") @@ -46,15 +46,16 @@ class QueueTests(TestBase): self.assertEqual(0, reply.message_count) #send a further message and consume it, ensuring that the other messages are really gone - session.message_transfer(content=Content("four", properties={'routing_key':"test-queue"})) - self.subscribe(queue="test-queue", destination="tag") - queue = self.client.queue("tag") + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "four")) + session.message_subscribe(queue="test-queue", destination="tag") + session.message_flow(destination="tag", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="tag", unit=1, value=0xFFFFFFFF) + queue = session.incoming("tag") msg = queue.get(timeout=1) - self.assertEqual("four", msg.content.body) + self.assertEqual("four", msg.body) #check error conditions (use new sessions): - session = self.client.session(2) - session.session_open() + session = self.conn.session("error-checker") try: #queue specified but doesn't exist: session.queue_purge(queue="invalid-queue") @@ -62,8 +63,7 @@ class QueueTests(TestBase): except Closed, e: self.assertChannelException(404, e.args[0]) - session = self.client.session(3) - session.session_open() + session = self.conn.session("error-checker") try: #queue not specified and none previously declared for channel: session.queue_purge() @@ -71,12 +71,6 @@ class QueueTests(TestBase): except Closed, e: self.assertConnectionException(530, e.args[0]) - #cleanup - other = self.connect() - session = other.session(1) - session.session_open() - session.exchange_delete(exchange="test-exchange") - def test_declare_exclusive(self): """ Test that the exclusive field is honoured in queue.declare @@ -167,32 +161,32 @@ class QueueTests(TestBase): self.subscribe(queue="queue-1", destination="queue-1") self.subscribe(queue="queue-2", destination="queue-2") - queue1 = self.client.queue("queue-1") - queue2 = self.client.queue("queue-2") + queue1 = session.incoming("queue-1") + queue2 = session.incoming("queue-2") session.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) session.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) #send a message that will match both bindings session.message_transfer(destination=exchange, - content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers})) + message=Message(session.delivery_properties(routing_key=routing_key, application_headers=headers), "one")) #unbind first queue session.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) #send another message session.message_transfer(destination=exchange, - content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers})) + message=Message(session.delivery_properties(routing_key=routing_key, application_headers=headers), "two", )) #check one queue has both messages and the other has only one - self.assertEquals("one", queue1.get(timeout=1).content.body) + self.assertEquals("one", queue1.get(timeout=1).body) try: msg = queue1.get(timeout=1) - self.fail("Got extra message: %s" % msg.content.body) + self.fail("Got extra message: %s" % msg.body) except Empty: pass - self.assertEquals("one", queue2.get(timeout=1).content.body) - self.assertEquals("two", queue2.get(timeout=1).content.body) + self.assertEquals("one", queue2.get(timeout=1).body) + self.assertEquals("two", queue2.get(timeout=1).body) try: msg = queue2.get(timeout=1) self.fail("Got extra message: " + msg) @@ -207,9 +201,9 @@ class QueueTests(TestBase): #straight-forward case: session.queue_declare(queue="delete-me") - session.message_transfer(content=Content("a", properties={'routing_key':"delete-me"})) - session.message_transfer(content=Content("b", properties={'routing_key':"delete-me"})) - session.message_transfer(content=Content("c", properties={'routing_key':"delete-me"})) + session.message_transfer(message=Message("a", session.delivery_properties(routing_key="delete-me"))) + session.message_transfer(message=Message("b", session.delivery_properties(routing_key="delete-me"))) + session.message_transfer(message=Message("c", session.delivery_properties(routing_key="delete-me"))) session.queue_delete(queue="delete-me") #check that it has gone be declaring passively try: @@ -238,7 +232,7 @@ class QueueTests(TestBase): #create a queue and add a message to it (use default binding): session.queue_declare(queue="delete-me-2") session.queue_declare(queue="delete-me-2", passive=True) - session.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"})) + session.message_transfer(message=Message("message", session.delivery_properties(routing_key="delete-me-2"))) #try to delete, but only if empty: try: @@ -253,9 +247,9 @@ class QueueTests(TestBase): #empty queue: self.subscribe(session, destination="consumer_tag", queue="delete-me-2") - queue = self.client.queue("consumer_tag") + queue = session.incoming("consumer_tag") msg = queue.get(timeout=1) - self.assertEqual("message", msg.content.body) + self.assertEqual("message", msg.body) session.message_cancel(destination="consumer_tag") #retry deletion on empty queue: |
