summaryrefslogtreecommitdiff
path: root/python/tests_0-10_preview/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests_0-10_preview/message.py')
-rw-r--r--python/tests_0-10_preview/message.py834
1 files changed, 834 insertions, 0 deletions
diff --git a/python/tests_0-10_preview/message.py b/python/tests_0-10_preview/message.py
new file mode 100644
index 0000000000..a3d32bdb2d
--- /dev/null
+++ b/python/tests_0-10_preview/message.py
@@ -0,0 +1,834 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+from qpid.reference import Reference, ReferenceId
+
+class MessageTests(TestBase):
+ """Tests for 'methods' on the amqp message 'class'"""
+
+ def test_consume_no_local(self):
+ """
+ Test that the no_local flag is honoured in the consume method
+ """
+ channel = self.channel
+ #setup, declare two queues:
+ channel.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True)
+ #establish two consumers one of which excludes delivery of locally sent messages
+ self.subscribe(destination="local_included", queue="test-queue-1a")
+ self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
+
+ #send a message
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local"))
+
+ #check the queues of the two consumers
+ excluded = self.client.queue("local_excluded")
+ included = self.client.queue("local_included")
+ msg = included.get(timeout=1)
+ self.assertEqual("consume_no_local", msg.content.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received locally published message though no_local=true")
+ except Empty: None
+
+ def test_consume_no_local_awkward(self):
+
+ """
+ If an exclusive queue gets a no-local delivered to it, that
+ message could 'block' delivery of subsequent messages or it
+ could be left on the queue, possibly never being consumed
+ (this is the case for example in the qpid JMS mapping of
+ topics). This test excercises a Qpid C++ broker hack that
+ deletes such messages.
+ """
+
+ channel = self.channel
+ #setup:
+ channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ #establish consumer which excludes delivery of locally sent messages
+ self.subscribe(destination="local_excluded", queue="test-queue", no_local=True)
+
+ #send a 'local' message
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="local"))
+
+ #send a non local message
+ other = self.connect()
+ channel2 = other.channel(1)
+ channel2.session_open()
+ channel2.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="foreign"))
+ channel2.session_close()
+ other.close()
+
+ #check that the second message only is delivered
+ excluded = self.client.queue("local_excluded")
+ msg = excluded.get(timeout=1)
+ self.assertEqual("foreign", msg.content.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received extra message")
+ except Empty: None
+ #check queue is empty
+ self.assertEqual(0, channel.queue_query(queue="test-queue").message_count)
+
+
+ def test_consume_exclusive(self):
+ """
+ Test that the exclusive flag is honoured in the consume method
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
+
+ #check that an exclusive consumer prevents other consumer being created:
+ self.subscribe(destination="first", queue="test-queue-2", exclusive=True)
+ try:
+ self.subscribe(destination="second", queue="test-queue-2")
+ self.fail("Expected consume request to fail due to previous exclusive consumer")
+ except Closed, e:
+ self.assertChannelException(403, e.args[0])
+
+ #open new channel and cleanup last consumer:
+ channel = self.client.channel(2)
+ channel.session_open()
+
+ #check that an exclusive consumer cannot be created if a consumer already exists:
+ self.subscribe(channel, destination="first", queue="test-queue-2")
+ try:
+ self.subscribe(destination="second", queue="test-queue-2", exclusive=True)
+ self.fail("Expected exclusive consume request to fail due to previous consumer")
+ except Closed, e:
+ self.assertChannelException(403, e.args[0])
+
+ def test_consume_queue_errors(self):
+ """
+ Test error conditions associated with the queue field of the consume method:
+ """
+ channel = self.channel
+ try:
+ #queue specified but doesn't exist:
+ self.subscribe(queue="invalid-queue", destination="")
+ self.fail("Expected failure when consuming from non-existent queue")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ channel = self.client.channel(2)
+ channel.session_open()
+ try:
+ #queue not specified and none previously declared for channel:
+ self.subscribe(channel, queue="", destination="")
+ self.fail("Expected failure when consuming from unspecified queue")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ def test_consume_unique_consumers(self):
+ """
+ Ensure unique consumer tags are enforced
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True)
+
+ #check that attempts to use duplicate tags are detected and prevented:
+ self.subscribe(destination="first", queue="test-queue-3")
+ try:
+ self.subscribe(destination="first", queue="test-queue-3")
+ self.fail("Expected consume request to fail due to non-unique tag")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ def test_cancel(self):
+ """
+ Test compliance of the basic.cancel method
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True)
+ self.subscribe(destination="my-consumer", queue="test-queue-4")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One"))
+
+ #cancel should stop messages being delivered
+ channel.message_cancel(destination="my-consumer")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="Two"))
+ myqueue = self.client.queue("my-consumer")
+ msg = myqueue.get(timeout=1)
+ self.assertEqual("One", msg.content.body)
+ try:
+ msg = myqueue.get(timeout=1)
+ self.fail("Got message after cancellation: " + msg)
+ except Empty: None
+
+ #cancellation of non-existant consumers should be handled without error
+ channel.message_cancel(destination="my-consumer")
+ channel.message_cancel(destination="this-never-existed")
+
+
+ def test_ack(self):
+ """
+ Test basic ack/recover behaviour
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-ack-queue", exclusive=True, auto_delete=True)
+
+ self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
+ queue = self.client.queue("consumer_tag")
+
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Two"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five"))
+
+ msg1 = queue.get(timeout=1)
+ msg2 = queue.get(timeout=1)
+ msg3 = queue.get(timeout=1)
+ msg4 = queue.get(timeout=1)
+ msg5 = queue.get(timeout=1)
+
+ self.assertEqual("One", msg1.content.body)
+ self.assertEqual("Two", msg2.content.body)
+ self.assertEqual("Three", msg3.content.body)
+ self.assertEqual("Four", msg4.content.body)
+ self.assertEqual("Five", msg5.content.body)
+
+ msg2.complete(cumulative=True)#One and Two
+ msg4.complete(cumulative=False)
+
+ channel.message_recover(requeue=False)
+
+ msg3b = queue.get(timeout=1)
+ msg5b = queue.get(timeout=1)
+
+ self.assertEqual("Three", msg3b.content.body)
+ self.assertEqual("Five", msg5b.content.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+
+ def test_recover(self):
+ """
+ Test recover behaviour
+ """
+ channel = self.channel
+ channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
+ channel.queue_bind(exchange="amq.fanout", queue="queue-a")
+ channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
+ channel.queue_bind(exchange="amq.fanout", queue="queue-b")
+
+ self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1)
+ self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0)
+ confirmed = self.client.queue("confirmed")
+ unconfirmed = self.client.queue("unconfirmed")
+
+ data = ["One", "Two", "Three", "Four", "Five"]
+ for d in data:
+ channel.message_transfer(destination="amq.fanout", content=Content(body=d))
+
+ for q in [confirmed, unconfirmed]:
+ for d in data:
+ self.assertEqual(d, q.get(timeout=1).content.body)
+ self.assertEmpty(q)
+
+ channel.message_recover(requeue=False)
+
+ self.assertEmpty(confirmed)
+
+ while len(data):
+ msg = None
+ for d in data:
+ msg = unconfirmed.get(timeout=1)
+ self.assertEqual(d, msg.content.body)
+ self.assertEqual(True, msg.content['redelivered'])
+ self.assertEmpty(unconfirmed)
+ data.remove(msg.content.body)
+ msg.complete(cumulative=False)
+ channel.message_recover(requeue=False)
+
+
+ def test_recover_requeue(self):
+ """
+ Test requeing on recovery
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True)
+
+ self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
+ queue = self.client.queue("consumer_tag")
+
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five"))
+
+ msg1 = queue.get(timeout=1)
+ msg2 = queue.get(timeout=1)
+ msg3 = queue.get(timeout=1)
+ msg4 = queue.get(timeout=1)
+ msg5 = queue.get(timeout=1)
+
+ self.assertEqual("One", msg1.content.body)
+ self.assertEqual("Two", msg2.content.body)
+ self.assertEqual("Three", msg3.content.body)
+ self.assertEqual("Four", msg4.content.body)
+ self.assertEqual("Five", msg5.content.body)
+
+ msg2.complete(cumulative=True) #One and Two
+ msg4.complete(cumulative=False) #Four
+
+ channel.message_cancel(destination="consumer_tag")
+
+ #publish a new message
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six"))
+ #requeue unacked messages (Three and Five)
+ channel.message_recover(requeue=True)
+
+ self.subscribe(queue="test-requeue", destination="consumer_tag")
+ queue2 = self.client.queue("consumer_tag")
+
+ msg3b = queue2.get(timeout=1)
+ msg5b = queue2.get(timeout=1)
+
+ self.assertEqual("Three", msg3b.content.body)
+ self.assertEqual("Five", msg5b.content.body)
+
+ self.assertEqual(True, msg3b.content['redelivered'])
+ self.assertEqual(True, msg5b.content['redelivered'])
+
+ self.assertEqual("Six", queue2.get(timeout=1).content.body)
+
+ try:
+ extra = queue2.get(timeout=1)
+ self.fail("Got unexpected message in second queue: " + extra.content.body)
+ except Empty: None
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in original queue: " + extra.content.body)
+ except Empty: None
+
+
+ def test_qos_prefetch_count(self):
+ """
+ Test that the prefetch count specified is honoured
+ """
+ #setup: declare queue and subscribe
+ channel = self.channel
+ channel.queue_declare(queue="test-prefetch-count", exclusive=True, auto_delete=True)
+ subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1)
+ queue = self.client.queue("consumer_tag")
+
+ #set prefetch to 5:
+ channel.message_qos(prefetch_count=5)
+
+ #publish 10 messages:
+ for i in range(1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i))
+
+ #only 5 messages should have been delivered:
+ for i in range(1, 6):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #ack messages and check that the next set arrive ok:
+ msg.complete()
+
+ for i in range(6, 11):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ msg.complete()
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+ except Empty: None
+
+
+
+ def test_qos_prefetch_size(self):
+ """
+ Test that the prefetch size specified is honoured
+ """
+ #setup: declare queue and subscribe
+ channel = self.channel
+ channel.queue_declare(queue="test-prefetch-size", exclusive=True, auto_delete=True)
+ subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1)
+ queue = self.client.queue("consumer_tag")
+
+ #set prefetch to 50 bytes (each message is 9 or 10 bytes):
+ channel.message_qos(prefetch_size=50)
+
+ #publish 10 messages:
+ for i in range(1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i))
+
+ #only 5 messages should have been delivered (i.e. 45 bytes worth):
+ for i in range(1, 6):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #ack messages and check that the next set arrive ok:
+ msg.complete()
+
+ for i in range(6, 11):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ msg.complete()
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #make sure that a single oversized message still gets delivered
+ large = "abcdefghijklmnopqrstuvwxyz"
+ large = large + "-" + large;
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large))
+ msg = queue.get(timeout=1)
+ self.assertEqual(large, msg.content.body)
+
+ def test_reject(self):
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout")
+ channel.queue_declare(queue = "r", exclusive=True, auto_delete=True)
+ channel.queue_bind(queue = "r", exchange = "amq.fanout")
+
+ self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah"))
+ msg = self.client.queue("consumer").get(timeout = 1)
+ self.assertEquals(msg.content.body, "blah, blah")
+ channel.message_reject([msg.command_id, msg.command_id])
+
+ self.subscribe(queue = "r", destination = "checker")
+ msg = self.client.queue("checker").get(timeout = 1)
+ self.assertEquals(msg.content.body, "blah, blah")
+
+ def test_credit_flow_messages(self):
+ """
+ Test basic credit based flow control with unit = message
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_subscribe(queue = "q", destination = "c")
+ channel.message_flow_mode(mode = 0, destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+
+ #set message credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 0, value = 5, destination = "c")
+ #set infinite byte credit
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ for i in range(1, 6):
+ self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+ #increase credit again and check more are received
+ for i in range(6, 11):
+ channel.message_flow(unit = 0, value = 1, destination = "c")
+ self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+ def test_credit_flow_bytes(self):
+ """
+ Test basic credit based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_subscribe(queue = "q", destination = "c")
+ channel.message_flow_mode(mode = 0, destination = "c")
+ #send batch of messages to queue
+ for i in range(10):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+
+ #each message is currently interpreted as requiring msg_size bytes of credit
+ msg_size = 35
+
+ #set byte credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 1, value = msg_size*5, destination = "c")
+ #set infinite message credit
+ channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ for i in range(5):
+ self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+ #increase credit again and check more are received
+ for i in range(5):
+ channel.message_flow(unit = 1, value = msg_size, destination = "c")
+ self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+
+ def test_window_flow_messages(self):
+ """
+ Test basic window based flow control with unit = message
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
+ channel.message_flow_mode(mode = 1, destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+
+ #set message credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 0, value = 5, destination = "c")
+ #set infinite byte credit
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ for i in range(1, 6):
+ msg = q.get(timeout = 1)
+ self.assertDataEquals(channel, msg, "Message %d" % i)
+ self.assertEmpty(q)
+
+ #acknowledge messages and check more are received
+ msg.complete(cumulative=True)
+ for i in range(6, 11):
+ self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+
+ def test_window_flow_bytes(self):
+ """
+ Test basic window based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
+ channel.message_flow_mode(mode = 1, destination = "c")
+ #send batch of messages to queue
+ for i in range(10):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+
+ #each message is currently interpreted as requiring msg_size bytes of credit
+ msg_size = 40
+
+ #set byte credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 1, value = msg_size*5, destination = "c")
+ #set infinite message credit
+ channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ msgs = []
+ for i in range(5):
+ msg = q.get(timeout = 1)
+ msgs.append(msg)
+ self.assertDataEquals(channel, msg, "abcdefgh")
+ self.assertEmpty(q)
+
+ #ack each message individually and check more are received
+ for i in range(5):
+ msg = msgs.pop()
+ msg.complete(cumulative=False)
+ self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+ def test_subscribe_not_acquired(self):
+ """
+ Test the not-acquired modes works as expected for a simple case
+ """
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range(1, 6):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+
+ self.subscribe(queue = "q", destination = "a", acquire_mode = 1)
+ self.subscribe(queue = "q", destination = "b", acquire_mode = 1)
+
+ for i in range(6, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+
+ #both subscribers should see all messages
+ qA = self.client.queue("a")
+ qB = self.client.queue("b")
+ for i in range(1, 11):
+ for q in [qA, qB]:
+ msg = q.get(timeout = 1)
+ self.assertEquals("Message %s" % i, msg.content.body)
+ msg.complete()
+
+ #messages should still be on the queue:
+ self.assertEquals(10, channel.queue_query(queue = "q").message_count)
+
+ def test_acquire(self):
+ """
+ Test explicit acquire function
+ """
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
+
+ self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
+ msg = self.client.queue("a").get(timeout = 1)
+ #message should still be on the queue:
+ self.assertEquals(1, channel.queue_query(queue = "q").message_count)
+
+ channel.message_acquire([msg.command_id, msg.command_id])
+ #check that we get notification (i.e. message_acquired)
+ response = channel.control_queue.get(timeout=1)
+ self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+ #message should have been removed from the queue:
+ self.assertEquals(0, channel.queue_query(queue = "q").message_count)
+ msg.complete()
+
+
+
+
+ def test_release(self):
+ """
+ Test explicit release function
+ """
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me"))
+
+ self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
+ msg = self.client.queue("a").get(timeout = 1)
+ channel.message_cancel(destination = "a")
+ channel.message_release([msg.command_id, msg.command_id])
+ msg.complete()
+
+ #message should not have been removed from the queue:
+ self.assertEquals(1, channel.queue_query(queue = "q").message_count)
+
+ def test_release_ordering(self):
+ """
+ Test order of released messages is as expected
+ """
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range (1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "released message %s" % (i)))
+
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
+ channel.message_flow(unit = 0, value = 10, destination = "a")
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ queue = self.client.queue("a")
+ first = queue.get(timeout = 1)
+ for i in range (2, 10):
+ self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body)
+ last = queue.get(timeout = 1)
+ self.assertEmpty(queue)
+ channel.message_release([first.command_id, last.command_id])
+ last.complete()#will re-allocate credit, as in window mode
+ for i in range (1, 11):
+ self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body)
+
+ def test_ranged_ack(self):
+ """
+ Test acking of messages ranges
+ """
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range (1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message %s" % (i)))
+
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
+ channel.message_flow(unit = 0, value = 10, destination = "a")
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ queue = self.client.queue("a")
+ for i in range (1, 11):
+ self.assertEquals("message %s" % (i), queue.get(timeout = 1).content.body)
+ self.assertEmpty(queue)
+
+ #ack all but the third message (command id 2)
+ channel.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9])
+ channel.message_recover()
+ self.assertEquals("message 3", queue.get(timeout = 1).content.body)
+ self.assertEmpty(queue)
+
+ def test_subscribe_not_acquired_2(self):
+ channel = self.channel
+
+ #publish some messages
+ self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range(1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+
+ #consume some of them
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
+ channel.message_flow_mode(mode = 0, destination = "a")
+ channel.message_flow(unit = 0, value = 5, destination = "a")
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+
+ queue = self.client.queue("a")
+ for i in range(1, 6):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.content.body)
+ msg.complete()
+ self.assertEmpty(queue)
+
+ #now create a not-acquired subscriber
+ channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+
+ #check it gets those not consumed
+ queue = self.client.queue("b")
+ channel.message_flow(unit = 0, value = 1, destination = "b")
+ for i in range(6, 11):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.content.body)
+ msg.complete()
+ channel.message_flow(unit = 0, value = 1, destination = "b")
+ self.assertEmpty(queue)
+
+ #check all 'browsed' messages are still on the queue
+ self.assertEqual(5, channel.queue_query(queue="q").message_count)
+
+ def test_subscribe_not_acquired_3(self):
+ channel = self.channel
+
+ #publish some messages
+ self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range(1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+
+ #create a not-acquired subscriber
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ channel.message_flow(unit = 0, value = 10, destination = "a")
+
+ #browse through messages
+ queue = self.client.queue("a")
+ for i in range(1, 11):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.content.body)
+ if (i % 2):
+ #try to acquire every second message
+ channel.message_acquire([msg.command_id, msg.command_id])
+ #check that acquire succeeds
+ response = channel.control_queue.get(timeout=1)
+ self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+ msg.complete()
+ self.assertEmpty(queue)
+
+ #create a second not-acquired subscriber
+ channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+ channel.message_flow(unit = 0, value = 1, destination = "b")
+ #check it gets those not consumed
+ queue = self.client.queue("b")
+ for i in [2,4,6,8,10]:
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.content.body)
+ msg.complete()
+ channel.message_flow(unit = 0, value = 1, destination = "b")
+ self.assertEmpty(queue)
+
+ #check all 'browsed' messages are still on the queue
+ self.assertEqual(5, channel.queue_query(queue="q").message_count)
+
+ def test_release_unacquired(self):
+ channel = self.channel
+
+ #create queue
+ self.queue_declare(queue = "q", exclusive=True, auto_delete=True, durable=True)
+
+ #send message
+ channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+
+ #create two 'browsers'
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ channel.message_flow(unit = 0, value = 10, destination = "a")
+ queueA = self.client.queue("a")
+
+ channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+ channel.message_flow(unit = 0, value = 10, destination = "b")
+ queueB = self.client.queue("b")
+
+ #have each browser release the message
+ msgA = queueA.get(timeout = 1)
+ channel.message_release([msgA.command_id, msgA.command_id])
+
+ msgB = queueB.get(timeout = 1)
+ channel.message_release([msgB.command_id, msgB.command_id])
+
+ #cancel browsers
+ channel.message_cancel(destination = "a")
+ channel.message_cancel(destination = "b")
+
+ #create consumer
+ channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1, acquire_mode=0)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ channel.message_flow(unit = 0, value = 10, destination = "c")
+ queueC = self.client.queue("c")
+ #consume the message then ack it
+ msgC = queueC.get(timeout = 1)
+ msgC.complete()
+ #ensure there are no other messages
+ self.assertEmpty(queueC)
+
+ def test_no_size(self):
+ self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ ch = self.channel
+ ch.message_transfer(content=SizelessContent(properties={'routing_key' : "q"}, body="message-body"))
+
+ ch.message_subscribe(queue = "q", destination="d", confirm_mode = 0)
+ ch.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "d")
+ ch.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "d")
+
+ queue = self.client.queue("d")
+ msg = queue.get(timeout = 3)
+ self.assertEquals("message-body", msg.content.body)
+
+ def assertDataEquals(self, channel, msg, expected):
+ self.assertEquals(expected, msg.content.body)
+
+ def assertEmpty(self, queue):
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Queue not empty, contains: " + extra.content.body)
+ except Empty: None
+
+class SizelessContent(Content):
+
+ def size(self):
+ return None