diff options
Diffstat (limited to 'python/tests_0-10_preview/message.py')
| -rw-r--r-- | python/tests_0-10_preview/message.py | 834 |
1 files changed, 834 insertions, 0 deletions
diff --git a/python/tests_0-10_preview/message.py b/python/tests_0-10_preview/message.py new file mode 100644 index 0000000000..a3d32bdb2d --- /dev/null +++ b/python/tests_0-10_preview/message.py @@ -0,0 +1,834 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase +from qpid.reference import Reference, ReferenceId + +class MessageTests(TestBase): + """Tests for 'methods' on the amqp message 'class'""" + + def test_consume_no_local(self): + """ + Test that the no_local flag is honoured in the consume method + """ + channel = self.channel + #setup, declare two queues: + channel.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) + channel.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True) + #establish two consumers one of which excludes delivery of locally sent messages + self.subscribe(destination="local_included", queue="test-queue-1a") + self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) + + #send a message + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local")) + + #check the queues of the two consumers + excluded = self.client.queue("local_excluded") + included = self.client.queue("local_included") + msg = included.get(timeout=1) + self.assertEqual("consume_no_local", msg.content.body) + try: + excluded.get(timeout=1) + self.fail("Received locally published message though no_local=true") + except Empty: None + + def test_consume_no_local_awkward(self): + + """ + If an exclusive queue gets a no-local delivered to it, that + message could 'block' delivery of subsequent messages or it + could be left on the queue, possibly never being consumed + (this is the case for example in the qpid JMS mapping of + topics). This test excercises a Qpid C++ broker hack that + deletes such messages. + """ + + channel = self.channel + #setup: + channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + #establish consumer which excludes delivery of locally sent messages + self.subscribe(destination="local_excluded", queue="test-queue", no_local=True) + + #send a 'local' message + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="local")) + + #send a non local message + other = self.connect() + channel2 = other.channel(1) + channel2.session_open() + channel2.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="foreign")) + channel2.session_close() + other.close() + + #check that the second message only is delivered + excluded = self.client.queue("local_excluded") + msg = excluded.get(timeout=1) + self.assertEqual("foreign", msg.content.body) + try: + excluded.get(timeout=1) + self.fail("Received extra message") + except Empty: None + #check queue is empty + self.assertEqual(0, channel.queue_query(queue="test-queue").message_count) + + + def test_consume_exclusive(self): + """ + Test that the exclusive flag is honoured in the consume method + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) + + #check that an exclusive consumer prevents other consumer being created: + self.subscribe(destination="first", queue="test-queue-2", exclusive=True) + try: + self.subscribe(destination="second", queue="test-queue-2") + self.fail("Expected consume request to fail due to previous exclusive consumer") + except Closed, e: + self.assertChannelException(403, e.args[0]) + + #open new channel and cleanup last consumer: + channel = self.client.channel(2) + channel.session_open() + + #check that an exclusive consumer cannot be created if a consumer already exists: + self.subscribe(channel, destination="first", queue="test-queue-2") + try: + self.subscribe(destination="second", queue="test-queue-2", exclusive=True) + self.fail("Expected exclusive consume request to fail due to previous consumer") + except Closed, e: + self.assertChannelException(403, e.args[0]) + + def test_consume_queue_errors(self): + """ + Test error conditions associated with the queue field of the consume method: + """ + channel = self.channel + try: + #queue specified but doesn't exist: + self.subscribe(queue="invalid-queue", destination="") + self.fail("Expected failure when consuming from non-existent queue") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + channel = self.client.channel(2) + channel.session_open() + try: + #queue not specified and none previously declared for channel: + self.subscribe(channel, queue="", destination="") + self.fail("Expected failure when consuming from unspecified queue") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + def test_consume_unique_consumers(self): + """ + Ensure unique consumer tags are enforced + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True) + + #check that attempts to use duplicate tags are detected and prevented: + self.subscribe(destination="first", queue="test-queue-3") + try: + self.subscribe(destination="first", queue="test-queue-3") + self.fail("Expected consume request to fail due to non-unique tag") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + def test_cancel(self): + """ + Test compliance of the basic.cancel method + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True) + self.subscribe(destination="my-consumer", queue="test-queue-4") + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One")) + + #cancel should stop messages being delivered + channel.message_cancel(destination="my-consumer") + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="Two")) + myqueue = self.client.queue("my-consumer") + msg = myqueue.get(timeout=1) + self.assertEqual("One", msg.content.body) + try: + msg = myqueue.get(timeout=1) + self.fail("Got message after cancellation: " + msg) + except Empty: None + + #cancellation of non-existant consumers should be handled without error + channel.message_cancel(destination="my-consumer") + channel.message_cancel(destination="this-never-existed") + + + def test_ack(self): + """ + Test basic ack/recover behaviour + """ + channel = self.channel + channel.queue_declare(queue="test-ack-queue", exclusive=True, auto_delete=True) + + self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) + queue = self.client.queue("consumer_tag") + + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Two")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five")) + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) + + msg2.complete(cumulative=True)#One and Two + msg4.complete(cumulative=False) + + channel.message_recover(requeue=False) + + msg3b = queue.get(timeout=1) + msg5b = queue.get(timeout=1) + + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + + def test_recover(self): + """ + Test recover behaviour + """ + channel = self.channel + channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True) + channel.queue_bind(exchange="amq.fanout", queue="queue-a") + channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True) + channel.queue_bind(exchange="amq.fanout", queue="queue-b") + + self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1) + self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0) + confirmed = self.client.queue("confirmed") + unconfirmed = self.client.queue("unconfirmed") + + data = ["One", "Two", "Three", "Four", "Five"] + for d in data: + channel.message_transfer(destination="amq.fanout", content=Content(body=d)) + + for q in [confirmed, unconfirmed]: + for d in data: + self.assertEqual(d, q.get(timeout=1).content.body) + self.assertEmpty(q) + + channel.message_recover(requeue=False) + + self.assertEmpty(confirmed) + + while len(data): + msg = None + for d in data: + msg = unconfirmed.get(timeout=1) + self.assertEqual(d, msg.content.body) + self.assertEqual(True, msg.content['redelivered']) + self.assertEmpty(unconfirmed) + data.remove(msg.content.body) + msg.complete(cumulative=False) + channel.message_recover(requeue=False) + + + def test_recover_requeue(self): + """ + Test requeing on recovery + """ + channel = self.channel + channel.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True) + + self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) + queue = self.client.queue("consumer_tag") + + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five")) + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) + + msg2.complete(cumulative=True) #One and Two + msg4.complete(cumulative=False) #Four + + channel.message_cancel(destination="consumer_tag") + + #publish a new message + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six")) + #requeue unacked messages (Three and Five) + channel.message_recover(requeue=True) + + self.subscribe(queue="test-requeue", destination="consumer_tag") + queue2 = self.client.queue("consumer_tag") + + msg3b = queue2.get(timeout=1) + msg5b = queue2.get(timeout=1) + + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) + + self.assertEqual(True, msg3b.content['redelivered']) + self.assertEqual(True, msg5b.content['redelivered']) + + self.assertEqual("Six", queue2.get(timeout=1).content.body) + + try: + extra = queue2.get(timeout=1) + self.fail("Got unexpected message in second queue: " + extra.content.body) + except Empty: None + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in original queue: " + extra.content.body) + except Empty: None + + + def test_qos_prefetch_count(self): + """ + Test that the prefetch count specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-count", exclusive=True, auto_delete=True) + subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1) + queue = self.client.queue("consumer_tag") + + #set prefetch to 5: + channel.message_qos(prefetch_count=5) + + #publish 10 messages: + for i in range(1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i)) + + #only 5 messages should have been delivered: + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + msg.complete() + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + msg.complete() + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) + except Empty: None + + + + def test_qos_prefetch_size(self): + """ + Test that the prefetch size specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-size", exclusive=True, auto_delete=True) + subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1) + queue = self.client.queue("consumer_tag") + + #set prefetch to 50 bytes (each message is 9 or 10 bytes): + channel.message_qos(prefetch_size=50) + + #publish 10 messages: + for i in range(1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i)) + + #only 5 messages should have been delivered (i.e. 45 bytes worth): + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + msg.complete() + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + msg.complete() + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) + except Empty: None + + #make sure that a single oversized message still gets delivered + large = "abcdefghijklmnopqrstuvwxyz" + large = large + "-" + large; + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large)) + msg = queue.get(timeout=1) + self.assertEqual(large, msg.content.body) + + def test_reject(self): + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout") + channel.queue_declare(queue = "r", exclusive=True, auto_delete=True) + channel.queue_bind(queue = "r", exchange = "amq.fanout") + + self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1) + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah")) + msg = self.client.queue("consumer").get(timeout = 1) + self.assertEquals(msg.content.body, "blah, blah") + channel.message_reject([msg.command_id, msg.command_id]) + + self.subscribe(queue = "r", destination = "checker") + msg = self.client.queue("checker").get(timeout = 1) + self.assertEquals(msg.content.body, "blah, blah") + + def test_credit_flow_messages(self): + """ + Test basic credit based flow control with unit = message + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + channel.message_subscribe(queue = "q", destination = "c") + channel.message_flow_mode(mode = 0, destination = "c") + #send batch of messages to queue + for i in range(1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) + + #set message credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 0, value = 5, destination = "c") + #set infinite byte credit + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + for i in range(1, 6): + self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + #increase credit again and check more are received + for i in range(6, 11): + channel.message_flow(unit = 0, value = 1, destination = "c") + self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + def test_credit_flow_bytes(self): + """ + Test basic credit based flow control with unit = bytes + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + channel.message_subscribe(queue = "q", destination = "c") + channel.message_flow_mode(mode = 0, destination = "c") + #send batch of messages to queue + for i in range(10): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) + + #each message is currently interpreted as requiring msg_size bytes of credit + msg_size = 35 + + #set byte credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 1, value = msg_size*5, destination = "c") + #set infinite message credit + channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + for i in range(5): + self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + #increase credit again and check more are received + for i in range(5): + channel.message_flow(unit = 1, value = msg_size, destination = "c") + self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + + def test_window_flow_messages(self): + """ + Test basic window based flow control with unit = message + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) + channel.message_flow_mode(mode = 1, destination = "c") + #send batch of messages to queue + for i in range(1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) + + #set message credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 0, value = 5, destination = "c") + #set infinite byte credit + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + for i in range(1, 6): + msg = q.get(timeout = 1) + self.assertDataEquals(channel, msg, "Message %d" % i) + self.assertEmpty(q) + + #acknowledge messages and check more are received + msg.complete(cumulative=True) + for i in range(6, 11): + self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + + def test_window_flow_bytes(self): + """ + Test basic window based flow control with unit = bytes + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) + channel.message_flow_mode(mode = 1, destination = "c") + #send batch of messages to queue + for i in range(10): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) + + #each message is currently interpreted as requiring msg_size bytes of credit + msg_size = 40 + + #set byte credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 1, value = msg_size*5, destination = "c") + #set infinite message credit + channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + msgs = [] + for i in range(5): + msg = q.get(timeout = 1) + msgs.append(msg) + self.assertDataEquals(channel, msg, "abcdefgh") + self.assertEmpty(q) + + #ack each message individually and check more are received + for i in range(5): + msg = msgs.pop() + msg.complete(cumulative=False) + self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + def test_subscribe_not_acquired(self): + """ + Test the not-acquired modes works as expected for a simple case + """ + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range(1, 6): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) + + self.subscribe(queue = "q", destination = "a", acquire_mode = 1) + self.subscribe(queue = "q", destination = "b", acquire_mode = 1) + + for i in range(6, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) + + #both subscribers should see all messages + qA = self.client.queue("a") + qB = self.client.queue("b") + for i in range(1, 11): + for q in [qA, qB]: + msg = q.get(timeout = 1) + self.assertEquals("Message %s" % i, msg.content.body) + msg.complete() + + #messages should still be on the queue: + self.assertEquals(10, channel.queue_query(queue = "q").message_count) + + def test_acquire(self): + """ + Test explicit acquire function + """ + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me")) + + self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1) + msg = self.client.queue("a").get(timeout = 1) + #message should still be on the queue: + self.assertEquals(1, channel.queue_query(queue = "q").message_count) + + channel.message_acquire([msg.command_id, msg.command_id]) + #check that we get notification (i.e. message_acquired) + response = channel.control_queue.get(timeout=1) + self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) + #message should have been removed from the queue: + self.assertEquals(0, channel.queue_query(queue = "q").message_count) + msg.complete() + + + + + def test_release(self): + """ + Test explicit release function + """ + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me")) + + self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1) + msg = self.client.queue("a").get(timeout = 1) + channel.message_cancel(destination = "a") + channel.message_release([msg.command_id, msg.command_id]) + msg.complete() + + #message should not have been removed from the queue: + self.assertEquals(1, channel.queue_query(queue = "q").message_count) + + def test_release_ordering(self): + """ + Test order of released messages is as expected + """ + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range (1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "released message %s" % (i))) + + channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) + channel.message_flow(unit = 0, value = 10, destination = "a") + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + queue = self.client.queue("a") + first = queue.get(timeout = 1) + for i in range (2, 10): + self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body) + last = queue.get(timeout = 1) + self.assertEmpty(queue) + channel.message_release([first.command_id, last.command_id]) + last.complete()#will re-allocate credit, as in window mode + for i in range (1, 11): + self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body) + + def test_ranged_ack(self): + """ + Test acking of messages ranges + """ + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range (1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message %s" % (i))) + + channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) + channel.message_flow(unit = 0, value = 10, destination = "a") + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + queue = self.client.queue("a") + for i in range (1, 11): + self.assertEquals("message %s" % (i), queue.get(timeout = 1).content.body) + self.assertEmpty(queue) + + #ack all but the third message (command id 2) + channel.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9]) + channel.message_recover() + self.assertEquals("message 3", queue.get(timeout = 1).content.body) + self.assertEmpty(queue) + + def test_subscribe_not_acquired_2(self): + channel = self.channel + + #publish some messages + self.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range(1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) + + #consume some of them + channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) + channel.message_flow_mode(mode = 0, destination = "a") + channel.message_flow(unit = 0, value = 5, destination = "a") + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + + queue = self.client.queue("a") + for i in range(1, 6): + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.content.body) + msg.complete() + self.assertEmpty(queue) + + #now create a not-acquired subscriber + channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") + + #check it gets those not consumed + queue = self.client.queue("b") + channel.message_flow(unit = 0, value = 1, destination = "b") + for i in range(6, 11): + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.content.body) + msg.complete() + channel.message_flow(unit = 0, value = 1, destination = "b") + self.assertEmpty(queue) + + #check all 'browsed' messages are still on the queue + self.assertEqual(5, channel.queue_query(queue="q").message_count) + + def test_subscribe_not_acquired_3(self): + channel = self.channel + + #publish some messages + self.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range(1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) + + #create a not-acquired subscriber + channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + channel.message_flow(unit = 0, value = 10, destination = "a") + + #browse through messages + queue = self.client.queue("a") + for i in range(1, 11): + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.content.body) + if (i % 2): + #try to acquire every second message + channel.message_acquire([msg.command_id, msg.command_id]) + #check that acquire succeeds + response = channel.control_queue.get(timeout=1) + self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) + msg.complete() + self.assertEmpty(queue) + + #create a second not-acquired subscriber + channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") + channel.message_flow(unit = 0, value = 1, destination = "b") + #check it gets those not consumed + queue = self.client.queue("b") + for i in [2,4,6,8,10]: + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.content.body) + msg.complete() + channel.message_flow(unit = 0, value = 1, destination = "b") + self.assertEmpty(queue) + + #check all 'browsed' messages are still on the queue + self.assertEqual(5, channel.queue_query(queue="q").message_count) + + def test_release_unacquired(self): + channel = self.channel + + #create queue + self.queue_declare(queue = "q", exclusive=True, auto_delete=True, durable=True) + + #send message + channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message")) + + #create two 'browsers' + channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + channel.message_flow(unit = 0, value = 10, destination = "a") + queueA = self.client.queue("a") + + channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") + channel.message_flow(unit = 0, value = 10, destination = "b") + queueB = self.client.queue("b") + + #have each browser release the message + msgA = queueA.get(timeout = 1) + channel.message_release([msgA.command_id, msgA.command_id]) + + msgB = queueB.get(timeout = 1) + channel.message_release([msgB.command_id, msgB.command_id]) + + #cancel browsers + channel.message_cancel(destination = "a") + channel.message_cancel(destination = "b") + + #create consumer + channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1, acquire_mode=0) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + channel.message_flow(unit = 0, value = 10, destination = "c") + queueC = self.client.queue("c") + #consume the message then ack it + msgC = queueC.get(timeout = 1) + msgC.complete() + #ensure there are no other messages + self.assertEmpty(queueC) + + def test_no_size(self): + self.queue_declare(queue = "q", exclusive=True, auto_delete=True) + + ch = self.channel + ch.message_transfer(content=SizelessContent(properties={'routing_key' : "q"}, body="message-body")) + + ch.message_subscribe(queue = "q", destination="d", confirm_mode = 0) + ch.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "d") + ch.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "d") + + queue = self.client.queue("d") + msg = queue.get(timeout = 3) + self.assertEquals("message-body", msg.content.body) + + def assertDataEquals(self, channel, msg, expected): + self.assertEquals(expected, msg.content.body) + + def assertEmpty(self, queue): + try: + extra = queue.get(timeout=1) + self.fail("Queue not empty, contains: " + extra.content.body) + except Empty: None + +class SizelessContent(Content): + + def size(self): + return None |
