summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-08-28 19:38:17 +0000
committerGordon Sim <gsim@apache.org>2007-08-28 19:38:17 +0000
commit9e10f4ea3b2f8ab6650f635cada48e4735ca20d7 (patch)
tree26ad3b8dffa17fa665fe7a033a7c8092839df011 /python
parent6b09696b216c090b512c6af92bf7976ae3407add (diff)
downloadqpid-python-9e10f4ea3b2f8ab6650f635cada48e4735ca20d7.tar.gz
Updated message.transfer encoding to use header and content segments (including new structs).
Unified more between the basic and message classes messages. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/cpp_failing_0-10.txt13
-rw-r--r--python/qpid/client.py2
-rw-r--r--python/qpid/connection.py4
-rw-r--r--python/qpid/testlib.py9
-rw-r--r--python/tests_0-10/alternate-exchange.py25
-rw-r--r--python/tests_0-10/basic.py396
-rw-r--r--python/tests_0-10/broker.py23
-rw-r--r--python/tests_0-10/dtx.py33
-rw-r--r--python/tests_0-10/example.py9
-rw-r--r--python/tests_0-10/exchange.py12
-rw-r--r--python/tests_0-10/message.py410
-rw-r--r--python/tests_0-10/queue.py34
-rw-r--r--python/tests_0-10/testlib.py2
-rw-r--r--python/tests_0-10/tx.py56
14 files changed, 165 insertions, 863 deletions
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt
index e68f942d67..97cf420717 100644
--- a/python/cpp_failing_0-10.txt
+++ b/python/cpp_failing_0-10.txt
@@ -1,15 +1,4 @@
+tests_0-10.alternate-exchange.AlternateExchangeTests.test_immediate
tests_0-10.message.MessageTests.test_reject
tests_0-10.basic.BasicTests.test_get
-tests_0-10.message.MessageTests.test_get
-tests_0-10.message.MessageTests.test_checkpoint
-tests_0-10.message.MessageTests.test_empty_reference
-tests_0-10.message.MessageTests.test_reference_already_opened_error
-tests_0-10.message.MessageTests.test_reference_completion
-tests_0-10.message.MessageTests.test_reference_large
-tests_0-10.message.MessageTests.test_reference_multi_transfer
-tests_0-10.message.MessageTests.test_reference_simple
-tests_0-10.message.MessageTests.test_reference_unopened_on_append_error
-tests_0-10.message.MessageTests.test_reference_unopened_on_close_error
-tests_0-10.message.MessageTests.test_reference_unopened_on_transfer_error
-
diff --git a/python/qpid/client.py b/python/qpid/client.py
index 3efd79c389..edcd1b8ad2 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -115,8 +115,6 @@ class ClientDelegate(Delegate):
self.client.started.set()
def message_transfer(self, ch, msg):
- if isinstance(msg.body, ReferenceId):
- msg.reference = ch.references.get(msg.body.id)
self.client.queue(msg.destination).put(msg)
def message_open(self, ch, msg):
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index 46b58e83b7..39bcde17df 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -239,9 +239,7 @@ class Response(Frame):
return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method)
def uses_struct_encoding(spec):
- return (spec.major == 0 and
- spec.minor == 10 and
- "transitional" not in spec.file)
+ return (spec.major == 0 and spec.minor == 10)
class Header(Frame):
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index 0b2a1b78d6..28c07ba43a 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -280,18 +280,17 @@ class TestBase(unittest.TestCase):
routing_key=routing_key)
else:
self.channel.message_transfer(
- destination=exchange, body=body,
- application_headers=properties,
- routing_key=routing_key)
+ destination=exchange,
+ content=Content(body, properties={'application_headers':properties,'routing_key':routing_key}))
msg = queue.get(timeout=1)
if testrunner.use08spec():
self.assertEqual(body, msg.content.body)
if (properties):
self.assertEqual(properties, msg.content.properties)
else:
- self.assertEqual(body, msg.body)
+ self.assertEqual(body, msg.content.body)
if (properties):
- self.assertEqual(properties, msg.application_headers)
+ self.assertEqual(properties, msg.content['application_headers'])
def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
"""
diff --git a/python/tests_0-10/alternate-exchange.py b/python/tests_0-10/alternate-exchange.py
index a1c6151fca..d6ac62ccfe 100644
--- a/python/tests_0-10/alternate-exchange.py
+++ b/python/tests_0-10/alternate-exchange.py
@@ -50,17 +50,17 @@ class AlternateExchangeTests(TestBase):
#publish to the primary exchange
#...one message that makes it to the 'processed' queue:
- channel.message_transfer(destination="primary", routing_key="my-key", body="Good")
+ channel.message_transfer(destination="primary", content=Content("Good", properties={'routing_key':"my-key"}))
#...and one that does not:
- channel.message_transfer(destination="primary", routing_key="unused-key", body="Bad")
+ channel.message_transfer(destination="primary", content=Content("Bad", properties={'routing_key':"unused-key"}))
#delete the exchanges
channel.exchange_delete(exchange="primary")
channel.exchange_delete(exchange="secondary")
#verify behaviour
- self.assertEqual("Good", processed.get(timeout=1).body)
- self.assertEqual("Bad", returned.get(timeout=1).body)
+ self.assertEqual("Good", processed.get(timeout=1).content.body)
+ self.assertEqual("Bad", returned.get(timeout=1).content.body)
self.assertEmpty(processed)
self.assertEmpty(returned)
@@ -79,18 +79,18 @@ class AlternateExchangeTests(TestBase):
#create a queue using the dlq as its alternate exchange:
channel.queue_declare(queue="delete-me", alternate_exchange="dlq")
#send it some messages:
- channel.message_transfer(routing_key="delete-me", body="One")
- channel.message_transfer(routing_key="delete-me", body="Two")
- channel.message_transfer(routing_key="delete-me", body="Three")
+ channel.message_transfer(content=Content("One", properties={'routing_key':"delete-me"}))
+ channel.message_transfer(content=Content("Two", properties={'routing_key':"delete-me"}))
+ channel.message_transfer(content=Content("Three", properties={'routing_key':"delete-me"}))
#delete it:
channel.queue_delete(queue="delete-me")
#delete the dlq exchange:
channel.exchange_delete(exchange="dlq")
#check the messages were delivered to the dlq:
- self.assertEqual("One", dlq.get(timeout=1).body)
- self.assertEqual("Two", dlq.get(timeout=1).body)
- self.assertEqual("Three", dlq.get(timeout=1).body)
+ self.assertEqual("One", dlq.get(timeout=1).content.body)
+ self.assertEqual("Two", dlq.get(timeout=1).content.body)
+ self.assertEqual("Three", dlq.get(timeout=1).content.body)
self.assertEmpty(dlq)
@@ -109,10 +109,11 @@ class AlternateExchangeTests(TestBase):
#create a queue using the dlq as its alternate exchange:
channel.queue_declare(queue="no-consumers", alternate_exchange="dlq", exclusive=True)
#send it some messages:
- channel.message_transfer(routing_key="no-consumers", body="no one wants me", immediate=True)
+ #TODO: WE HAVE LOST THE IMMEDIATE FLAG; FIX THIS ONCE ITS BACK
+ channel.message_transfer(content=Content("no one wants me", properties={'routing_key':"no-consumers"}))
#check the messages were delivered to the dlq:
- self.assertEqual("no one wants me", dlq.get(timeout=1).body)
+ self.assertEqual("no one wants me", dlq.get(timeout=1).content.body)
self.assertEmpty(dlq)
#cleanup:
diff --git a/python/tests_0-10/basic.py b/python/tests_0-10/basic.py
deleted file mode 100644
index e7d22e00da..0000000000
--- a/python/tests_0-10/basic.py
+++ /dev/null
@@ -1,396 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class BasicTests(TestBase):
- """Tests for 'methods' on the amqp basic 'class'"""
-
- def test_consume_no_local(self):
- """
- Test that the no_local flag is honoured in the consume method
- """
- channel = self.channel
- #setup, declare two queues:
- channel.queue_declare(queue="test-queue-1a", exclusive=True)
- channel.queue_declare(queue="test-queue-1b", exclusive=True)
- #establish two consumers one of which excludes delivery of locally sent messages
- channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a")
- channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True)
-
- #send a message
- channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local"))
- channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local"))
-
- #check the queues of the two consumers
- excluded = self.client.queue("local_excluded")
- included = self.client.queue("local_included")
- msg = included.get(timeout=1)
- self.assertEqual("consume_no_local", msg.content.body)
- try:
- excluded.get(timeout=1)
- self.fail("Received locally published message though no_local=true")
- except Empty: None
-
-
- def test_consume_exclusive(self):
- """
- Test that the exclusive flag is honoured in the consume method
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-2", exclusive=True)
-
- #check that an exclusive consumer prevents other consumer being created:
- channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True)
- try:
- channel.basic_consume(consumer_tag="second", queue="test-queue-2")
- self.fail("Expected consume request to fail due to previous exclusive consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- #open new channel and cleanup last consumer:
- channel = self.client.channel(2)
- channel.channel_open()
-
- #check that an exclusive consumer cannot be created if a consumer already exists:
- channel.basic_consume(consumer_tag="first", queue="test-queue-2")
- try:
- channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True)
- self.fail("Expected exclusive consume request to fail due to previous consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- def test_consume_queue_errors(self):
- """
- Test error conditions associated with the queue field of the consume method:
- """
- channel = self.channel
- try:
- #queue specified but doesn't exist:
- channel.basic_consume(queue="invalid-queue")
- self.fail("Expected failure when consuming from non-existent queue")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- channel = self.client.channel(2)
- channel.channel_open()
- try:
- #queue not specified and none previously declared for channel:
- channel.basic_consume(queue="")
- self.fail("Expected failure when consuming from unspecified queue")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- def test_consume_unique_consumers(self):
- """
- Ensure unique consumer tags are enforced
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-3", exclusive=True)
-
- #check that attempts to use duplicate tags are detected and prevented:
- channel.basic_consume(consumer_tag="first", queue="test-queue-3")
- try:
- channel.basic_consume(consumer_tag="first", queue="test-queue-3")
- self.fail("Expected consume request to fail due to non-unique tag")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- def test_cancel(self):
- """
- Test compliance of the basic.cancel method
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-4", exclusive=True)
- channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4")
- channel.basic_publish(routing_key="test-queue-4", content=Content("One"))
-
- myqueue = self.client.queue("my-consumer")
- msg = myqueue.get(timeout=1)
- self.assertEqual("One", msg.content.body)
-
- #cancel should stop messages being delivered
- channel.basic_cancel(consumer_tag="my-consumer")
- channel.basic_publish(routing_key="test-queue-4", content=Content("Two"))
- try:
- msg = myqueue.get(timeout=1)
- self.fail("Got message after cancellation: " + msg)
- except Empty: None
-
- #cancellation of non-existant consumers should be handled without error
- channel.basic_cancel(consumer_tag="my-consumer")
- channel.basic_cancel(consumer_tag="this-never-existed")
-
-
- def test_ack(self):
- """
- Test basic ack/recover behaviour
- """
- channel = self.channel
- channel.queue_declare(queue="test-ack-queue", exclusive=True)
-
- reply = channel.basic_consume(queue="test-ack-queue", no_ack=False)
- queue = self.client.queue(reply.consumer_tag)
-
- channel.basic_publish(routing_key="test-ack-queue", content=Content("One"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Two"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Three"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Four"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Five"))
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.content.body)
- self.assertEqual("Two", msg2.content.body)
- self.assertEqual("Three", msg3.content.body)
- self.assertEqual("Four", msg4.content.body)
- self.assertEqual("Five", msg5.content.body)
-
- channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
- channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
-
- channel.basic_recover(requeue=False)
-
- msg3b = queue.get(timeout=1)
- msg5b = queue.get(timeout=1)
-
- self.assertEqual("Three", msg3b.content.body)
- self.assertEqual("Five", msg5b.content.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
- def test_recover_requeue(self):
- """
- Test requeing on recovery
- """
- channel = self.channel
- channel.queue_declare(queue="test-requeue", exclusive=True)
-
- subscription = channel.basic_consume(queue="test-requeue", no_ack=False)
- queue = self.client.queue(subscription.consumer_tag)
-
- channel.basic_publish(routing_key="test-requeue", content=Content("One"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Two"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Three"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Four"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Five"))
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.content.body)
- self.assertEqual("Two", msg2.content.body)
- self.assertEqual("Three", msg3.content.body)
- self.assertEqual("Four", msg4.content.body)
- self.assertEqual("Five", msg5.content.body)
-
- channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
- channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
-
- channel.basic_cancel(consumer_tag=subscription.consumer_tag)
-
- channel.basic_recover(requeue=True)
-
- subscription2 = channel.basic_consume(queue="test-requeue")
- queue2 = self.client.queue(subscription2.consumer_tag)
-
- msg3b = queue2.get(timeout=1)
- msg5b = queue2.get(timeout=1)
-
- self.assertEqual("Three", msg3b.content.body)
- self.assertEqual("Five", msg5b.content.body)
-
- self.assertEqual(True, msg3b.redelivered)
- self.assertEqual(True, msg5b.redelivered)
-
- try:
- extra = queue2.get(timeout=1)
- self.fail("Got unexpected message in second queue: " + extra.content.body)
- except Empty: None
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message in original queue: " + extra.content.body)
- except Empty: None
-
-
- def test_qos_prefetch_count(self):
- """
- Test that the prefetch count specified is honoured
- """
- #setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-count", exclusive=True)
- subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False)
- queue = self.client.queue(subscription.consumer_tag)
-
- #set prefetch to 5:
- channel.basic_qos(prefetch_count=5)
-
- #publish 10 messages:
- for i in range(1, 11):
- channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i))
-
- #only 5 messages should have been delivered:
- for i in range(1, 6):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
- except Empty: None
-
- #ack messages and check that the next set arrive ok:
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- for i in range(6, 11):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
- except Empty: None
-
-
-
- def test_qos_prefetch_size(self):
- """
- Test that the prefetch size specified is honoured
- """
- #setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-size", exclusive=True)
- subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False)
- queue = self.client.queue(subscription.consumer_tag)
-
- #set prefetch to 50 bytes (each message is 9 or 10 bytes):
- channel.basic_qos(prefetch_size=50)
-
- #publish 10 messages:
- for i in range(1, 11):
- channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i))
-
- #only 5 messages should have been delivered (i.e. 45 bytes worth):
- for i in range(1, 6):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
- except Empty: None
-
- #ack messages and check that the next set arrive ok:
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- for i in range(6, 11):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
- except Empty: None
-
- #make sure that a single oversized message still gets delivered
- large = "abcdefghijklmnopqrstuvwxyz"
- large = large + "-" + large;
- channel.basic_publish(routing_key="test-prefetch-size", content=Content(large))
- msg = queue.get(timeout=1)
- self.assertEqual(large, msg.content.body)
-
- def test_get(self):
- """
- Test basic_get method
- """
- channel = self.channel
- channel.queue_declare(queue="test-get", exclusive=True)
-
- #publish some messages (no_ack=True)
- for i in range(1, 11):
- channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
-
- #use basic_get to read back the messages, and check that we get an empty at the end
- for i in range(1, 11):
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_ok")
- self.assertEqual("Message %d" % i, reply.content.body)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_empty")
-
- #repeat for no_ack=False
- for i in range(11, 21):
- channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
-
- for i in range(11, 21):
- reply = channel.basic_get(no_ack=False)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_ok")
- self.assertEqual("Message %d" % i, reply.content.body)
- if(i == 13):
- channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
- if(i in [15, 17, 19]):
- channel.basic_ack(delivery_tag=reply.delivery_tag)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_empty")
-
- #recover(requeue=True)
- channel.basic_recover(requeue=True)
-
- #get the unacked messages again (14, 16, 18, 20)
- for i in [14, 16, 18, 20]:
- reply = channel.basic_get(no_ack=False)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_ok")
- self.assertEqual("Message %d" % i, reply.content.body)
- channel.basic_ack(delivery_tag=reply.delivery_tag)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_empty")
-
- channel.basic_recover(requeue=True)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_empty")
diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py
index 647f5d4fa5..0eb71287ec 100644
--- a/python/tests_0-10/broker.py
+++ b/python/tests_0-10/broker.py
@@ -37,19 +37,19 @@ class BrokerTests(TestBase):
ctag = "tag1"
ch.message_subscribe(queue = "myqueue", destination = ctag, confirm_mode = 0)
body = "test no-ack"
- ch.message_transfer(routing_key = "myqueue", body = body)
+ ch.message_transfer(content = Content(body, properties = {"routing_key" : "myqueue"}))
msg = self.client.queue(ctag).get(timeout = 5)
- self.assert_(msg.body == body)
+ self.assert_(msg.content.body == body)
# Acknowledging consumer
self.queue_declare(ch, queue = "otherqueue")
ctag = "tag2"
ch.message_subscribe(queue = "otherqueue", destination = ctag, confirm_mode = 1)
body = "test ack"
- ch.message_transfer(routing_key = "otherqueue", body = body)
+ ch.message_transfer(content = Content(body, properties = {"routing_key" : "otherqueue"}))
msg = self.client.queue(ctag).get(timeout = 5)
msg.complete()
- self.assert_(msg.body == body)
+ self.assert_(msg.content.body == body)
def test_simple_delivery_immediate(self):
"""
@@ -64,9 +64,9 @@ class BrokerTests(TestBase):
queue = self.client.queue(consumer_tag)
body = "Immediate Delivery"
- channel.message_transfer(destination="test-exchange", routing_key="key", body=body, immediate=True)
+ channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"}))
msg = queue.get(timeout=5)
- self.assert_(msg.body == body)
+ self.assert_(msg.content.body == body)
# TODO: Ensure we fail if immediate=True and there's no consumer.
@@ -81,13 +81,13 @@ class BrokerTests(TestBase):
self.queue_declare(channel, queue="test-queue")
channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
body = "Queued Delivery"
- channel.message_transfer(destination="test-exchange", routing_key="key", body=body)
+ channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"}))
consumer_tag = "tag1"
channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0)
queue = self.client.queue(consumer_tag)
msg = queue.get(timeout=5)
- self.assert_(msg.body == body)
+ self.assert_(msg.content.body == body)
def test_invalid_channel(self):
channel = self.client.channel(200)
@@ -114,8 +114,9 @@ class BrokerTests(TestBase):
channel.message_subscribe(destination="my-tag", queue="flow_test_queue")
incoming = self.client.queue("my-tag")
- channel.channel_flow(active=False)
- channel.message_transfer(routing_key="flow_test_queue", body="abcdefghijklmnopqrstuvwxyz")
+ channel.channel_flow(active=False)
+ c = Content("abcdefghijklmnopqrstuvwxyz", properties = {"routing_key" : "flow_test_queue"})
+ channel.message_transfer(content = c)
try:
incoming.get(timeout=1)
self.fail("Received message when flow turned off.")
@@ -123,4 +124,4 @@ class BrokerTests(TestBase):
channel.channel_flow(active=True)
msg = incoming.get(timeout=1)
- self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.body)
+ self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body)
diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py
index a5b53ac65b..ea587f5998 100644
--- a/python/tests_0-10/dtx.py
+++ b/python/tests_0-10/dtx.py
@@ -248,8 +248,8 @@ class DtxTests(TestBase):
#setup
channel1.queue_declare(queue="one", exclusive=True)
channel1.queue_declare(queue="two", exclusive=True)
- channel1.message_transfer(routing_key="one", message_id="a", body="DtxMessage")
- channel1.message_transfer(routing_key="two", message_id="b", body="DtxMessage")
+ channel1.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
+ channel1.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
#create a xid
tx = self.xid("dummy")
@@ -284,8 +284,8 @@ class DtxTests(TestBase):
#setup
channel.queue_declare(queue="one", exclusive=True)
channel.queue_declare(queue="two", exclusive=True)
- channel.message_transfer(routing_key="one", message_id="a", body="DtxMessage")
- channel.message_transfer(routing_key="two", message_id="b", body="DtxMessage")
+ channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
+ channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
tx = self.xid("dummy")
@@ -358,17 +358,17 @@ class DtxTests(TestBase):
channel.dtx_demarcation_select()
tx = self.xid("dummy")
channel.dtx_demarcation_start(xid=tx)
- channel.message_transfer(routing_key="tx-queue", message_id="one", body="DtxMessage")
+ channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"one"}, body="DtxMessage"))
channel.dtx_demarcation_end(xid=tx)
#now that association with txn is ended, publish another message
- channel.message_transfer(routing_key="tx-queue", message_id="two", body="DtxMessage")
+ channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"two"}, body="DtxMessage"))
#check the second message is available, but not the first
self.assertMessageCount(1, "tx-queue")
channel.message_subscribe(queue="tx-queue", destination="results", confirm_mode=1)
msg = self.client.queue("results").get(timeout=1)
- self.assertEqual("two", msg.message_id)
+ self.assertEqual("two", msg.content['message_id'])
channel.message_cancel(destination="results")
#ack the message then close the channel
msg.complete()
@@ -393,7 +393,7 @@ class DtxTests(TestBase):
tester.dtx_demarcation_select()
tx = self.xid("dummy")
tester.dtx_demarcation_start(xid=tx)
- tester.message_transfer(routing_key="dummy", body="whatever")
+ tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
tester.dtx_demarcation_end(xid=tx)
tester.dtx_coordination_prepare(xid=tx)
failed = False
@@ -427,7 +427,7 @@ class DtxTests(TestBase):
tester.dtx_demarcation_select()
tx = self.xid("dummy")
tester.dtx_demarcation_start(xid=tx)
- tester.message_transfer(routing_key="dummy", body="whatever")
+ tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
tester.dtx_demarcation_end(xid=tx)
failed = False
try:
@@ -456,14 +456,14 @@ class DtxTests(TestBase):
#setup:
channel2.queue_declare(queue="dummy", exclusive=True)
- channel2.message_transfer(routing_key="dummy", body="whatever")
+ channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
tx = self.xid("dummy")
channel2.dtx_demarcation_select()
channel2.dtx_demarcation_start(xid=tx)
channel2.message_get(queue="dummy", destination="dummy")
self.client.queue("dummy").get(timeout=1).complete()
- channel2.message_transfer(routing_key="dummy", body="whatever")
+ channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
channel2.channel_close()
self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).status)
@@ -497,7 +497,7 @@ class DtxTests(TestBase):
tx = self.xid("dummy")
channel.queue_declare(queue="queue-a", exclusive=True)
channel.queue_declare(queue="queue-b", exclusive=True)
- channel.message_transfer(routing_key="queue-a", message_id="timeout", body="DtxMessage")
+ channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':"timeout"}, body="DtxMessage"))
channel.dtx_demarcation_select()
channel.dtx_demarcation_start(xid=tx)
@@ -527,7 +527,7 @@ class DtxTests(TestBase):
for i in range(1, 10):
tx = self.xid("tx%s" % (i))
channel.dtx_demarcation_start(xid=tx)
- channel.message_transfer(routing_key="dummy", body="message%s" % (i))
+ channel.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="message%s" % (i)))
channel.dtx_demarcation_end(xid=tx)
if i in [2, 5, 6, 8]:
channel.dtx_coordination_prepare(xid=tx)
@@ -575,7 +575,7 @@ class DtxTests(TestBase):
channel.queue_declare(queue="queue-a", exclusive=True)
channel.queue_declare(queue="queue-b", exclusive=True)
#put message with specified id on one queue:
- channel.message_transfer(routing_key="queue-a", message_id=id, body="DtxMessage")
+ channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':id}, body="DtxMessage"))
#start the transaction:
channel.dtx_demarcation_select()
@@ -594,12 +594,13 @@ class DtxTests(TestBase):
msg.complete();
#re-publish to dest
- channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body)
+ channel.message_transfer(content=Content(properties={'routing_key':dest, 'message_id':msg.content['message_id']},
+ body=msg.content.body))
def assertMessageCount(self, expected, queue):
self.assertEqual(expected, self.channel.queue_query(queue=queue).message_count)
def assertMessageId(self, expected, queue):
self.channel.message_subscribe(queue=queue, destination="results")
- self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id)
+ self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id'])
self.channel.message_cancel(destination="results")
diff --git a/python/tests_0-10/example.py b/python/tests_0-10/example.py
index e4c80951ac..e3e2c3b095 100644
--- a/python/tests_0-10/example.py
+++ b/python/tests_0-10/example.py
@@ -76,10 +76,9 @@ class ExampleTest (TestBase):
# Now lets publish a message and see if our consumer gets it. To do
# this we need to import the Content class.
- body = "Hello World!"
- channel.message_transfer(destination="test",
- routing_key="key",
- body = body)
+ sent = Content("Hello World!")
+ sent["routing_key"] = "key"
+ channel.message_transfer(destination="test", content=sent)
# Now we'll wait for the message to arrive. We can use the timeout
# argument in case the server hangs. By default queue.get() will wait
@@ -87,7 +86,7 @@ class ExampleTest (TestBase):
msg = queue.get(timeout=10)
# And check that we got the right response with assertEqual
- self.assertEqual(body, msg.body)
+ self.assertEqual(sent.body, msg.content.body)
# Now acknowledge the message.
msg.complete()
diff --git a/python/tests_0-10/exchange.py b/python/tests_0-10/exchange.py
index 3a47ffff8c..4137eb7a51 100644
--- a/python/tests_0-10/exchange.py
+++ b/python/tests_0-10/exchange.py
@@ -61,10 +61,10 @@ class StandardExchangeVerifier:
self.assertPublishGet(q, ex, "a.x.b.x")
self.assertPublishGet(q, ex, "a.x.x.b.x")
# Shouldn't match
- self.channel.message_transfer(destination=ex, routing_key="a.b", body="")
- self.channel.message_transfer(destination=ex, routing_key="a.b.x.y", body="")
- self.channel.message_transfer(destination=ex, routing_key="x.a.b.x", body="")
- self.channel.message_transfer(destination=ex, routing_key="a.b", body="")
+ self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"}))
+ self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b.x.y"}))
+ self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"x.a.b.x"}))
+ self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"}))
self.assert_(q.empty())
def verifyHeadersExchange(self, ex):
@@ -74,7 +74,7 @@ class StandardExchangeVerifier:
q = self.consume("q")
headers = {"name":"fred", "age":3}
self.assertPublishGet(q, exchange=ex, properties=headers)
- self.channel.message_transfer(destination=ex, body="") # No headers, won't deliver
+ self.channel.message_transfer(destination=ex) # No headers, won't deliver
self.assertEmpty(q);
@@ -275,7 +275,7 @@ class HeadersExchangeTests(TestBase):
self.assertPublishGet(self.q, exchange="amq.match", properties=headers)
def myBasicPublish(self, headers):
- self.channel.message_transfer(destination="amq.match", body="foobar", application_headers=headers)
+ self.channel.message_transfer(destination="amq.match", content=Content("foobar", properties={'application_headers':headers}))
def testMatchAll(self):
self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index 6cf2f3ef89..f08f437a65 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -38,14 +38,14 @@ class MessageTests(TestBase):
channel.message_subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
#send a message
- channel.message_transfer(routing_key="test-queue-1a", body="consume_no_local")
- channel.message_transfer(routing_key="test-queue-1b", body="consume_no_local")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local"))
#check the queues of the two consumers
excluded = self.client.queue("local_excluded")
included = self.client.queue("local_included")
msg = included.get(timeout=1)
- self.assertEqual("consume_no_local", msg.body)
+ self.assertEqual("consume_no_local", msg.content.body)
try:
excluded.get(timeout=1)
self.fail("Received locally published message though no_local=true")
@@ -125,14 +125,14 @@ class MessageTests(TestBase):
#setup, declare a queue:
channel.queue_declare(queue="test-queue-4", exclusive=True)
channel.message_subscribe(destination="my-consumer", queue="test-queue-4")
- channel.message_transfer(routing_key="test-queue-4", body="One")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One"))
#cancel should stop messages being delivered
channel.message_cancel(destination="my-consumer")
- channel.message_transfer(routing_key="test-queue-4", body="Two")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="Two"))
myqueue = self.client.queue("my-consumer")
msg = myqueue.get(timeout=1)
- self.assertEqual("One", msg.body)
+ self.assertEqual("One", msg.content.body)
try:
msg = myqueue.get(timeout=1)
self.fail("Got message after cancellation: " + msg)
@@ -153,11 +153,11 @@ class MessageTests(TestBase):
channel.message_subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
- channel.message_transfer(routing_key="test-ack-queue", body="One")
- channel.message_transfer(routing_key="test-ack-queue", body="Two")
- channel.message_transfer(routing_key="test-ack-queue", body="Three")
- channel.message_transfer(routing_key="test-ack-queue", body="Four")
- channel.message_transfer(routing_key="test-ack-queue", body="Five")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Two"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five"))
msg1 = queue.get(timeout=1)
msg2 = queue.get(timeout=1)
@@ -165,11 +165,11 @@ class MessageTests(TestBase):
msg4 = queue.get(timeout=1)
msg5 = queue.get(timeout=1)
- self.assertEqual("One", msg1.body)
- self.assertEqual("Two", msg2.body)
- self.assertEqual("Three", msg3.body)
- self.assertEqual("Four", msg4.body)
- self.assertEqual("Five", msg5.body)
+ self.assertEqual("One", msg1.content.body)
+ self.assertEqual("Two", msg2.content.body)
+ self.assertEqual("Three", msg3.content.body)
+ self.assertEqual("Four", msg4.content.body)
+ self.assertEqual("Five", msg5.content.body)
msg2.complete(cumulative=True)#One and Two
msg4.complete(cumulative=False)
@@ -179,12 +179,12 @@ class MessageTests(TestBase):
msg3b = queue.get(timeout=1)
msg5b = queue.get(timeout=1)
- self.assertEqual("Three", msg3b.body)
- self.assertEqual("Five", msg5b.body)
+ self.assertEqual("Three", msg3b.content.body)
+ self.assertEqual("Five", msg5b.content.body)
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
+ self.fail("Got unexpected message: " + extra.content.body)
except Empty: None
def test_recover_requeue(self):
@@ -197,11 +197,11 @@ class MessageTests(TestBase):
channel.message_subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
- channel.message_transfer(routing_key="test-requeue", body="One")
- channel.message_transfer(routing_key="test-requeue", body="Two")
- channel.message_transfer(routing_key="test-requeue", body="Three")
- channel.message_transfer(routing_key="test-requeue", body="Four")
- channel.message_transfer(routing_key="test-requeue", body="Five")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five"))
msg1 = queue.get(timeout=1)
msg2 = queue.get(timeout=1)
@@ -209,11 +209,11 @@ class MessageTests(TestBase):
msg4 = queue.get(timeout=1)
msg5 = queue.get(timeout=1)
- self.assertEqual("One", msg1.body)
- self.assertEqual("Two", msg2.body)
- self.assertEqual("Three", msg3.body)
- self.assertEqual("Four", msg4.body)
- self.assertEqual("Five", msg5.body)
+ self.assertEqual("One", msg1.content.body)
+ self.assertEqual("Two", msg2.content.body)
+ self.assertEqual("Three", msg3.content.body)
+ self.assertEqual("Four", msg4.content.body)
+ self.assertEqual("Five", msg5.content.body)
msg2.complete(cumulative=True) #One and Two
msg4.complete(cumulative=False) #Four
@@ -221,7 +221,7 @@ class MessageTests(TestBase):
channel.message_cancel(destination="consumer_tag")
#publish a new message
- channel.message_transfer(routing_key="test-requeue", body="Six")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six"))
#requeue unacked messages (Three and Five)
channel.message_recover(requeue=True)
@@ -231,21 +231,21 @@ class MessageTests(TestBase):
msg3b = queue2.get(timeout=1)
msg5b = queue2.get(timeout=1)
- self.assertEqual("Three", msg3b.body)
- self.assertEqual("Five", msg5b.body)
+ self.assertEqual("Three", msg3b.content.body)
+ self.assertEqual("Five", msg5b.content.body)
- self.assertEqual(True, msg3b.redelivered)
- self.assertEqual(True, msg5b.redelivered)
+ self.assertEqual(True, msg3b.content['redelivered'])
+ self.assertEqual(True, msg5b.content['redelivered'])
- self.assertEqual("Six", queue2.get(timeout=1).body)
+ self.assertEqual("Six", queue2.get(timeout=1).content.body)
try:
extra = queue2.get(timeout=1)
- self.fail("Got unexpected message in second queue: " + extra.body)
+ self.fail("Got unexpected message in second queue: " + extra.content.body)
except Empty: None
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected message in original queue: " + extra.body)
+ self.fail("Got unexpected message in original queue: " + extra.content.body)
except Empty: None
@@ -264,15 +264,15 @@ class MessageTests(TestBase):
#publish 10 messages:
for i in range(1, 11):
- channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i)
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i))
#only 5 messages should have been delivered:
for i in range(1, 6):
msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
+ self.assertEqual("Message %d" % i, msg.content.body)
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.body)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
except Empty: None
#ack messages and check that the next set arrive ok:
@@ -280,13 +280,13 @@ class MessageTests(TestBase):
for i in range(6, 11):
msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
+ self.assertEqual("Message %d" % i, msg.content.body)
msg.complete()
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.body)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
except Empty: None
@@ -306,16 +306,16 @@ class MessageTests(TestBase):
#publish 10 messages:
for i in range(1, 11):
- channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i)
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i))
#only 5 messages should have been delivered (i.e. 45 bytes worth):
for i in range(1, 6):
msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
+ self.assertEqual("Message %d" % i, msg.content.body)
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.body)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
except Empty: None
#ack messages and check that the next set arrive ok:
@@ -323,328 +323,38 @@ class MessageTests(TestBase):
for i in range(6, 11):
msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
+ self.assertEqual("Message %d" % i, msg.content.body)
msg.complete()
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.body)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
except Empty: None
#make sure that a single oversized message still gets delivered
large = "abcdefghijklmnopqrstuvwxyz"
large = large + "-" + large;
- channel.message_transfer(routing_key="test-prefetch-size", body=large)
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large))
msg = queue.get(timeout=1)
- self.assertEqual(large, msg.body)
+ self.assertEqual(large, msg.content.body)
- def test_get(self):
- """
- Test message_get method
- """
- channel = self.channel
- channel.queue_declare(queue="test-get", exclusive=True)
-
- #publish some messages (no_ack=True)
- for i in range(1, 11):
- channel.message_transfer(routing_key="test-get", body="Message %d" % i)
-
- #use message_get to read back the messages, and check that we get an empty at the end
- for i in range(1, 11):
- tag = "queue %d" % i
- reply = channel.message_get(no_ack=True, queue="test-get", destination=tag)
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "ok")
- self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "empty")
-
- #repeat for confirm_mode=1
- for i in range(11, 21):
- channel.message_transfer(routing_key="test-get", body="Message %d" % i)
-
- for i in range(11, 21):
- tag = "queue %d" % i
- reply = channel.message_get(confirm_mode=1, queue="test-get", destination=tag)
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "ok")
- msg = self.client.queue(tag).get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
-
- if (i==13):
- msg.complete()#11, 12 & 13
- if(i in [15, 17, 19]):
- msg.complete(cumulative=False)
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "empty")
-
- #recover(requeue=True)
- channel.message_recover(requeue=True)
-
- #get the unacked messages again (14, 16, 18, 20)
- for i in [14, 16, 18, 20]:
- tag = "queue %d" % i
- reply = channel.message_get(confirm_mode=1, queue="test-get", destination=tag)
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "ok")
- msg = self.client.queue(tag).get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
- msg.complete()
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "empty")
-
- channel.message_recover(requeue=True)
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "empty")
-
- def test_reference_simple(self):
- """
- Test basic ability to handle references
- """
- channel = self.channel
- channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_subscribe(queue="ref_queue", destination="c1")
- queue = self.client.queue("c1")
-
- refId = "myref"
- channel.message_open(reference=refId)
- channel.message_append(reference=refId, bytes="abcd")
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
- channel.synchronous = True
-
- channel.message_append(reference=refId, bytes="efgh")
- channel.message_append(reference=refId, bytes="ijkl")
- channel.message_close(reference=refId)
-
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl")
-
-
- def test_reference_large(self):
- """
- Test basic ability to handle references whose content exceeds max frame size
- """
- channel = self.channel
- self.queue_declare(queue="ref_queue")
-
- #generate a big data string (> max frame size of consumer):
- data = "0123456789"
- for i in range(0, 10):
- data += data
- #send it inline
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", body=data)
- channel.synchronous = True
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- #create a new connection for consumer, with specific max frame size (< data)
- other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0})
- ch2 = other.channel(1)
- ch2.channel_open()
- ch2.message_subscribe(queue="ref_queue", destination="c1")
- queue = other.queue("c1")
-
- msg = queue.get(timeout=1)
- self.assertTrue(isinstance(msg.body, ReferenceId))
- self.assertTrue(msg.reference)
- self.assertEquals(data, msg.reference.get_complete())
-
- def test_reference_completion(self):
- """
- Test that reference transfer are not deemed complete until
- closed (therefore are not acked or routed until that point)
- """
- channel = self.channel
- channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_subscribe(queue="ref_queue", destination="c1")
- queue = self.client.queue("c1")
-
- refId = "myref"
- channel.message_open(reference=refId)
- channel.message_append(reference=refId, bytes="abcd")
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
- channel.synchronous = True
-
- try:
- msg = queue.get(timeout=1)
- self.fail("Got unexpected message on queue: " + msg)
- except Empty: None
-
- self.assertTrue(not ack.is_complete())
-
- channel.message_close(reference=refId)
-
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- self.assertDataEquals(channel, queue.get(timeout=1), "abcd")
- def test_reference_multi_transfer(self):
- """
- Test that multiple transfer requests for the same reference are
- correctly handled.
- """
- channel = self.channel
- #declare and consume from two queues
- channel.queue_declare(queue="q-one", exclusive=True)
- channel.queue_declare(queue="q-two", exclusive=True)
- channel.message_subscribe(queue="q-one", destination="q-one")
- channel.message_subscribe(queue="q-two", destination="q-two")
- queue1 = self.client.queue("q-one")
- queue2 = self.client.queue("q-two")
-
- #transfer a single ref to both queues (in separate commands)
- channel.message_open(reference="my-ref")
- channel.synchronous = False
- ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref"))
- channel.message_append(reference="my-ref", bytes="my data")
- ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref"))
- channel.synchronous = True
- channel.message_close(reference="my-ref")
-
- #check that both queues have the message
- self.assertDataEquals(channel, queue1.get(timeout=1), "my data")
- self.assertDataEquals(channel, queue2.get(timeout=1), "my data")
- self.assertEmpty(queue1)
- self.assertEmpty(queue2)
-
- #transfer a single ref to the same queue twice (in separate commands)
- channel.message_open(reference="my-ref")
- channel.synchronous = False
- ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref"))
- channel.message_append(reference="my-ref", bytes="second message")
- ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref"))
- channel.synchronous = True
- channel.message_close(reference="my-ref")
-
- msg1 = queue1.get(timeout=1)
- msg2 = queue1.get(timeout=1)
- #order is undefined
- if msg1.message_id == "abc":
- self.assertEquals(msg2.message_id, "xyz")
- else:
- self.assertEquals(msg1.message_id, "xyz")
- self.assertEquals(msg2.message_id, "abc")
-
- #would be legal for the incoming messages to be transfered
- #inline or by reference in any combination
-
- if isinstance(msg1.body, ReferenceId):
- self.assertEquals("second message", msg1.reference.get_complete())
- if isinstance(msg2.body, ReferenceId):
- if msg1.body != msg2.body:
- self.assertEquals("second message", msg2.reference.get_complete())
- #else ok, as same ref as msg1
- else:
- self.assertEquals("second message", msg1.body)
- if isinstance(msg2.body, ReferenceId):
- self.assertEquals("second message", msg2.reference.get_complete())
- else:
- self.assertEquals("second message", msg2.body)
-
- self.assertEmpty(queue1)
-
- def test_reference_unopened_on_append_error(self):
- channel = self.channel
- try:
- channel.message_append(reference="unopened")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_reference_unopened_on_close_error(self):
- channel = self.channel
- try:
- channel.message_close(reference="unopened")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_reference_unopened_on_transfer_error(self):
- channel = self.channel
- try:
- channel.message_transfer(body=ReferenceId("unopened"))
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_reference_already_opened_error(self):
- channel = self.channel
- channel.message_open(reference="a")
- try:
- channel.message_open(reference="a")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_empty_reference(self):
- channel = self.channel
- channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_subscribe(queue="ref_queue", destination="c1")
- queue = self.client.queue("c1")
-
- refId = "myref"
- channel.message_open(reference=refId)
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId))
- channel.synchronous = True
- channel.message_close(reference=refId)
-
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- msg = queue.get(timeout=1)
- self.assertEquals(msg.message_id, "empty-msg")
- self.assertDataEquals(channel, msg, "")
def test_reject(self):
channel = self.channel
channel.queue_declare(queue = "q", exclusive=True)
channel.message_subscribe(queue = "q", destination = "consumer")
- channel.message_transfer(routing_key = "q", body="blah, blah")
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah"))
msg = self.client.queue("consumer").get(timeout = 1)
- self.assertEquals(msg.body, "blah, blah")
+ self.assertEquals(msg.content.body, "blah, blah")
channel.message_cancel(destination = "consumer")
msg.reject()
channel.message_subscribe(queue = "q", destination = "checker")
msg = self.client.queue("checker").get(timeout = 1)
- self.assertEquals(msg.body, "blah, blah")
-
- def test_checkpoint(self):
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
-
- channel.message_open(reference="my-ref")
- channel.message_append(reference="my-ref", bytes="abcdefgh")
- channel.message_append(reference="my-ref", bytes="ijklmnop")
- channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint")
- channel.channel_close()
-
- channel = self.client.channel(2)
- channel.channel_open()
- channel.message_subscribe(queue = "q", destination = "consumer")
- offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value
- self.assertTrue(offset<=16)
- channel.message_append(reference="my-ref", bytes="qrstuvwxyz")
- channel.synchronous = False
- channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref"))
- channel.synchronous = True
- channel.message_close(reference="my-ref")
-
- self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz")
- self.assertEmpty(self.client.queue("consumer"))
+ self.assertEquals(msg.content.body, "blah, blah")
def test_credit_flow_messages(self):
"""
@@ -660,7 +370,7 @@ class MessageTests(TestBase):
channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- channel.message_transfer(routing_key = "q", body = "Message %d" % i)
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
#set message credit to finite amount (less than enough for all messages)
channel.message_flow(unit = 0, value = 5, destination = "c")
@@ -692,7 +402,7 @@ class MessageTests(TestBase):
channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- channel.message_transfer(routing_key = "q", body = "abcdefgh")
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
#each message is currently interpreted as requiring 75 bytes of credit
#set byte credit to finite amount (less than enough for all messages)
@@ -726,7 +436,7 @@ class MessageTests(TestBase):
channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- channel.message_transfer(routing_key = "q", body = "Message %d" % i)
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
#set message credit to finite amount (less than enough for all messages)
channel.message_flow(unit = 0, value = 5, destination = "c")
@@ -760,7 +470,7 @@ class MessageTests(TestBase):
channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- channel.message_transfer(routing_key = "q", body = "abcdefgh")
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
#each message is currently interpreted as requiring 75 bytes of credit
#set byte credit to finite amount (less than enough for all messages)
@@ -783,11 +493,5 @@ class MessageTests(TestBase):
self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
-
-
def assertDataEquals(self, channel, msg, expected):
- if isinstance(msg.body, ReferenceId):
- data = msg.reference.get_complete()
- else:
- data = msg.body
- self.assertEquals(expected, data)
+ self.assertEquals(expected, msg.content.body)
diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py
index 8d99c50d32..05fa1aebc6 100644
--- a/python/tests_0-10/queue.py
+++ b/python/tests_0-10/queue.py
@@ -33,9 +33,9 @@ class QueueTests(TestBase):
channel.exchange_declare(exchange="test-exchange", type="direct")
channel.queue_declare(queue="test-queue", exclusive=True)
channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- channel.message_transfer(destination="test-exchange", routing_key="key", body="one")
- channel.message_transfer(destination="test-exchange", routing_key="key", body="two")
- channel.message_transfer(destination="test-exchange", routing_key="key", body="three")
+ channel.message_transfer(destination="test-exchange", content=Content("one", properties={'routing_key':"key"}))
+ channel.message_transfer(destination="test-exchange", content=Content("two", properties={'routing_key':"key"}))
+ channel.message_transfer(destination="test-exchange", content=Content("three", properties={'routing_key':"key"}))
#check that the queue now reports 3 messages:
channel.queue_declare(queue="test-queue")
@@ -48,11 +48,11 @@ class QueueTests(TestBase):
self.assertEqual(0, reply.message_count)
#send a further message and consume it, ensuring that the other messages are really gone
- channel.message_transfer(destination="test-exchange", routing_key="key", body="four")
+ channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"}))
channel.message_subscribe(queue="test-queue", destination="tag")
queue = self.client.queue("tag")
msg = queue.get(timeout=1)
- self.assertEqual("four", msg.body)
+ self.assertEqual("four", msg.content.body)
#check error conditions (use new channels):
channel = self.client.channel(2)
@@ -179,23 +179,25 @@ class QueueTests(TestBase):
channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
#send a message that will match both bindings
- channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="one")
+ channel.message_transfer(destination=exchange,
+ content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers}))
#unbind first queue
channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
#send another message
- channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="two")
+ channel.message_transfer(destination=exchange,
+ content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers}))
#check one queue has both messages and the other has only one
- self.assertEquals("one", queue1.get(timeout=1).body)
+ self.assertEquals("one", queue1.get(timeout=1).content.body)
try:
msg = queue1.get(timeout=1)
- self.fail("Got extra message: %s" % msg.body)
+ self.fail("Got extra message: %s" % msg.content.body)
except Empty: pass
- self.assertEquals("one", queue2.get(timeout=1).body)
- self.assertEquals("two", queue2.get(timeout=1).body)
+ self.assertEquals("one", queue2.get(timeout=1).content.body)
+ self.assertEquals("two", queue2.get(timeout=1).content.body)
try:
msg = queue2.get(timeout=1)
self.fail("Got extra message: " + msg)
@@ -210,9 +212,9 @@ class QueueTests(TestBase):
#straight-forward case:
channel.queue_declare(queue="delete-me")
- channel.message_transfer(routing_key="delete-me", body="a")
- channel.message_transfer(routing_key="delete-me", body="b")
- channel.message_transfer(routing_key="delete-me", body="c")
+ channel.message_transfer(content=Content("a", properties={'routing_key':"delete-me"}))
+ channel.message_transfer(content=Content("b", properties={'routing_key':"delete-me"}))
+ channel.message_transfer(content=Content("c", properties={'routing_key':"delete-me"}))
channel.queue_delete(queue="delete-me")
#check that it has gone be declaring passively
try:
@@ -241,7 +243,7 @@ class QueueTests(TestBase):
#create a queue and add a message to it (use default binding):
channel.queue_declare(queue="delete-me-2")
channel.queue_declare(queue="delete-me-2", passive="True")
- channel.message_transfer(routing_key="delete-me-2", body="message")
+ channel.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"}))
#try to delete, but only if empty:
try:
@@ -258,7 +260,7 @@ class QueueTests(TestBase):
channel.message_subscribe(destination="consumer_tag", queue="delete-me-2")
queue = self.client.queue("consumer_tag")
msg = queue.get(timeout=1)
- self.assertEqual("message", msg.body)
+ self.assertEqual("message", msg.content.body)
channel.message_cancel(destination="consumer_tag")
#retry deletion on empty queue:
diff --git a/python/tests_0-10/testlib.py b/python/tests_0-10/testlib.py
index f345fbbd80..a0355c4ce0 100644
--- a/python/tests_0-10/testlib.py
+++ b/python/tests_0-10/testlib.py
@@ -49,7 +49,7 @@ class TestBaseTest(TestBase):
def testAssertEmptyFail(self):
self.queue_declare(queue="full")
q = self.consume("full")
- self.channel.message_transfer(routing_key="full", body="")
+ self.channel.message_transfer(content=Content("", properties={'routing_key':"full"}))
try:
self.assertEmpty(q);
self.fail("assertEmpty did not assert on non-empty queue")
diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py
index 4c2f75d35e..7c50de4ee2 100644
--- a/python/tests_0-10/tx.py
+++ b/python/tests_0-10/tx.py
@@ -53,21 +53,21 @@ class TxTests(TestBase):
#check results
for i in range(1, 5):
msg = queue_c.get(timeout=1)
- self.assertEqual("TxMessage %d" % i, msg.body)
+ self.assertEqual("TxMessage %d" % i, msg.content.body)
msg.complete()
msg = queue_b.get(timeout=1)
- self.assertEqual("TxMessage 6", msg.body)
+ self.assertEqual("TxMessage 6", msg.content.body)
msg.complete()
msg = queue_a.get(timeout=1)
- self.assertEqual("TxMessage 7", msg.body)
+ self.assertEqual("TxMessage 7", msg.content.body)
msg.complete()
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
+ self.fail("Got unexpected message: " + extra.content.body)
except Empty: None
#cleanup
@@ -83,7 +83,7 @@ class TxTests(TestBase):
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
+ self.fail("Got unexpected message: " + extra.content.body)
except Empty: None
channel.tx_rollback()
@@ -91,21 +91,21 @@ class TxTests(TestBase):
#check results
for i in range(1, 5):
msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
+ self.assertEqual("Message %d" % i, msg.content.body)
msg.complete()
msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.body)
+ self.assertEqual("Message 6", msg.content.body)
msg.complete()
msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.body)
+ self.assertEqual("Message 7", msg.content.body)
msg.complete()
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
+ self.fail("Got unexpected message: " + extra.content.body)
except Empty: None
#cleanup
@@ -121,7 +121,7 @@ class TxTests(TestBase):
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
+ self.fail("Got unexpected message: " + extra.content.body)
except Empty: None
channel.tx_rollback()
@@ -129,21 +129,21 @@ class TxTests(TestBase):
#check results
for i in range(1, 5):
msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
+ self.assertEqual("Message %d" % i, msg.content.body)
msg.complete()
msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.body)
+ self.assertEqual("Message 6", msg.content.body)
msg.complete()
msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.body)
+ self.assertEqual("Message 7", msg.content.body)
msg.complete()
for q in [queue_a, queue_b, queue_c]:
try:
extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
+ self.fail("Got unexpected message: " + extra.content.body)
except Empty: None
#cleanup
@@ -166,10 +166,12 @@ class TxTests(TestBase):
channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
for i in range(1, 5):
- channel.message_transfer(routing_key=name_a, message_id="msg%d" % i, body="Message %d" % i)
+ channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"msg%d" % i}, body="Message %d" % i))
- channel.message_transfer(routing_key=key, destination="amq.direct", message_id="msg6", body="Message 6")
- channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="msg7", body="Message 7")
+ channel.message_transfer(destination="amq.direct",
+ content=Content(properties={'routing_key':key, 'message_id':"msg6"}, body="Message 6"))
+ channel.message_transfer(destination="amq.topic",
+ content=Content(properties={'routing_key':topic, 'message_id':"msg7"}, body="Message 7"))
channel.tx_select()
@@ -178,27 +180,31 @@ class TxTests(TestBase):
queue_a = self.client.queue("sub_a")
for i in range(1, 5):
msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
+ self.assertEqual("Message %d" % i, msg.content.body)
msg.complete()
channel.message_subscribe(queue=name_b, destination="sub_b", confirm_mode=1)
queue_b = self.client.queue("sub_b")
msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.body)
+ self.assertEqual("Message 6", msg.content.body)
msg.complete()
sub_c = channel.message_subscribe(queue=name_c, destination="sub_c", confirm_mode=1)
queue_c = self.client.queue("sub_c")
msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.body)
+ self.assertEqual("Message 7", msg.content.body)
msg.complete()
#publish messages
for i in range(1, 5):
- channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="tx-msg%d" % i, body="TxMessage %d" % i)
-
- channel.message_transfer(routing_key=key, destination="amq.direct", message_id="tx-msg6", body="TxMessage 6")
- channel.message_transfer(routing_key=name_a, message_id="tx-msg7", body="TxMessage 7")
-
+ channel.message_transfer(destination="amq.topic",
+ content=Content(properties={'routing_key':topic, 'message_id':"tx-msg%d" % i},
+ body="TxMessage %d" % i))
+
+ channel.message_transfer(destination="amq.direct",
+ content=Content(properties={'routing_key':key, 'message_id':"tx-msg6"},
+ body="TxMessage 6"))
+ channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"tx-msg7"},
+ body="TxMessage 7"))
return queue_a, queue_b, queue_c