# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content from qpid.testlib import TestBase010 from qpid.datatypes import Message, RangedSet class MessageTests(TestBase010): """Tests for 'methods' on the amqp message 'class'""" def test_consume_no_local(self): """ Test that the no_local flag is honoured in the consume method """ session = self.session #setup, declare two queues: session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True) #establish two consumers one of which excludes delivery of locally sent messages self.subscribe(destination="local_included", queue="test-queue-1a") self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) #send a message session.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local")) session.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local")) #check the queues of the two consumers excluded = session.incoming("local_excluded") included = session.incoming("local_included") msg = included.get(timeout=1) self.assertEqual("consume_no_local", msg.body) try: excluded.get(timeout=1) self.fail("Received locally published message though no_local=true") except Empty: None def test_consume_no_local_awkward(self): """ If an exclusive queue gets a no-local delivered to it, that message could 'block' delivery of subsequent messages or it could be left on the queue, possibly never being consumed (this is the case for example in the qpid JMS mapping of topics). This test excercises a Qpid C++ broker hack that deletes such messages. """ session = self.session #setup: session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) #establish consumer which excludes delivery of locally sent messages self.subscribe(destination="local_excluded", queue="test-queue", no_local=True) #send a 'local' message session.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="local")) #send a non local message other = self.connect() session2 = other.session(1) session2.session_open() session2.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="foreign")) session2.session_close() other.close() #check that the second message only is delivered excluded = session.incoming("local_excluded") msg = excluded.get(timeout=1) self.assertEqual("foreign", msg.body) try: excluded.get(timeout=1) self.fail("Received extra message") except Empty: None #check queue is empty self.assertEqual(0, session.queue_query(queue="test-queue").message_count) def test_consume_exclusive(self): """ Test that the exclusive flag is honoured in the consume method """ session = self.session #setup, declare a queue: session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) #check that an exclusive consumer prevents other consumer being created: self.subscribe(destination="first", queue="test-queue-2", exclusive=True) try: self.subscribe(destination="second", queue="test-queue-2") self.fail("Expected consume request to fail due to previous exclusive consumer") except Closed, e: self.assertChannelException(403, e.args[0]) #open new session and cleanup last consumer: session = self.client.session(2) session.session_open() #check that an exclusive consumer cannot be created if a consumer already exists: self.subscribe(session, destination="first", queue="test-queue-2") try: self.subscribe(destination="second", queue="test-queue-2", exclusive=True) self.fail("Expected exclusive consume request to fail due to previous consumer") except Closed, e: self.assertChannelException(403, e.args[0]) def test_consume_queue_errors(self): """ Test error conditions associated with the queue field of the consume method: """ session = self.session try: #queue specified but doesn't exist: self.subscribe(queue="invalid-queue", destination="") self.fail("Expected failure when consuming from non-existent queue") except Closed, e: self.assertChannelException(404, e.args[0]) session = self.client.session(2) session.session_open() try: #queue not specified and none previously declared for channel: self.subscribe(session, queue="", destination="") self.fail("Expected failure when consuming from unspecified queue") except Closed, e: self.assertConnectionException(530, e.args[0]) def test_consume_unique_consumers(self): """ Ensure unique consumer tags are enforced """ session = self.session #setup, declare a queue: session.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True) #check that attempts to use duplicate tags are detected and prevented: self.subscribe(destination="first", queue="test-queue-3") try: self.subscribe(destination="first", queue="test-queue-3") self.fail("Expected consume request to fail due to non-unique tag") except Closed, e: self.assertConnectionException(530, e.args[0]) def test_cancel(self): """ Test compliance of the basic.cancel method """ session = self.session #setup, declare a queue: session.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True) session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One")) session.message_subscribe(destination="my-consumer", queue="test-queue-4") session.message_flow(destination="my-consumer", unit=0, value=0xFFFFFFFF) session.message_flow(destination="my-consumer", unit=1, value=0xFFFFFFFF) #should flush here #cancel should stop messages being delivered session.message_cancel(destination="my-consumer") session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "Two")) myqueue = session.incoming("my-consumer") msg = myqueue.get(timeout=1) self.assertEqual("One", msg.body) try: msg = myqueue.get(timeout=1) self.fail("Got message after cancellation: " + msg) except Empty: None #cancellation of non-existant consumers should be handled without error session.message_cancel(destination="my-consumer") session.message_cancel(destination="this-never-existed") def test_ack(self): """ Test basic ack/recover behaviour """ session = self.conn.session("alternate-session", timeout=10) session.queue_declare(queue="test-ack-queue", auto_delete=True) session.message_subscribe(queue = "test-ack-queue", destination = "consumer") session.message_flow(destination="consumer", unit=0, value=0xFFFFFFFF) session.message_flow(destination="consumer", unit=1, value=0xFFFFFFFF) queue = session.incoming("consumer") delivery_properties = session.delivery_properties(routing_key="test-ack-queue") for i in ["One", "Two", "Three", "Four", "Five"]: session.message_transfer(message=Message(delivery_properties, i)) msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) msg3 = queue.get(timeout=1) msg4 = queue.get(timeout=1) msg5 = queue.get(timeout=1) self.assertEqual("One", msg1.body) self.assertEqual("Two", msg2.body) self.assertEqual("Three", msg3.body) self.assertEqual("Four", msg4.body) self.assertEqual("Five", msg5.body) session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four #subscribe from second session here to ensure queue is not #auto-deleted when alternate session closes (no need to ack on these): self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1) #now close the session, and see that the unacked messages are #then redelivered to another subscriber: session.close(timeout=10) session = self.session session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF) session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF) queue = session.incoming("checker") msg3b = queue.get(timeout=1) msg5b = queue.get(timeout=1) self.assertEqual("Three", msg3b.body) self.assertEqual("Five", msg5b.body) try: extra = queue.get(timeout=1) self.fail("Got unexpected message: " + extra.body) except Empty: None def test_recover(self): """ Test recover behaviour """ session = self.session session.queue_declare(queue="queue-a", exclusive=True, auto_delete=True) session.queue_bind(exchange="amq.fanout", queue="queue-a") session.queue_declare(queue="queue-b", exclusive=True, auto_delete=True) session.queue_bind(exchange="amq.fanout", queue="queue-b") self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1) self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0) confirmed = session.incoming("confirmed") unconfirmed = session.incoming("unconfirmed") data = ["One", "Two", "Three", "Four", "Five"] for d in data: session.message_transfer(destination="amq.fanout", content=Content(body=d)) for q in [confirmed, unconfirmed]: for d in data: self.assertEqual(d, q.get(timeout=1).content.body) self.assertEmpty(q) session.message_recover(requeue=False) self.assertEmpty(confirmed) while len(data): msg = None for d in data: msg = unconfirmed.get(timeout=1) self.assertEqual(d, msg.body) self.assertEqual(True, msg.content['redelivered']) self.assertEmpty(unconfirmed) data.remove(msg.body) msg.complete(cumulative=False) session.message_recover(requeue=False) def test_recover_requeue(self): """ Test requeing on recovery """ session = self.session session.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True) self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) queue = session.incoming("consumer_tag") session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One")) session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two")) session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three")) session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four")) session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five")) msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) msg3 = queue.get(timeout=1) msg4 = queue.get(timeout=1) msg5 = queue.get(timeout=1) self.assertEqual("One", msg1.content.body) self.assertEqual("Two", msg2.content.body) self.assertEqual("Three", msg3.content.body) self.assertEqual("Four", msg4.content.body) self.assertEqual("Five", msg5.content.body) msg2.complete(cumulative=True) #One and Two msg4.complete(cumulative=False) #Four session.message_cancel(destination="consumer_tag") #publish a new message session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six")) #requeue unacked messages (Three and Five) session.message_recover(requeue=True) self.subscribe(queue="test-requeue", destination="consumer_tag") queue2 = session.incoming("consumer_tag") msg3b = queue2.get(timeout=1) msg5b = queue2.get(timeout=1) self.assertEqual("Three", msg3b.content.body) self.assertEqual("Five", msg5b.content.body) self.assertEqual(True, msg3b.content['redelivered']) self.assertEqual(True, msg5b.content['redelivered']) self.assertEqual("Six", queue2.get(timeout=1).content.body) try: extra = queue2.get(timeout=1) self.fail("Got unexpected message in second queue: " + extra.content.body) except Empty: None try: extra = queue.get(timeout=1) self.fail("Got unexpected message in original queue: " + extra.content.body) except Empty: None def test_qos_prefetch_count(self): """ Test that the prefetch count specified is honoured """ #setup: declare queue and subscribe session = self.session session.queue_declare(queue="test-prefetch-count", exclusive=True, auto_delete=True) subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1) queue = session.incoming("consumer_tag") #set prefetch to 5: session.message_qos(prefetch_count=5) #publish 10 messages: for i in range(1, 11): session.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i)) #only 5 messages should have been delivered: for i in range(1, 6): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) try: extra = queue.get(timeout=1) self.fail("Got unexpected 6th message in original queue: " + extra.content.body) except Empty: None #ack messages and check that the next set arrive ok: msg.complete() for i in range(6, 11): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) msg.complete() try: extra = queue.get(timeout=1) self.fail("Got unexpected 11th message in original queue: " + extra.content.body) except Empty: None def test_qos_prefetch_size(self): """ Test that the prefetch size specified is honoured """ #setup: declare queue and subscribe session = self.session session.queue_declare(queue="test-prefetch-size", exclusive=True, auto_delete=True) subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1) queue = session.incoming("consumer_tag") #set prefetch to 50 bytes (each message is 9 or 10 bytes): session.message_qos(prefetch_size=50) #publish 10 messages: for i in range(1, 11): session.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i)) #only 5 messages should have been delivered (i.e. 45 bytes worth): for i in range(1, 6): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) try: extra = queue.get(timeout=1) self.fail("Got unexpected 6th message in original queue: " + extra.content.body) except Empty: None #ack messages and check that the next set arrive ok: msg.complete() for i in range(6, 11): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) msg.complete() try: extra = queue.get(timeout=1) self.fail("Got unexpected 11th message in original queue: " + extra.content.body) except Empty: None #make sure that a single oversized message still gets delivered large = "abcdefghijklmnopqrstuvwxyz" large = large + "-" + large; session.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large)) msg = queue.get(timeout=1) self.assertEqual(large, msg.body) def test_reject(self): session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout") session.queue_declare(queue = "r", exclusive=True, auto_delete=True) session.exchange_bind(queue = "r", exchange = "amq.fanout") session.message_subscribe(queue = "q", destination = "consumer") session.message_flow(destination="consumer", unit=0, value=0xFFFFFFFF) session.message_flow(destination="consumer", unit=1, value=0xFFFFFFFF) session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "blah, blah")) msg = session.incoming("consumer").get(timeout = 1) self.assertEquals(msg.body, "blah, blah") session.message_reject(RangedSet(msg.id)) session.message_subscribe(queue = "r", destination = "checker") session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF) session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF) msg = session.incoming("checker").get(timeout = 1) self.assertEquals(msg.body, "blah, blah") def test_credit_flow_messages(self): """ Test basic credit based flow control with unit = message """ #declare an exclusive queue session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) session.message_subscribe(queue = "q", destination = "c") session.message_set_flow_mode(flow_mode = 0, destination = "c") #send batch of messages to queue for i in range(1, 11): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) session.message_flow(unit = 0, value = 5, destination = "c") #set infinite byte credit session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") #check that expected number were received q = session.incoming("c") for i in range(1, 6): self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) self.assertEmpty(q) #increase credit again and check more are received for i in range(6, 11): session.message_flow(unit = 0, value = 1, destination = "c") self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) self.assertEmpty(q) def test_credit_flow_bytes(self): """ Test basic credit based flow control with unit = bytes """ #declare an exclusive queue session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) session.message_subscribe(queue = "q", destination = "c") session.message_set_flow_mode(flow_mode = 0, destination = "c") #send batch of messages to queue for i in range(10): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) #each message is currently interpreted as requiring msg_size bytes of credit msg_size = 21 #set byte credit to finite amount (less than enough for all messages) session.message_flow(unit = 1, value = msg_size*5, destination = "c") #set infinite message credit session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") #check that expected number were received q = session.incoming("c") for i in range(5): self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) #increase credit again and check more are received for i in range(5): session.message_flow(unit = 1, value = msg_size, destination = "c") self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) def test_window_flow_messages(self): """ Test basic window based flow control with unit = message """ #declare an exclusive queue session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) session.message_subscribe(queue = "q", destination = "c") session.message_set_flow_mode(flow_mode = 1, destination = "c") #send batch of messages to queue for i in range(1, 11): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) session.message_flow(unit = 0, value = 5, destination = "c") #set infinite byte credit session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") #check that expected number were received q = session.incoming("c") for i in range(1, 6): msg = q.get(timeout = 1) session.receiver._completed.add(msg.id)#TODO: this may be done automatically self.assertDataEquals(session, msg, "Message %d" % i) self.assertEmpty(q) #acknowledge messages and check more are received #TODO: there may be a nicer way of doing this session.channel.session_completed(session.receiver._completed) for i in range(6, 11): self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) self.assertEmpty(q) def test_window_flow_bytes(self): """ Test basic window based flow control with unit = bytes """ #declare an exclusive queue session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) session.message_subscribe(queue = "q", destination = "c") session.message_set_flow_mode(flow_mode = 1, destination = "c") #send batch of messages to queue for i in range(10): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) #each message is currently interpreted as requiring msg_size bytes of credit msg_size = 19 #set byte credit to finite amount (less than enough for all messages) session.message_flow(unit = 1, value = msg_size*5, destination = "c") #set infinite message credit session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") #check that expected number were received q = session.incoming("c") msgs = [] for i in range(5): msg = q.get(timeout = 1) msgs.append(msg) self.assertDataEquals(session, msg, "abcdefgh") self.assertEmpty(q) #ack each message individually and check more are received for i in range(5): msg = msgs.pop() #TODO: there may be a nicer way of doing this session.receiver._completed.add(msg.id) session.channel.session_completed(session.receiver._completed) self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) def test_subscribe_not_acquired(self): """ Test the not-acquired modes works as expected for a simple case """ session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 6): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "a") session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1) session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "b") session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") for i in range(6, 11): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) #both subscribers should see all messages qA = session.incoming("a") qB = session.incoming("b") for i in range(1, 11): for q in [qA, qB]: msg = q.get(timeout = 1) self.assertEquals("Message %s" % i, msg.body) session.receiver._completed.add(msg.id) session.channel.session_completed(session.receiver._completed) #messages should still be on the queue: self.assertEquals(10, session.queue_query(queue = "q").message_count) def test_acquire(self): """ Test explicit acquire function """ session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #use fanout for now: session.exchange_bind(exchange="amq.fanout", queue="q") session.message_transfer(destination="amq.fanout", message=Message("acquire me")) #session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) session.message_flow(destination="a", unit=0, value=0xFFFFFFFF) session.message_flow(destination="a", unit=1, value=0xFFFFFFFF) msg = session.incoming("a").get(timeout = 1) self.assertEquals("acquire me", msg.body) #message should still be on the queue: self.assertEquals(1, session.queue_query(queue = "q").message_count) response = session.message_acquire(RangedSet(msg.id)) #check that we get notification (i.e. message_acquired) self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) #message should have been removed from the queue: self.assertEquals(0, session.queue_query(queue = "q").message_count) session.message_accept(RangedSet(msg.id)) def test_release(self): """ Test explicit release function """ session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "release me")) session.message_subscribe(queue = "q", destination = "a") session.message_flow(destination="a", unit=0, value=0xFFFFFFFF) session.message_flow(destination="a", unit=1, value=0xFFFFFFFF) msg = session.incoming("a").get(timeout = 1) self.assertEquals("release me", msg.body) session.message_cancel(destination = "a") session.message_release(RangedSet(msg.id)) #message should not have been removed from the queue: self.assertEquals(1, session.queue_query(queue = "q").message_count) def test_release_ordering(self): """ Test order of released messages is as expected """ session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range (1, 11): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "released message %s" % (i))) session.message_subscribe(queue = "q", destination = "a") session.message_flow(unit = 0, value = 10, destination = "a") session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") queue = session.incoming("a") first = queue.get(timeout = 1) for i in range(2, 10): msg = queue.get(timeout = 1) self.assertEquals("released message %s" % (i), msg.body) last = queue.get(timeout = 1) self.assertEmpty(queue) released = RangedSet() released.add(first.id, last.id) session.message_release(released) #TODO: may want to clean this up... session.receiver._completed.add(first.id, last.id) session.channel.session_completed(session.receiver._completed) for i in range(1, 11): self.assertEquals("released message %s" % (i), queue.get(timeout = 1).body) def test_ranged_ack(self): """ Test acking of messages ranges """ session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) delivery_properties = session.delivery_properties(routing_key="q") for i in range (1, 11): session.message_transfer(message=Message(delivery_properties, "message %s" % (i))) session.message_subscribe(queue = "q", destination = "a") session.message_flow(unit = 0, value = 10, destination = "a") session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") queue = session.incoming("a") for i in range (1, 11): self.assertEquals("message %s" % (i), queue.get(timeout = 1).body) self.assertEmpty(queue) #ack all but the third message (command id 2) session.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9]) session.message_recover() self.assertEquals("message 3", queue.get(timeout = 1).content.body) self.assertEmpty(queue) def test_subscribe_not_acquired_2(self): session = self.session #publish some messages self.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 11): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) #consume some of them session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) session.message_set_flow_mode(flow_mode = 0, destination = "a") session.message_flow(unit = 0, value = 5, destination = "a") session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") queue = session.incoming("a") for i in range(1, 6): msg = queue.get(timeout = 1) self.assertEquals("message-%d" % (i), msg.body) msg.complete() self.assertEmpty(queue) #now create a not-acquired subscriber session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") #check it gets those not consumed queue = session.incoming("b") session.message_flow(unit = 0, value = 1, destination = "b") for i in range(6, 11): msg = queue.get(timeout = 1) self.assertEquals("message-%d" % (i), msg.body) msg.complete() session.message_flow(unit = 0, value = 1, destination = "b") self.assertEmpty(queue) #check all 'browsed' messages are still on the queue self.assertEqual(5, session.queue_query(queue="q").message_count) def test_subscribe_not_acquired_3(self): session = self.session #publish some messages self.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 11): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) #create a not-acquired subscriber session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") session.message_flow(unit = 0, value = 10, destination = "a") #browse through messages queue = session.incoming("a") for i in range(1, 11): msg = queue.get(timeout = 1) self.assertEquals("message-%d" % (i), msg.body) if (i % 2): #try to acquire every second message session.message_acquire([msg.command_id, msg.command_id]) #check that acquire succeeds response = session.control_queue.get(timeout=1) self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) msg.complete() self.assertEmpty(queue) #create a second not-acquired subscriber session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") session.message_flow(unit = 0, value = 1, destination = "b") #check it gets those not consumed queue = session.incoming("b") for i in [2,4,6,8,10]: msg = queue.get(timeout = 1) self.assertEquals("message-%d" % (i), msg.body) msg.complete() session.message_flow(unit = 0, value = 1, destination = "b") self.assertEmpty(queue) #check all 'browsed' messages are still on the queue self.assertEqual(5, session.queue_query(queue="q").message_count) def test_release_unacquired(self): session = self.session #create queue session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #send message session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "my-message")) #create two 'browsers' session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") session.message_flow(unit = 0, value = 10, destination = "a") queueA = session.incoming("a") session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") session.message_flow(unit = 0, value = 10, destination = "b") queueB = session.incoming("b") #have each browser release the message msgA = queueA.get(timeout = 1) session.message_release(RangedSet(msgA.id)) msgB = queueB.get(timeout = 1) session.message_release(RangedSet(msgB.id)) #cancel browsers session.message_cancel(destination = "a") session.message_cancel(destination = "b") #create consumer session.message_subscribe(queue = "q", destination = "c") session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") session.message_flow(unit = 0, value = 10, destination = "c") queueC = session.incoming("c") #consume the message then ack it msgC = queueC.get(timeout = 1) session.message_accept(RangedSet(msgC.id)) #ensure there are no other messages self.assertEmpty(queueC) def test_no_size(self): self.queue_declare(queue = "q", exclusive=True, auto_delete=True) ch = self.session ch.message_transfer(content=SizelessContent(properties={'routing_key' : "q"}, body="message-body")) ch.message_subscribe(queue = "q", destination="d", confirm_mode = 0) ch.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "d") ch.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "d") queue = session.incoming("d") msg = queue.get(timeout = 3) self.assertEquals("message-body", msg.body) def assertDataEquals(self, session, msg, expected): self.assertEquals(expected, msg.body) def assertEmpty(self, queue): try: extra = queue.get(timeout=1) self.fail("Queue not empty, contains: " + extra.body) except Empty: None class SizelessContent(Content): def size(self): return None