diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/cpp_failing_0-10.txt | 13 | ||||
-rw-r--r-- | python/qpid/client.py | 2 | ||||
-rw-r--r-- | python/qpid/connection.py | 4 | ||||
-rw-r--r-- | python/qpid/testlib.py | 9 | ||||
-rw-r--r-- | python/tests_0-10/alternate-exchange.py | 25 | ||||
-rw-r--r-- | python/tests_0-10/basic.py | 396 | ||||
-rw-r--r-- | python/tests_0-10/broker.py | 23 | ||||
-rw-r--r-- | python/tests_0-10/dtx.py | 33 | ||||
-rw-r--r-- | python/tests_0-10/example.py | 9 | ||||
-rw-r--r-- | python/tests_0-10/exchange.py | 12 | ||||
-rw-r--r-- | python/tests_0-10/message.py | 410 | ||||
-rw-r--r-- | python/tests_0-10/queue.py | 34 | ||||
-rw-r--r-- | python/tests_0-10/testlib.py | 2 | ||||
-rw-r--r-- | python/tests_0-10/tx.py | 56 |
14 files changed, 165 insertions, 863 deletions
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt index e68f942d67..97cf420717 100644 --- a/python/cpp_failing_0-10.txt +++ b/python/cpp_failing_0-10.txt @@ -1,15 +1,4 @@ +tests_0-10.alternate-exchange.AlternateExchangeTests.test_immediate tests_0-10.message.MessageTests.test_reject tests_0-10.basic.BasicTests.test_get -tests_0-10.message.MessageTests.test_get -tests_0-10.message.MessageTests.test_checkpoint -tests_0-10.message.MessageTests.test_empty_reference -tests_0-10.message.MessageTests.test_reference_already_opened_error -tests_0-10.message.MessageTests.test_reference_completion -tests_0-10.message.MessageTests.test_reference_large -tests_0-10.message.MessageTests.test_reference_multi_transfer -tests_0-10.message.MessageTests.test_reference_simple -tests_0-10.message.MessageTests.test_reference_unopened_on_append_error -tests_0-10.message.MessageTests.test_reference_unopened_on_close_error -tests_0-10.message.MessageTests.test_reference_unopened_on_transfer_error - diff --git a/python/qpid/client.py b/python/qpid/client.py index 3efd79c389..edcd1b8ad2 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -115,8 +115,6 @@ class ClientDelegate(Delegate): self.client.started.set() def message_transfer(self, ch, msg): - if isinstance(msg.body, ReferenceId): - msg.reference = ch.references.get(msg.body.id) self.client.queue(msg.destination).put(msg) def message_open(self, ch, msg): diff --git a/python/qpid/connection.py b/python/qpid/connection.py index 46b58e83b7..39bcde17df 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -239,9 +239,7 @@ class Response(Frame): return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method) def uses_struct_encoding(spec): - return (spec.major == 0 and - spec.minor == 10 and - "transitional" not in spec.file) + return (spec.major == 0 and spec.minor == 10) class Header(Frame): diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index 0b2a1b78d6..28c07ba43a 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -280,18 +280,17 @@ class TestBase(unittest.TestCase): routing_key=routing_key) else: self.channel.message_transfer( - destination=exchange, body=body, - application_headers=properties, - routing_key=routing_key) + destination=exchange, + content=Content(body, properties={'application_headers':properties,'routing_key':routing_key})) msg = queue.get(timeout=1) if testrunner.use08spec(): self.assertEqual(body, msg.content.body) if (properties): self.assertEqual(properties, msg.content.properties) else: - self.assertEqual(body, msg.body) + self.assertEqual(body, msg.content.body) if (properties): - self.assertEqual(properties, msg.application_headers) + self.assertEqual(properties, msg.content['application_headers']) def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): """ diff --git a/python/tests_0-10/alternate-exchange.py b/python/tests_0-10/alternate-exchange.py index a1c6151fca..d6ac62ccfe 100644 --- a/python/tests_0-10/alternate-exchange.py +++ b/python/tests_0-10/alternate-exchange.py @@ -50,17 +50,17 @@ class AlternateExchangeTests(TestBase): #publish to the primary exchange #...one message that makes it to the 'processed' queue: - channel.message_transfer(destination="primary", routing_key="my-key", body="Good") + channel.message_transfer(destination="primary", content=Content("Good", properties={'routing_key':"my-key"})) #...and one that does not: - channel.message_transfer(destination="primary", routing_key="unused-key", body="Bad") + channel.message_transfer(destination="primary", content=Content("Bad", properties={'routing_key':"unused-key"})) #delete the exchanges channel.exchange_delete(exchange="primary") channel.exchange_delete(exchange="secondary") #verify behaviour - self.assertEqual("Good", processed.get(timeout=1).body) - self.assertEqual("Bad", returned.get(timeout=1).body) + self.assertEqual("Good", processed.get(timeout=1).content.body) + self.assertEqual("Bad", returned.get(timeout=1).content.body) self.assertEmpty(processed) self.assertEmpty(returned) @@ -79,18 +79,18 @@ class AlternateExchangeTests(TestBase): #create a queue using the dlq as its alternate exchange: channel.queue_declare(queue="delete-me", alternate_exchange="dlq") #send it some messages: - channel.message_transfer(routing_key="delete-me", body="One") - channel.message_transfer(routing_key="delete-me", body="Two") - channel.message_transfer(routing_key="delete-me", body="Three") + channel.message_transfer(content=Content("One", properties={'routing_key':"delete-me"})) + channel.message_transfer(content=Content("Two", properties={'routing_key':"delete-me"})) + channel.message_transfer(content=Content("Three", properties={'routing_key':"delete-me"})) #delete it: channel.queue_delete(queue="delete-me") #delete the dlq exchange: channel.exchange_delete(exchange="dlq") #check the messages were delivered to the dlq: - self.assertEqual("One", dlq.get(timeout=1).body) - self.assertEqual("Two", dlq.get(timeout=1).body) - self.assertEqual("Three", dlq.get(timeout=1).body) + self.assertEqual("One", dlq.get(timeout=1).content.body) + self.assertEqual("Two", dlq.get(timeout=1).content.body) + self.assertEqual("Three", dlq.get(timeout=1).content.body) self.assertEmpty(dlq) @@ -109,10 +109,11 @@ class AlternateExchangeTests(TestBase): #create a queue using the dlq as its alternate exchange: channel.queue_declare(queue="no-consumers", alternate_exchange="dlq", exclusive=True) #send it some messages: - channel.message_transfer(routing_key="no-consumers", body="no one wants me", immediate=True) + #TODO: WE HAVE LOST THE IMMEDIATE FLAG; FIX THIS ONCE ITS BACK + channel.message_transfer(content=Content("no one wants me", properties={'routing_key':"no-consumers"})) #check the messages were delivered to the dlq: - self.assertEqual("no one wants me", dlq.get(timeout=1).body) + self.assertEqual("no one wants me", dlq.get(timeout=1).content.body) self.assertEmpty(dlq) #cleanup: diff --git a/python/tests_0-10/basic.py b/python/tests_0-10/basic.py deleted file mode 100644 index e7d22e00da..0000000000 --- a/python/tests_0-10/basic.py +++ /dev/null @@ -1,396 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class BasicTests(TestBase): - """Tests for 'methods' on the amqp basic 'class'""" - - def test_consume_no_local(self): - """ - Test that the no_local flag is honoured in the consume method - """ - channel = self.channel - #setup, declare two queues: - channel.queue_declare(queue="test-queue-1a", exclusive=True) - channel.queue_declare(queue="test-queue-1b", exclusive=True) - #establish two consumers one of which excludes delivery of locally sent messages - channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a") - channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True) - - #send a message - channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local")) - channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local")) - - #check the queues of the two consumers - excluded = self.client.queue("local_excluded") - included = self.client.queue("local_included") - msg = included.get(timeout=1) - self.assertEqual("consume_no_local", msg.content.body) - try: - excluded.get(timeout=1) - self.fail("Received locally published message though no_local=true") - except Empty: None - - - def test_consume_exclusive(self): - """ - Test that the exclusive flag is honoured in the consume method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-2", exclusive=True) - - #check that an exclusive consumer prevents other consumer being created: - channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True) - try: - channel.basic_consume(consumer_tag="second", queue="test-queue-2") - self.fail("Expected consume request to fail due to previous exclusive consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - #open new channel and cleanup last consumer: - channel = self.client.channel(2) - channel.channel_open() - - #check that an exclusive consumer cannot be created if a consumer already exists: - channel.basic_consume(consumer_tag="first", queue="test-queue-2") - try: - channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True) - self.fail("Expected exclusive consume request to fail due to previous consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - def test_consume_queue_errors(self): - """ - Test error conditions associated with the queue field of the consume method: - """ - channel = self.channel - try: - #queue specified but doesn't exist: - channel.basic_consume(queue="invalid-queue") - self.fail("Expected failure when consuming from non-existent queue") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - channel = self.client.channel(2) - channel.channel_open() - try: - #queue not specified and none previously declared for channel: - channel.basic_consume(queue="") - self.fail("Expected failure when consuming from unspecified queue") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_consume_unique_consumers(self): - """ - Ensure unique consumer tags are enforced - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-3", exclusive=True) - - #check that attempts to use duplicate tags are detected and prevented: - channel.basic_consume(consumer_tag="first", queue="test-queue-3") - try: - channel.basic_consume(consumer_tag="first", queue="test-queue-3") - self.fail("Expected consume request to fail due to non-unique tag") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_cancel(self): - """ - Test compliance of the basic.cancel method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-4", exclusive=True) - channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4") - channel.basic_publish(routing_key="test-queue-4", content=Content("One")) - - myqueue = self.client.queue("my-consumer") - msg = myqueue.get(timeout=1) - self.assertEqual("One", msg.content.body) - - #cancel should stop messages being delivered - channel.basic_cancel(consumer_tag="my-consumer") - channel.basic_publish(routing_key="test-queue-4", content=Content("Two")) - try: - msg = myqueue.get(timeout=1) - self.fail("Got message after cancellation: " + msg) - except Empty: None - - #cancellation of non-existant consumers should be handled without error - channel.basic_cancel(consumer_tag="my-consumer") - channel.basic_cancel(consumer_tag="this-never-existed") - - - def test_ack(self): - """ - Test basic ack/recover behaviour - """ - channel = self.channel - channel.queue_declare(queue="test-ack-queue", exclusive=True) - - reply = channel.basic_consume(queue="test-ack-queue", no_ack=False) - queue = self.client.queue(reply.consumer_tag) - - channel.basic_publish(routing_key="test-ack-queue", content=Content("One")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Two")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Three")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Four")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Five")) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.content.body) - self.assertEqual("Two", msg2.content.body) - self.assertEqual("Three", msg3.content.body) - self.assertEqual("Four", msg4.content.body) - self.assertEqual("Five", msg5.content.body) - - channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two - channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four - - channel.basic_recover(requeue=False) - - msg3b = queue.get(timeout=1) - msg5b = queue.get(timeout=1) - - self.assertEqual("Three", msg3b.content.body) - self.assertEqual("Five", msg5b.content.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - def test_recover_requeue(self): - """ - Test requeing on recovery - """ - channel = self.channel - channel.queue_declare(queue="test-requeue", exclusive=True) - - subscription = channel.basic_consume(queue="test-requeue", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - channel.basic_publish(routing_key="test-requeue", content=Content("One")) - channel.basic_publish(routing_key="test-requeue", content=Content("Two")) - channel.basic_publish(routing_key="test-requeue", content=Content("Three")) - channel.basic_publish(routing_key="test-requeue", content=Content("Four")) - channel.basic_publish(routing_key="test-requeue", content=Content("Five")) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.content.body) - self.assertEqual("Two", msg2.content.body) - self.assertEqual("Three", msg3.content.body) - self.assertEqual("Four", msg4.content.body) - self.assertEqual("Five", msg5.content.body) - - channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two - channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four - - channel.basic_cancel(consumer_tag=subscription.consumer_tag) - - channel.basic_recover(requeue=True) - - subscription2 = channel.basic_consume(queue="test-requeue") - queue2 = self.client.queue(subscription2.consumer_tag) - - msg3b = queue2.get(timeout=1) - msg5b = queue2.get(timeout=1) - - self.assertEqual("Three", msg3b.content.body) - self.assertEqual("Five", msg5b.content.body) - - self.assertEqual(True, msg3b.redelivered) - self.assertEqual(True, msg5b.redelivered) - - try: - extra = queue2.get(timeout=1) - self.fail("Got unexpected message in second queue: " + extra.content.body) - except Empty: None - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message in original queue: " + extra.content.body) - except Empty: None - - - def test_qos_prefetch_count(self): - """ - Test that the prefetch count specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-count", exclusive=True) - subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - #set prefetch to 5: - channel.basic_qos(prefetch_count=5) - - #publish 10 messages: - for i in range(1, 11): - channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i)) - - #only 5 messages should have been delivered: - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.content.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.content.body) - except Empty: None - - - - def test_qos_prefetch_size(self): - """ - Test that the prefetch size specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-size", exclusive=True) - subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - #set prefetch to 50 bytes (each message is 9 or 10 bytes): - channel.basic_qos(prefetch_size=50) - - #publish 10 messages: - for i in range(1, 11): - channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i)) - - #only 5 messages should have been delivered (i.e. 45 bytes worth): - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.content.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.content.body) - except Empty: None - - #make sure that a single oversized message still gets delivered - large = "abcdefghijklmnopqrstuvwxyz" - large = large + "-" + large; - channel.basic_publish(routing_key="test-prefetch-size", content=Content(large)) - msg = queue.get(timeout=1) - self.assertEqual(large, msg.content.body) - - def test_get(self): - """ - Test basic_get method - """ - channel = self.channel - channel.queue_declare(queue="test-get", exclusive=True) - - #publish some messages (no_ack=True) - for i in range(1, 11): - channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) - - #use basic_get to read back the messages, and check that we get an empty at the end - for i in range(1, 11): - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - #repeat for no_ack=False - for i in range(11, 21): - channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) - - for i in range(11, 21): - reply = channel.basic_get(no_ack=False) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - if(i == 13): - channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True) - if(i in [15, 17, 19]): - channel.basic_ack(delivery_tag=reply.delivery_tag) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - #recover(requeue=True) - channel.basic_recover(requeue=True) - - #get the unacked messages again (14, 16, 18, 20) - for i in [14, 16, 18, 20]: - reply = channel.basic_get(no_ack=False) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - channel.basic_ack(delivery_tag=reply.delivery_tag) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - channel.basic_recover(requeue=True) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py index 647f5d4fa5..0eb71287ec 100644 --- a/python/tests_0-10/broker.py +++ b/python/tests_0-10/broker.py @@ -37,19 +37,19 @@ class BrokerTests(TestBase): ctag = "tag1" ch.message_subscribe(queue = "myqueue", destination = ctag, confirm_mode = 0) body = "test no-ack" - ch.message_transfer(routing_key = "myqueue", body = body) + ch.message_transfer(content = Content(body, properties = {"routing_key" : "myqueue"})) msg = self.client.queue(ctag).get(timeout = 5) - self.assert_(msg.body == body) + self.assert_(msg.content.body == body) # Acknowledging consumer self.queue_declare(ch, queue = "otherqueue") ctag = "tag2" ch.message_subscribe(queue = "otherqueue", destination = ctag, confirm_mode = 1) body = "test ack" - ch.message_transfer(routing_key = "otherqueue", body = body) + ch.message_transfer(content = Content(body, properties = {"routing_key" : "otherqueue"})) msg = self.client.queue(ctag).get(timeout = 5) msg.complete() - self.assert_(msg.body == body) + self.assert_(msg.content.body == body) def test_simple_delivery_immediate(self): """ @@ -64,9 +64,9 @@ class BrokerTests(TestBase): queue = self.client.queue(consumer_tag) body = "Immediate Delivery" - channel.message_transfer(destination="test-exchange", routing_key="key", body=body, immediate=True) + channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"})) msg = queue.get(timeout=5) - self.assert_(msg.body == body) + self.assert_(msg.content.body == body) # TODO: Ensure we fail if immediate=True and there's no consumer. @@ -81,13 +81,13 @@ class BrokerTests(TestBase): self.queue_declare(channel, queue="test-queue") channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") body = "Queued Delivery" - channel.message_transfer(destination="test-exchange", routing_key="key", body=body) + channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"})) consumer_tag = "tag1" channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0) queue = self.client.queue(consumer_tag) msg = queue.get(timeout=5) - self.assert_(msg.body == body) + self.assert_(msg.content.body == body) def test_invalid_channel(self): channel = self.client.channel(200) @@ -114,8 +114,9 @@ class BrokerTests(TestBase): channel.message_subscribe(destination="my-tag", queue="flow_test_queue") incoming = self.client.queue("my-tag") - channel.channel_flow(active=False) - channel.message_transfer(routing_key="flow_test_queue", body="abcdefghijklmnopqrstuvwxyz") + channel.channel_flow(active=False) + c = Content("abcdefghijklmnopqrstuvwxyz", properties = {"routing_key" : "flow_test_queue"}) + channel.message_transfer(content = c) try: incoming.get(timeout=1) self.fail("Received message when flow turned off.") @@ -123,4 +124,4 @@ class BrokerTests(TestBase): channel.channel_flow(active=True) msg = incoming.get(timeout=1) - self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.body) + self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body) diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py index a5b53ac65b..ea587f5998 100644 --- a/python/tests_0-10/dtx.py +++ b/python/tests_0-10/dtx.py @@ -248,8 +248,8 @@ class DtxTests(TestBase): #setup channel1.queue_declare(queue="one", exclusive=True) channel1.queue_declare(queue="two", exclusive=True) - channel1.message_transfer(routing_key="one", message_id="a", body="DtxMessage") - channel1.message_transfer(routing_key="two", message_id="b", body="DtxMessage") + channel1.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage")) + channel1.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage")) #create a xid tx = self.xid("dummy") @@ -284,8 +284,8 @@ class DtxTests(TestBase): #setup channel.queue_declare(queue="one", exclusive=True) channel.queue_declare(queue="two", exclusive=True) - channel.message_transfer(routing_key="one", message_id="a", body="DtxMessage") - channel.message_transfer(routing_key="two", message_id="b", body="DtxMessage") + channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage")) + channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage")) tx = self.xid("dummy") @@ -358,17 +358,17 @@ class DtxTests(TestBase): channel.dtx_demarcation_select() tx = self.xid("dummy") channel.dtx_demarcation_start(xid=tx) - channel.message_transfer(routing_key="tx-queue", message_id="one", body="DtxMessage") + channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"one"}, body="DtxMessage")) channel.dtx_demarcation_end(xid=tx) #now that association with txn is ended, publish another message - channel.message_transfer(routing_key="tx-queue", message_id="two", body="DtxMessage") + channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"two"}, body="DtxMessage")) #check the second message is available, but not the first self.assertMessageCount(1, "tx-queue") channel.message_subscribe(queue="tx-queue", destination="results", confirm_mode=1) msg = self.client.queue("results").get(timeout=1) - self.assertEqual("two", msg.message_id) + self.assertEqual("two", msg.content['message_id']) channel.message_cancel(destination="results") #ack the message then close the channel msg.complete() @@ -393,7 +393,7 @@ class DtxTests(TestBase): tester.dtx_demarcation_select() tx = self.xid("dummy") tester.dtx_demarcation_start(xid=tx) - tester.message_transfer(routing_key="dummy", body="whatever") + tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) tester.dtx_demarcation_end(xid=tx) tester.dtx_coordination_prepare(xid=tx) failed = False @@ -427,7 +427,7 @@ class DtxTests(TestBase): tester.dtx_demarcation_select() tx = self.xid("dummy") tester.dtx_demarcation_start(xid=tx) - tester.message_transfer(routing_key="dummy", body="whatever") + tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) tester.dtx_demarcation_end(xid=tx) failed = False try: @@ -456,14 +456,14 @@ class DtxTests(TestBase): #setup: channel2.queue_declare(queue="dummy", exclusive=True) - channel2.message_transfer(routing_key="dummy", body="whatever") + channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) tx = self.xid("dummy") channel2.dtx_demarcation_select() channel2.dtx_demarcation_start(xid=tx) channel2.message_get(queue="dummy", destination="dummy") self.client.queue("dummy").get(timeout=1).complete() - channel2.message_transfer(routing_key="dummy", body="whatever") + channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) channel2.channel_close() self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).status) @@ -497,7 +497,7 @@ class DtxTests(TestBase): tx = self.xid("dummy") channel.queue_declare(queue="queue-a", exclusive=True) channel.queue_declare(queue="queue-b", exclusive=True) - channel.message_transfer(routing_key="queue-a", message_id="timeout", body="DtxMessage") + channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':"timeout"}, body="DtxMessage")) channel.dtx_demarcation_select() channel.dtx_demarcation_start(xid=tx) @@ -527,7 +527,7 @@ class DtxTests(TestBase): for i in range(1, 10): tx = self.xid("tx%s" % (i)) channel.dtx_demarcation_start(xid=tx) - channel.message_transfer(routing_key="dummy", body="message%s" % (i)) + channel.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="message%s" % (i))) channel.dtx_demarcation_end(xid=tx) if i in [2, 5, 6, 8]: channel.dtx_coordination_prepare(xid=tx) @@ -575,7 +575,7 @@ class DtxTests(TestBase): channel.queue_declare(queue="queue-a", exclusive=True) channel.queue_declare(queue="queue-b", exclusive=True) #put message with specified id on one queue: - channel.message_transfer(routing_key="queue-a", message_id=id, body="DtxMessage") + channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':id}, body="DtxMessage")) #start the transaction: channel.dtx_demarcation_select() @@ -594,12 +594,13 @@ class DtxTests(TestBase): msg.complete(); #re-publish to dest - channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body) + channel.message_transfer(content=Content(properties={'routing_key':dest, 'message_id':msg.content['message_id']}, + body=msg.content.body)) def assertMessageCount(self, expected, queue): self.assertEqual(expected, self.channel.queue_query(queue=queue).message_count) def assertMessageId(self, expected, queue): self.channel.message_subscribe(queue=queue, destination="results") - self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id) + self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id']) self.channel.message_cancel(destination="results") diff --git a/python/tests_0-10/example.py b/python/tests_0-10/example.py index e4c80951ac..e3e2c3b095 100644 --- a/python/tests_0-10/example.py +++ b/python/tests_0-10/example.py @@ -76,10 +76,9 @@ class ExampleTest (TestBase): # Now lets publish a message and see if our consumer gets it. To do # this we need to import the Content class. - body = "Hello World!" - channel.message_transfer(destination="test", - routing_key="key", - body = body) + sent = Content("Hello World!") + sent["routing_key"] = "key" + channel.message_transfer(destination="test", content=sent) # Now we'll wait for the message to arrive. We can use the timeout # argument in case the server hangs. By default queue.get() will wait @@ -87,7 +86,7 @@ class ExampleTest (TestBase): msg = queue.get(timeout=10) # And check that we got the right response with assertEqual - self.assertEqual(body, msg.body) + self.assertEqual(sent.body, msg.content.body) # Now acknowledge the message. msg.complete() diff --git a/python/tests_0-10/exchange.py b/python/tests_0-10/exchange.py index 3a47ffff8c..4137eb7a51 100644 --- a/python/tests_0-10/exchange.py +++ b/python/tests_0-10/exchange.py @@ -61,10 +61,10 @@ class StandardExchangeVerifier: self.assertPublishGet(q, ex, "a.x.b.x") self.assertPublishGet(q, ex, "a.x.x.b.x") # Shouldn't match - self.channel.message_transfer(destination=ex, routing_key="a.b", body="") - self.channel.message_transfer(destination=ex, routing_key="a.b.x.y", body="") - self.channel.message_transfer(destination=ex, routing_key="x.a.b.x", body="") - self.channel.message_transfer(destination=ex, routing_key="a.b", body="") + self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"})) + self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b.x.y"})) + self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"x.a.b.x"})) + self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"})) self.assert_(q.empty()) def verifyHeadersExchange(self, ex): @@ -74,7 +74,7 @@ class StandardExchangeVerifier: q = self.consume("q") headers = {"name":"fred", "age":3} self.assertPublishGet(q, exchange=ex, properties=headers) - self.channel.message_transfer(destination=ex, body="") # No headers, won't deliver + self.channel.message_transfer(destination=ex) # No headers, won't deliver self.assertEmpty(q); @@ -275,7 +275,7 @@ class HeadersExchangeTests(TestBase): self.assertPublishGet(self.q, exchange="amq.match", properties=headers) def myBasicPublish(self, headers): - self.channel.message_transfer(destination="amq.match", body="foobar", application_headers=headers) + self.channel.message_transfer(destination="amq.match", content=Content("foobar", properties={'application_headers':headers})) def testMatchAll(self): self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index 6cf2f3ef89..f08f437a65 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -38,14 +38,14 @@ class MessageTests(TestBase): channel.message_subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) #send a message - channel.message_transfer(routing_key="test-queue-1a", body="consume_no_local") - channel.message_transfer(routing_key="test-queue-1b", body="consume_no_local") + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local")) #check the queues of the two consumers excluded = self.client.queue("local_excluded") included = self.client.queue("local_included") msg = included.get(timeout=1) - self.assertEqual("consume_no_local", msg.body) + self.assertEqual("consume_no_local", msg.content.body) try: excluded.get(timeout=1) self.fail("Received locally published message though no_local=true") @@ -125,14 +125,14 @@ class MessageTests(TestBase): #setup, declare a queue: channel.queue_declare(queue="test-queue-4", exclusive=True) channel.message_subscribe(destination="my-consumer", queue="test-queue-4") - channel.message_transfer(routing_key="test-queue-4", body="One") + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One")) #cancel should stop messages being delivered channel.message_cancel(destination="my-consumer") - channel.message_transfer(routing_key="test-queue-4", body="Two") + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="Two")) myqueue = self.client.queue("my-consumer") msg = myqueue.get(timeout=1) - self.assertEqual("One", msg.body) + self.assertEqual("One", msg.content.body) try: msg = myqueue.get(timeout=1) self.fail("Got message after cancellation: " + msg) @@ -153,11 +153,11 @@ class MessageTests(TestBase): channel.message_subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") - channel.message_transfer(routing_key="test-ack-queue", body="One") - channel.message_transfer(routing_key="test-ack-queue", body="Two") - channel.message_transfer(routing_key="test-ack-queue", body="Three") - channel.message_transfer(routing_key="test-ack-queue", body="Four") - channel.message_transfer(routing_key="test-ack-queue", body="Five") + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Two")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five")) msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) @@ -165,11 +165,11 @@ class MessageTests(TestBase): msg4 = queue.get(timeout=1) msg5 = queue.get(timeout=1) - self.assertEqual("One", msg1.body) - self.assertEqual("Two", msg2.body) - self.assertEqual("Three", msg3.body) - self.assertEqual("Four", msg4.body) - self.assertEqual("Five", msg5.body) + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) msg2.complete(cumulative=True)#One and Two msg4.complete(cumulative=False) @@ -179,12 +179,12 @@ class MessageTests(TestBase): msg3b = queue.get(timeout=1) msg5b = queue.get(timeout=1) - self.assertEqual("Three", msg3b.body) - self.assertEqual("Five", msg5b.body) + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) try: extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) + self.fail("Got unexpected message: " + extra.content.body) except Empty: None def test_recover_requeue(self): @@ -197,11 +197,11 @@ class MessageTests(TestBase): channel.message_subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") - channel.message_transfer(routing_key="test-requeue", body="One") - channel.message_transfer(routing_key="test-requeue", body="Two") - channel.message_transfer(routing_key="test-requeue", body="Three") - channel.message_transfer(routing_key="test-requeue", body="Four") - channel.message_transfer(routing_key="test-requeue", body="Five") + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five")) msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) @@ -209,11 +209,11 @@ class MessageTests(TestBase): msg4 = queue.get(timeout=1) msg5 = queue.get(timeout=1) - self.assertEqual("One", msg1.body) - self.assertEqual("Two", msg2.body) - self.assertEqual("Three", msg3.body) - self.assertEqual("Four", msg4.body) - self.assertEqual("Five", msg5.body) + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) msg2.complete(cumulative=True) #One and Two msg4.complete(cumulative=False) #Four @@ -221,7 +221,7 @@ class MessageTests(TestBase): channel.message_cancel(destination="consumer_tag") #publish a new message - channel.message_transfer(routing_key="test-requeue", body="Six") + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six")) #requeue unacked messages (Three and Five) channel.message_recover(requeue=True) @@ -231,21 +231,21 @@ class MessageTests(TestBase): msg3b = queue2.get(timeout=1) msg5b = queue2.get(timeout=1) - self.assertEqual("Three", msg3b.body) - self.assertEqual("Five", msg5b.body) + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) - self.assertEqual(True, msg3b.redelivered) - self.assertEqual(True, msg5b.redelivered) + self.assertEqual(True, msg3b.content['redelivered']) + self.assertEqual(True, msg5b.content['redelivered']) - self.assertEqual("Six", queue2.get(timeout=1).body) + self.assertEqual("Six", queue2.get(timeout=1).content.body) try: extra = queue2.get(timeout=1) - self.fail("Got unexpected message in second queue: " + extra.body) + self.fail("Got unexpected message in second queue: " + extra.content.body) except Empty: None try: extra = queue.get(timeout=1) - self.fail("Got unexpected message in original queue: " + extra.body) + self.fail("Got unexpected message in original queue: " + extra.content.body) except Empty: None @@ -264,15 +264,15 @@ class MessageTests(TestBase): #publish 10 messages: for i in range(1, 11): - channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i) + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i)) #only 5 messages should have been delivered: for i in range(1, 6): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) try: extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.body) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) except Empty: None #ack messages and check that the next set arrive ok: @@ -280,13 +280,13 @@ class MessageTests(TestBase): for i in range(6, 11): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) msg.complete() try: extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.body) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) except Empty: None @@ -306,16 +306,16 @@ class MessageTests(TestBase): #publish 10 messages: for i in range(1, 11): - channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i) + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i)) #only 5 messages should have been delivered (i.e. 45 bytes worth): for i in range(1, 6): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) try: extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.body) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) except Empty: None #ack messages and check that the next set arrive ok: @@ -323,328 +323,38 @@ class MessageTests(TestBase): for i in range(6, 11): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) msg.complete() try: extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.body) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) except Empty: None #make sure that a single oversized message still gets delivered large = "abcdefghijklmnopqrstuvwxyz" large = large + "-" + large; - channel.message_transfer(routing_key="test-prefetch-size", body=large) + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large)) msg = queue.get(timeout=1) - self.assertEqual(large, msg.body) + self.assertEqual(large, msg.content.body) - def test_get(self): - """ - Test message_get method - """ - channel = self.channel - channel.queue_declare(queue="test-get", exclusive=True) - - #publish some messages (no_ack=True) - for i in range(1, 11): - channel.message_transfer(routing_key="test-get", body="Message %d" % i) - - #use message_get to read back the messages, and check that we get an empty at the end - for i in range(1, 11): - tag = "queue %d" % i - reply = channel.message_get(no_ack=True, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - #repeat for confirm_mode=1 - for i in range(11, 21): - channel.message_transfer(routing_key="test-get", body="Message %d" % i) - - for i in range(11, 21): - tag = "queue %d" % i - reply = channel.message_get(confirm_mode=1, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - msg = self.client.queue(tag).get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - - if (i==13): - msg.complete()#11, 12 & 13 - if(i in [15, 17, 19]): - msg.complete(cumulative=False) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - #recover(requeue=True) - channel.message_recover(requeue=True) - - #get the unacked messages again (14, 16, 18, 20) - for i in [14, 16, 18, 20]: - tag = "queue %d" % i - reply = channel.message_get(confirm_mode=1, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - msg = self.client.queue(tag).get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - msg.complete() - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - channel.message_recover(requeue=True) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - def test_reference_simple(self): - """ - Test basic ability to handle references - """ - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_subscribe(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.message_append(reference=refId, bytes="abcd") - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId)) - channel.synchronous = True - - channel.message_append(reference=refId, bytes="efgh") - channel.message_append(reference=refId, bytes="ijkl") - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl") - - - def test_reference_large(self): - """ - Test basic ability to handle references whose content exceeds max frame size - """ - channel = self.channel - self.queue_declare(queue="ref_queue") - - #generate a big data string (> max frame size of consumer): - data = "0123456789" - for i in range(0, 10): - data += data - #send it inline - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=data) - channel.synchronous = True - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - #create a new connection for consumer, with specific max frame size (< data) - other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0}) - ch2 = other.channel(1) - ch2.channel_open() - ch2.message_subscribe(queue="ref_queue", destination="c1") - queue = other.queue("c1") - - msg = queue.get(timeout=1) - self.assertTrue(isinstance(msg.body, ReferenceId)) - self.assertTrue(msg.reference) - self.assertEquals(data, msg.reference.get_complete()) - - def test_reference_completion(self): - """ - Test that reference transfer are not deemed complete until - closed (therefore are not acked or routed until that point) - """ - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_subscribe(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.message_append(reference=refId, bytes="abcd") - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId)) - channel.synchronous = True - - try: - msg = queue.get(timeout=1) - self.fail("Got unexpected message on queue: " + msg) - except Empty: None - - self.assertTrue(not ack.is_complete()) - - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - self.assertDataEquals(channel, queue.get(timeout=1), "abcd") - def test_reference_multi_transfer(self): - """ - Test that multiple transfer requests for the same reference are - correctly handled. - """ - channel = self.channel - #declare and consume from two queues - channel.queue_declare(queue="q-one", exclusive=True) - channel.queue_declare(queue="q-two", exclusive=True) - channel.message_subscribe(queue="q-one", destination="q-one") - channel.message_subscribe(queue="q-two", destination="q-two") - queue1 = self.client.queue("q-one") - queue2 = self.client.queue("q-two") - - #transfer a single ref to both queues (in separate commands) - channel.message_open(reference="my-ref") - channel.synchronous = False - ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref")) - channel.message_append(reference="my-ref", bytes="my data") - ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - #check that both queues have the message - self.assertDataEquals(channel, queue1.get(timeout=1), "my data") - self.assertDataEquals(channel, queue2.get(timeout=1), "my data") - self.assertEmpty(queue1) - self.assertEmpty(queue2) - - #transfer a single ref to the same queue twice (in separate commands) - channel.message_open(reference="my-ref") - channel.synchronous = False - ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref")) - channel.message_append(reference="my-ref", bytes="second message") - ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - msg1 = queue1.get(timeout=1) - msg2 = queue1.get(timeout=1) - #order is undefined - if msg1.message_id == "abc": - self.assertEquals(msg2.message_id, "xyz") - else: - self.assertEquals(msg1.message_id, "xyz") - self.assertEquals(msg2.message_id, "abc") - - #would be legal for the incoming messages to be transfered - #inline or by reference in any combination - - if isinstance(msg1.body, ReferenceId): - self.assertEquals("second message", msg1.reference.get_complete()) - if isinstance(msg2.body, ReferenceId): - if msg1.body != msg2.body: - self.assertEquals("second message", msg2.reference.get_complete()) - #else ok, as same ref as msg1 - else: - self.assertEquals("second message", msg1.body) - if isinstance(msg2.body, ReferenceId): - self.assertEquals("second message", msg2.reference.get_complete()) - else: - self.assertEquals("second message", msg2.body) - - self.assertEmpty(queue1) - - def test_reference_unopened_on_append_error(self): - channel = self.channel - try: - channel.message_append(reference="unopened") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_unopened_on_close_error(self): - channel = self.channel - try: - channel.message_close(reference="unopened") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_unopened_on_transfer_error(self): - channel = self.channel - try: - channel.message_transfer(body=ReferenceId("unopened")) - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_already_opened_error(self): - channel = self.channel - channel.message_open(reference="a") - try: - channel.message_open(reference="a") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_empty_reference(self): - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_subscribe(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId)) - channel.synchronous = True - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - msg = queue.get(timeout=1) - self.assertEquals(msg.message_id, "empty-msg") - self.assertDataEquals(channel, msg, "") def test_reject(self): channel = self.channel channel.queue_declare(queue = "q", exclusive=True) channel.message_subscribe(queue = "q", destination = "consumer") - channel.message_transfer(routing_key = "q", body="blah, blah") + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah")) msg = self.client.queue("consumer").get(timeout = 1) - self.assertEquals(msg.body, "blah, blah") + self.assertEquals(msg.content.body, "blah, blah") channel.message_cancel(destination = "consumer") msg.reject() channel.message_subscribe(queue = "q", destination = "checker") msg = self.client.queue("checker").get(timeout = 1) - self.assertEquals(msg.body, "blah, blah") - - def test_checkpoint(self): - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True) - - channel.message_open(reference="my-ref") - channel.message_append(reference="my-ref", bytes="abcdefgh") - channel.message_append(reference="my-ref", bytes="ijklmnop") - channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint") - channel.channel_close() - - channel = self.client.channel(2) - channel.channel_open() - channel.message_subscribe(queue = "q", destination = "consumer") - offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value - self.assertTrue(offset<=16) - channel.message_append(reference="my-ref", bytes="qrstuvwxyz") - channel.synchronous = False - channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz") - self.assertEmpty(self.client.queue("consumer")) + self.assertEquals(msg.content.body, "blah, blah") def test_credit_flow_messages(self): """ @@ -660,7 +370,7 @@ class MessageTests(TestBase): channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): - channel.message_transfer(routing_key = "q", body = "Message %d" % i) + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) channel.message_flow(unit = 0, value = 5, destination = "c") @@ -692,7 +402,7 @@ class MessageTests(TestBase): channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): - channel.message_transfer(routing_key = "q", body = "abcdefgh") + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) #each message is currently interpreted as requiring 75 bytes of credit #set byte credit to finite amount (less than enough for all messages) @@ -726,7 +436,7 @@ class MessageTests(TestBase): channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): - channel.message_transfer(routing_key = "q", body = "Message %d" % i) + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) channel.message_flow(unit = 0, value = 5, destination = "c") @@ -760,7 +470,7 @@ class MessageTests(TestBase): channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): - channel.message_transfer(routing_key = "q", body = "abcdefgh") + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) #each message is currently interpreted as requiring 75 bytes of credit #set byte credit to finite amount (less than enough for all messages) @@ -783,11 +493,5 @@ class MessageTests(TestBase): self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) - - def assertDataEquals(self, channel, msg, expected): - if isinstance(msg.body, ReferenceId): - data = msg.reference.get_complete() - else: - data = msg.body - self.assertEquals(expected, data) + self.assertEquals(expected, msg.content.body) diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py index 8d99c50d32..05fa1aebc6 100644 --- a/python/tests_0-10/queue.py +++ b/python/tests_0-10/queue.py @@ -33,9 +33,9 @@ class QueueTests(TestBase): channel.exchange_declare(exchange="test-exchange", type="direct") channel.queue_declare(queue="test-queue", exclusive=True) channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - channel.message_transfer(destination="test-exchange", routing_key="key", body="one") - channel.message_transfer(destination="test-exchange", routing_key="key", body="two") - channel.message_transfer(destination="test-exchange", routing_key="key", body="three") + channel.message_transfer(destination="test-exchange", content=Content("one", properties={'routing_key':"key"})) + channel.message_transfer(destination="test-exchange", content=Content("two", properties={'routing_key':"key"})) + channel.message_transfer(destination="test-exchange", content=Content("three", properties={'routing_key':"key"})) #check that the queue now reports 3 messages: channel.queue_declare(queue="test-queue") @@ -48,11 +48,11 @@ class QueueTests(TestBase): self.assertEqual(0, reply.message_count) #send a further message and consume it, ensuring that the other messages are really gone - channel.message_transfer(destination="test-exchange", routing_key="key", body="four") + channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"})) channel.message_subscribe(queue="test-queue", destination="tag") queue = self.client.queue("tag") msg = queue.get(timeout=1) - self.assertEqual("four", msg.body) + self.assertEqual("four", msg.content.body) #check error conditions (use new channels): channel = self.client.channel(2) @@ -179,23 +179,25 @@ class QueueTests(TestBase): channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) #send a message that will match both bindings - channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="one") + channel.message_transfer(destination=exchange, + content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers})) #unbind first queue channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) #send another message - channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="two") + channel.message_transfer(destination=exchange, + content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers})) #check one queue has both messages and the other has only one - self.assertEquals("one", queue1.get(timeout=1).body) + self.assertEquals("one", queue1.get(timeout=1).content.body) try: msg = queue1.get(timeout=1) - self.fail("Got extra message: %s" % msg.body) + self.fail("Got extra message: %s" % msg.content.body) except Empty: pass - self.assertEquals("one", queue2.get(timeout=1).body) - self.assertEquals("two", queue2.get(timeout=1).body) + self.assertEquals("one", queue2.get(timeout=1).content.body) + self.assertEquals("two", queue2.get(timeout=1).content.body) try: msg = queue2.get(timeout=1) self.fail("Got extra message: " + msg) @@ -210,9 +212,9 @@ class QueueTests(TestBase): #straight-forward case: channel.queue_declare(queue="delete-me") - channel.message_transfer(routing_key="delete-me", body="a") - channel.message_transfer(routing_key="delete-me", body="b") - channel.message_transfer(routing_key="delete-me", body="c") + channel.message_transfer(content=Content("a", properties={'routing_key':"delete-me"})) + channel.message_transfer(content=Content("b", properties={'routing_key':"delete-me"})) + channel.message_transfer(content=Content("c", properties={'routing_key':"delete-me"})) channel.queue_delete(queue="delete-me") #check that it has gone be declaring passively try: @@ -241,7 +243,7 @@ class QueueTests(TestBase): #create a queue and add a message to it (use default binding): channel.queue_declare(queue="delete-me-2") channel.queue_declare(queue="delete-me-2", passive="True") - channel.message_transfer(routing_key="delete-me-2", body="message") + channel.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"})) #try to delete, but only if empty: try: @@ -258,7 +260,7 @@ class QueueTests(TestBase): channel.message_subscribe(destination="consumer_tag", queue="delete-me-2") queue = self.client.queue("consumer_tag") msg = queue.get(timeout=1) - self.assertEqual("message", msg.body) + self.assertEqual("message", msg.content.body) channel.message_cancel(destination="consumer_tag") #retry deletion on empty queue: diff --git a/python/tests_0-10/testlib.py b/python/tests_0-10/testlib.py index f345fbbd80..a0355c4ce0 100644 --- a/python/tests_0-10/testlib.py +++ b/python/tests_0-10/testlib.py @@ -49,7 +49,7 @@ class TestBaseTest(TestBase): def testAssertEmptyFail(self): self.queue_declare(queue="full") q = self.consume("full") - self.channel.message_transfer(routing_key="full", body="") + self.channel.message_transfer(content=Content("", properties={'routing_key':"full"})) try: self.assertEmpty(q); self.fail("assertEmpty did not assert on non-empty queue") diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py index 4c2f75d35e..7c50de4ee2 100644 --- a/python/tests_0-10/tx.py +++ b/python/tests_0-10/tx.py @@ -53,21 +53,21 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_c.get(timeout=1) - self.assertEqual("TxMessage %d" % i, msg.body) + self.assertEqual("TxMessage %d" % i, msg.content.body) msg.complete() msg = queue_b.get(timeout=1) - self.assertEqual("TxMessage 6", msg.body) + self.assertEqual("TxMessage 6", msg.content.body) msg.complete() msg = queue_a.get(timeout=1) - self.assertEqual("TxMessage 7", msg.body) + self.assertEqual("TxMessage 7", msg.content.body) msg.complete() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) + self.fail("Got unexpected message: " + extra.content.body) except Empty: None #cleanup @@ -83,7 +83,7 @@ class TxTests(TestBase): for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) + self.fail("Got unexpected message: " + extra.content.body) except Empty: None channel.tx_rollback() @@ -91,21 +91,21 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) msg.complete() msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) + self.assertEqual("Message 6", msg.content.body) msg.complete() msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) + self.assertEqual("Message 7", msg.content.body) msg.complete() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) + self.fail("Got unexpected message: " + extra.content.body) except Empty: None #cleanup @@ -121,7 +121,7 @@ class TxTests(TestBase): for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) + self.fail("Got unexpected message: " + extra.content.body) except Empty: None channel.tx_rollback() @@ -129,21 +129,21 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) msg.complete() msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) + self.assertEqual("Message 6", msg.content.body) msg.complete() msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) + self.assertEqual("Message 7", msg.content.body) msg.complete() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) + self.fail("Got unexpected message: " + extra.content.body) except Empty: None #cleanup @@ -166,10 +166,12 @@ class TxTests(TestBase): channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) for i in range(1, 5): - channel.message_transfer(routing_key=name_a, message_id="msg%d" % i, body="Message %d" % i) + channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"msg%d" % i}, body="Message %d" % i)) - channel.message_transfer(routing_key=key, destination="amq.direct", message_id="msg6", body="Message 6") - channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="msg7", body="Message 7") + channel.message_transfer(destination="amq.direct", + content=Content(properties={'routing_key':key, 'message_id':"msg6"}, body="Message 6")) + channel.message_transfer(destination="amq.topic", + content=Content(properties={'routing_key':topic, 'message_id':"msg7"}, body="Message 7")) channel.tx_select() @@ -178,27 +180,31 @@ class TxTests(TestBase): queue_a = self.client.queue("sub_a") for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) msg.complete() channel.message_subscribe(queue=name_b, destination="sub_b", confirm_mode=1) queue_b = self.client.queue("sub_b") msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) + self.assertEqual("Message 6", msg.content.body) msg.complete() sub_c = channel.message_subscribe(queue=name_c, destination="sub_c", confirm_mode=1) queue_c = self.client.queue("sub_c") msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) + self.assertEqual("Message 7", msg.content.body) msg.complete() #publish messages for i in range(1, 5): - channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="tx-msg%d" % i, body="TxMessage %d" % i) - - channel.message_transfer(routing_key=key, destination="amq.direct", message_id="tx-msg6", body="TxMessage 6") - channel.message_transfer(routing_key=name_a, message_id="tx-msg7", body="TxMessage 7") - + channel.message_transfer(destination="amq.topic", + content=Content(properties={'routing_key':topic, 'message_id':"tx-msg%d" % i}, + body="TxMessage %d" % i)) + + channel.message_transfer(destination="amq.direct", + content=Content(properties={'routing_key':key, 'message_id':"tx-msg6"}, + body="TxMessage 6")) + channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"tx-msg7"}, + body="TxMessage 7")) return queue_a, queue_b, queue_c |