summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-03-21 02:09:17 +0000
committerAlan Conway <aconway@apache.org>2007-03-21 02:09:17 +0000
commitcbea321af3ebc1a4bdd5acb11ae935845751447f (patch)
tree4aecd0ea8d3d9846787e60e4daa191a13290f350 /qpid/python
parentd2eb3361494710466280341c98f76c03536d2ebe (diff)
downloadqpid-python-cbea321af3ebc1a4bdd5acb11ae935845751447f.tar.gz
removed python/tests: replaced by tests_0-8/tests_0-9
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@520707 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/tests/__init__.py20
-rw-r--r--qpid/python/tests/basic.py394
-rw-r--r--qpid/python/tests/broker.py109
-rw-r--r--qpid/python/tests/example.py94
-rw-r--r--qpid/python/tests/exchange.py327
-rw-r--r--qpid/python/tests/message.py659
-rw-r--r--qpid/python/tests/queue.py255
-rw-r--r--qpid/python/tests/testlib.py66
-rw-r--r--qpid/python/tests/tx.py188
9 files changed, 0 insertions, 2112 deletions
diff --git a/qpid/python/tests/__init__.py b/qpid/python/tests/__init__.py
deleted file mode 100644
index 9a09d2d04f..0000000000
--- a/qpid/python/tests/__init__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# Do not delete - marks this directory as a python package.
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
diff --git a/qpid/python/tests/basic.py b/qpid/python/tests/basic.py
deleted file mode 100644
index 0100b8ee3e..0000000000
--- a/qpid/python/tests/basic.py
+++ /dev/null
@@ -1,394 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class BasicTests(TestBase):
- """Tests for 'methods' on the amqp basic 'class'"""
-
- def test_consume_no_local(self):
- """
- Test that the no_local flag is honoured in the consume method
- """
- channel = self.channel
- #setup, declare two queues:
- channel.queue_declare(queue="test-queue-1a", exclusive=True)
- channel.queue_declare(queue="test-queue-1b", exclusive=True)
- #establish two consumers one of which excludes delivery of locally sent messages
- channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a")
- channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True)
-
- #send a message
- channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local"))
- channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local"))
-
- #check the queues of the two consumers
- excluded = self.client.queue("local_excluded")
- included = self.client.queue("local_included")
- msg = included.get(timeout=1)
- self.assertEqual("consume_no_local", msg.content.body)
- try:
- excluded.get(timeout=1)
- self.fail("Received locally published message though no_local=true")
- except Empty: None
-
-
- def test_consume_exclusive(self):
- """
- Test that the exclusive flag is honoured in the consume method
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-2", exclusive=True)
-
- #check that an exclusive consumer prevents other consumer being created:
- channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True)
- try:
- channel.basic_consume(consumer_tag="second", queue="test-queue-2")
- self.fail("Expected consume request to fail due to previous exclusive consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- #open new channel and cleanup last consumer:
- channel = self.client.channel(2)
- channel.channel_open()
-
- #check that an exclusive consumer cannot be created if a consumer already exists:
- channel.basic_consume(consumer_tag="first", queue="test-queue-2")
- try:
- channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True)
- self.fail("Expected exclusive consume request to fail due to previous consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- def test_consume_queue_errors(self):
- """
- Test error conditions associated with the queue field of the consume method:
- """
- channel = self.channel
- try:
- #queue specified but doesn't exist:
- channel.basic_consume(queue="invalid-queue")
- self.fail("Expected failure when consuming from non-existent queue")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- channel = self.client.channel(2)
- channel.channel_open()
- try:
- #queue not specified and none previously declared for channel:
- channel.basic_consume(queue="")
- self.fail("Expected failure when consuming from unspecified queue")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- def test_consume_unique_consumers(self):
- """
- Ensure unique consumer tags are enforced
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-3", exclusive=True)
-
- #check that attempts to use duplicate tags are detected and prevented:
- channel.basic_consume(consumer_tag="first", queue="test-queue-3")
- try:
- channel.basic_consume(consumer_tag="first", queue="test-queue-3")
- self.fail("Expected consume request to fail due to non-unique tag")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- def test_cancel(self):
- """
- Test compliance of the basic.cancel method
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-4", exclusive=True)
- channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4")
- channel.basic_publish(routing_key="test-queue-4", content=Content("One"))
-
- #cancel should stop messages being delivered
- channel.basic_cancel(consumer_tag="my-consumer")
- channel.basic_publish(routing_key="test-queue-4", content=Content("Two"))
- myqueue = self.client.queue("my-consumer")
- msg = myqueue.get(timeout=1)
- self.assertEqual("One", msg.content.body)
- try:
- msg = myqueue.get(timeout=1)
- self.fail("Got message after cancellation: " + msg)
- except Empty: None
-
- #cancellation of non-existant consumers should be handled without error
- channel.basic_cancel(consumer_tag="my-consumer")
- channel.basic_cancel(consumer_tag="this-never-existed")
-
-
- def test_ack(self):
- """
- Test basic ack/recover behaviour
- """
- channel = self.channel
- self.queue_declare(queue="test-ack-queue", exclusive=True)
-
- reply = channel.basic_consume(queue="test-ack-queue", no_ack=False)
- queue = self.client.queue(reply.consumer_tag)
-
- channel.basic_publish(routing_key="test-ack-queue", content=Content("One"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Two"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Three"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Four"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Five"))
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.content.body)
- self.assertEqual("Two", msg2.content.body)
- self.assertEqual("Three", msg3.content.body)
- self.assertEqual("Four", msg4.content.body)
- self.assertEqual("Five", msg5.content.body)
-
- channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
- channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
-
- channel.basic_recover(requeue=False)
-
- msg3b = queue.get(timeout=1)
- msg5b = queue.get(timeout=1)
-
- self.assertEqual("Three", msg3b.content.body)
- self.assertEqual("Five", msg5b.content.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
- def test_recover_requeue(self):
- """
- Test requeing on recovery
- """
- channel = self.channel
- channel.queue_declare(queue="test-requeue", exclusive=True)
-
- subscription = channel.basic_consume(queue="test-requeue", no_ack=False)
- queue = self.client.queue(subscription.consumer_tag)
-
- channel.basic_publish(routing_key="test-requeue", content=Content("One"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Two"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Three"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Four"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Five"))
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.content.body)
- self.assertEqual("Two", msg2.content.body)
- self.assertEqual("Three", msg3.content.body)
- self.assertEqual("Four", msg4.content.body)
- self.assertEqual("Five", msg5.content.body)
-
- channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
- channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
-
- channel.basic_cancel(consumer_tag=subscription.consumer_tag)
- subscription2 = channel.basic_consume(queue="test-requeue")
- queue2 = self.client.queue(subscription2.consumer_tag)
-
- channel.basic_recover(requeue=True)
-
- msg3b = queue2.get(timeout=1)
- msg5b = queue2.get(timeout=1)
-
- self.assertEqual("Three", msg3b.content.body)
- self.assertEqual("Five", msg5b.content.body)
-
- self.assertEqual(True, msg3b.redelivered)
- self.assertEqual(True, msg5b.redelivered)
-
- try:
- extra = queue2.get(timeout=1)
- self.fail("Got unexpected message in second queue: " + extra.content.body)
- except Empty: None
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message in original queue: " + extra.content.body)
- except Empty: None
-
-
- def test_qos_prefetch_count(self):
- """
- Test that the prefetch count specified is honoured
- """
- #setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-count", exclusive=True)
- subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False)
- queue = self.client.queue(subscription.consumer_tag)
-
- #set prefetch to 5:
- channel.basic_qos(prefetch_count=5)
-
- #publish 10 messages:
- for i in range(1, 11):
- channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i))
-
- #only 5 messages should have been delivered:
- for i in range(1, 6):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
- except Empty: None
-
- #ack messages and check that the next set arrive ok:
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- for i in range(6, 11):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
- except Empty: None
-
-
-
- def test_qos_prefetch_size(self):
- """
- Test that the prefetch size specified is honoured
- """
- #setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-size", exclusive=True)
- subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False)
- queue = self.client.queue(subscription.consumer_tag)
-
- #set prefetch to 50 bytes (each message is 9 or 10 bytes):
- channel.basic_qos(prefetch_size=50)
-
- #publish 10 messages:
- for i in range(1, 11):
- channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i))
-
- #only 5 messages should have been delivered (i.e. 45 bytes worth):
- for i in range(1, 6):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
- except Empty: None
-
- #ack messages and check that the next set arrive ok:
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- for i in range(6, 11):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
- except Empty: None
-
- #make sure that a single oversized message still gets delivered
- large = "abcdefghijklmnopqrstuvwxyz"
- large = large + "-" + large;
- channel.basic_publish(routing_key="test-prefetch-size", content=Content(large))
- msg = queue.get(timeout=1)
- self.assertEqual(large, msg.content.body)
-
- def test_get(self):
- """
- Test basic_get method
- """
- channel = self.channel
- channel.queue_declare(queue="test-get", exclusive=True)
-
- #publish some messages (no_ack=True)
- for i in range(1, 11):
- channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
-
- #use basic_get to read back the messages, and check that we get an empty at the end
- for i in range(1, 11):
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_ok")
- self.assertEqual("Message %d" % i, reply.content.body)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_empty")
-
- #repeat for no_ack=False
- for i in range(11, 21):
- channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
-
- for i in range(11, 21):
- reply = channel.basic_get(no_ack=False)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_ok")
- self.assertEqual("Message %d" % i, reply.content.body)
- if(i == 13):
- channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
- if(i in [15, 17, 19]):
- channel.basic_ack(delivery_tag=reply.delivery_tag)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_empty")
-
- #recover(requeue=True)
- channel.basic_recover(requeue=True)
-
- #get the unacked messages again (14, 16, 18, 20)
- for i in [14, 16, 18, 20]:
- reply = channel.basic_get(no_ack=False)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_ok")
- self.assertEqual("Message %d" % i, reply.content.body)
- channel.basic_ack(delivery_tag=reply.delivery_tag)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_empty")
-
- channel.basic_recover(requeue=True)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_empty")
diff --git a/qpid/python/tests/broker.py b/qpid/python/tests/broker.py
deleted file mode 100644
index ebb3a525f5..0000000000
--- a/qpid/python/tests/broker.py
+++ /dev/null
@@ -1,109 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class BrokerTests(TestBase):
- """Tests for basic Broker functionality"""
-
- def test_ack_and_no_ack(self):
- """
- First, this test tries to receive a message with a no-ack
- consumer. Second, this test tries to explicitely receive and
- acknowledge a message with an acknowledging consumer.
- """
- ch = self.channel
- self.queue_declare(ch, queue = "myqueue")
-
- # No ack consumer
- ctag = "tag1"
- ch.message_consume(queue = "myqueue", destination = ctag, no_ack = True)
- body = "test no-ack"
- ch.message_transfer(routing_key = "myqueue", body = body)
- msg = self.client.queue(ctag).get(timeout = 5)
- self.assert_(msg.body == body)
-
- # Acknowledging consumer
- self.queue_declare(ch, queue = "otherqueue")
- ctag = "tag2"
- ch.message_consume(queue = "otherqueue", destination = ctag, no_ack = False)
- body = "test ack"
- ch.message_transfer(routing_key = "otherqueue", body = body)
- msg = self.client.queue(ctag).get(timeout = 5)
- msg.ok()
- self.assert_(msg.body == body)
-
- def test_simple_delivery_immediate(self):
- """
- Test simple message delivery where consume is issued before publish
- """
- channel = self.channel
- self.exchange_declare(channel, exchange="test-exchange", type="direct")
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- consumer_tag = "tag1"
- channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True)
- queue = self.client.queue(consumer_tag)
-
- body = "Immediate Delivery"
- channel.message_transfer(destination="test-exchange", routing_key="key", body=body, immediate=True)
- msg = queue.get(timeout=5)
- self.assert_(msg.body == body)
-
- # TODO: Ensure we fail if immediate=True and there's no consumer.
-
-
- def test_simple_delivery_queued(self):
- """
- Test basic message delivery where publish is issued before consume
- (i.e. requires queueing of the message)
- """
- channel = self.channel
- self.exchange_declare(channel, exchange="test-exchange", type="direct")
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- body = "Queued Delivery"
- channel.message_transfer(destination="test-exchange", routing_key="key", body=body)
-
- consumer_tag = "tag1"
- channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True)
- queue = self.client.queue(consumer_tag)
- msg = queue.get(timeout=5)
- self.assert_(msg.body == body)
-
- def test_invalid_channel(self):
- channel = self.client.channel(200)
- try:
- channel.queue_declare(exclusive=True)
- self.fail("Expected error on queue_declare for invalid channel")
- except Closed, e:
- self.assertConnectionException(504, e.args[0])
-
- def test_closed_channel(self):
- channel = self.client.channel(200)
- channel.channel_open()
- channel.channel_close()
- try:
- channel.queue_declare(exclusive=True)
- self.fail("Expected error on queue_declare for closed channel")
- except Closed, e:
- self.assertConnectionException(504, e.args[0])
-
diff --git a/qpid/python/tests/example.py b/qpid/python/tests/example.py
deleted file mode 100644
index 7ab4cc7d0a..0000000000
--- a/qpid/python/tests/example.py
+++ /dev/null
@@ -1,94 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class ExampleTest (TestBase):
- """
- An example Qpid test, illustrating the unittest frameowkr and the
- python Qpid client. The test class must inherit TestCase. The
- test code uses the Qpid client to interact with a qpid broker and
- verify it behaves as expected.
- """
-
- def test_example(self):
- """
- An example test. Note that test functions must start with 'test_'
- to be recognized by the test framework.
- """
-
- # By inheriting TestBase, self.client is automatically connected
- # and self.channel is automatically opened as channel(1)
- # Other channel methods mimic the protocol.
- channel = self.channel
-
- # Now we can send regular commands. If you want to see what the method
- # arguments mean or what other commands are available, you can use the
- # python builtin help() method. For example:
- #help(chan)
- #help(chan.exchange_declare)
-
- # If you want browse the available protocol methods without being
- # connected to a live server you can use the amqp-doc utility:
- #
- # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>]
- #
- # Options:
- # -e, --regexp use regex instead of glob when matching
-
- # Now that we know what commands are available we can use them to
- # interact with the server.
-
- # Here we use ordinal arguments.
- self.exchange_declare(channel, 0, "test", "direct")
-
- # Here we use keyword arguments.
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test", routing_key="key")
-
- # Call Channel.basic_consume to register as a consumer.
- # All the protocol methods return a message object. The message object
- # has fields corresponding to the reply method fields, plus a content
- # field that is filled if the reply includes content. In this case the
- # interesting field is the consumer_tag.
- channel.message_consume(queue="test-queue", destination="consumer_tag")
-
- # We can use the Client.queue(...) method to access the queue
- # corresponding to our consumer_tag.
- queue = self.client.queue("consumer_tag")
-
- # Now lets publish a message and see if our consumer gets it. To do
- # this we need to import the Content class.
- body = "Hello World!"
- channel.message_transfer(destination="test",
- routing_key="key",
- body = body)
-
- # Now we'll wait for the message to arrive. We can use the timeout
- # argument in case the server hangs. By default queue.get() will wait
- # until a message arrives or the connection to the server dies.
- msg = queue.get(timeout=10)
-
- # And check that we got the right response with assertEqual
- self.assertEqual(body, msg.body)
-
- # Now acknowledge the message.
- msg.ok()
-
diff --git a/qpid/python/tests/exchange.py b/qpid/python/tests/exchange.py
deleted file mode 100644
index 54c462de24..0000000000
--- a/qpid/python/tests/exchange.py
+++ /dev/null
@@ -1,327 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Tests for exchange behaviour.
-
-Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
-"""
-
-import Queue, logging
-from qpid.testlib import TestBase
-from qpid.content import Content
-from qpid.client import Closed
-
-
-class StandardExchangeVerifier:
- """Verifies standard exchange behavior.
-
- Used as base class for classes that test standard exchanges."""
-
- def verifyDirectExchange(self, ex):
- """Verify that ex behaves like a direct exchange."""
- self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex, routing_key="k")
- self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
- try:
- self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
- self.fail("Expected Empty exception")
- except Queue.Empty: None # Expected
-
- def verifyFanOutExchange(self, ex):
- """Verify that ex behaves like a fanout exchange."""
- self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex)
- self.queue_declare(queue="p")
- self.channel.queue_bind(queue="p", exchange=ex)
- for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
-
- def verifyTopicExchange(self, ex):
- """Verify that ex behaves like a topic exchange"""
- self.queue_declare(queue="a")
- self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
- q = self.consume("a")
- self.assertPublishGet(q, ex, "a.b.x")
- self.assertPublishGet(q, ex, "a.x.b.x")
- self.assertPublishGet(q, ex, "a.x.x.b.x")
- # Shouldn't match
- self.channel.message_transfer(destination=ex, routing_key="a.b")
- self.channel.message_transfer(destination=ex, routing_key="a.b.x.y")
- self.channel.message_transfer(destination=ex, routing_key="x.a.b.x")
- self.channel.message_transfer(destination=ex, routing_key="a.b")
- self.assert_(q.empty())
-
- def verifyHeadersExchange(self, ex):
- """Verify that ex is a headers exchange"""
- self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
- q = self.consume("q")
- headers = {"name":"fred", "age":3}
- self.assertPublishGet(q, exchange=ex, properties=headers)
- self.channel.message_transfer(destination=ex, body="") # No headers, won't deliver
- self.assertEmpty(q);
-
-
-class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
- """
- The server SHOULD implement these standard exchange types: topic, headers.
-
- Client attempts to declare an exchange with each of these standard types.
- """
-
- def testDirect(self):
- """Declare and test a direct exchange"""
- self.exchange_declare(0, exchange="d", type="direct")
- self.verifyDirectExchange("d")
-
- def testFanout(self):
- """Declare and test a fanout exchange"""
- self.exchange_declare(0, exchange="f", type="fanout")
- self.verifyFanOutExchange("f")
-
- def testTopic(self):
- """Declare and test a topic exchange"""
- self.exchange_declare(0, exchange="t", type="topic")
- self.verifyTopicExchange("t")
-
- def testHeaders(self):
- """Declare and test a headers exchange"""
- self.exchange_declare(0, exchange="h", type="headers")
- self.verifyHeadersExchange("h")
-
-
-class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
- """
- The server MUST, in each virtual host, pre-declare an exchange instance
- for each standard exchange type that it implements, where the name of the
- exchange instance is amq. followed by the exchange type name.
-
- Client creates a temporary queue and attempts to bind to each required
- exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if
- those types are defined).
- """
- def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
-
- def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
-
- def testAmqTopic(self): self.verifyTopicExchange("amq.topic")
-
- def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
-
-class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
- """
- The server MUST predeclare a direct exchange to act as the default exchange
- for content Publish methods and for default queue bindings.
-
- Client checks that the default exchange is active by specifying a queue
- binding with no exchange name, and publishing a message with a suitable
- routing key but without specifying the exchange name, then ensuring that
- the message arrives in the queue correctly.
- """
- def testDefaultExchange(self):
- # Test automatic binding by queue name.
- self.queue_declare(queue="d")
- self.assertPublishConsume(queue="d", routing_key="d")
- # Test explicit bind to default queue
- self.verifyDirectExchange("")
-
-
-# TODO aconway 2006-09-27: Fill in empty tests:
-
-class DefaultAccessRuleTests(TestBase):
- """
- The server MUST NOT allow clients to access the default exchange except
- by specifying an empty exchange name in the Queue.Bind and content Publish
- methods.
- """
-
-class ExtensionsRuleTests(TestBase):
- """
- The server MAY implement other exchange types as wanted.
- """
-
-
-class DeclareMethodMinimumRuleTests(TestBase):
- """
- The server SHOULD support a minimum of 16 exchanges per virtual host and
- ideally, impose no limit except as defined by available resources.
-
- The client creates as many exchanges as it can until the server reports
- an error; the number of exchanges successfuly created must be at least
- sixteen.
- """
-
-
-class DeclareMethodTicketFieldValidityRuleTests(TestBase):
- """
- The client MUST provide a valid access ticket giving "active" access to
- the realm in which the exchange exists or will be created, or "passive"
- access if the if-exists flag is set.
-
- Client creates access ticket with wrong access rights and attempts to use
- in this method.
- """
-
-
-class DeclareMethodExchangeFieldReservedRuleTests(TestBase):
- """
- Exchange names starting with "amq." are reserved for predeclared and
- standardised exchanges. The client MUST NOT attempt to create an exchange
- starting with "amq.".
-
-
- """
-
-
-class DeclareMethodTypeFieldTypedRuleTests(TestBase):
- """
- Exchanges cannot be redeclared with different types. The client MUST not
- attempt to redeclare an existing exchange with a different type than used
- in the original Exchange.Declare method.
-
-
- """
-
-
-class DeclareMethodTypeFieldSupportRuleTests(TestBase):
- """
- The client MUST NOT attempt to create an exchange with a type that the
- server does not support.
-
-
- """
-
-
-class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
- """
- If set, and the exchange does not already exist, the server MUST raise a
- channel exception with reply code 404 (not found).
- """
- def test(self):
- try:
- self.channel.exchange_declare(exchange="humpty_dumpty", passive=True)
- self.fail("Expected 404 for passive declaration of unknown exchange.")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
-class DeclareMethodDurableFieldSupportRuleTests(TestBase):
- """
- The server MUST support both durable and transient exchanges.
-
-
- """
-
-
-class DeclareMethodDurableFieldStickyRuleTests(TestBase):
- """
- The server MUST ignore the durable field if the exchange already exists.
-
-
- """
-
-
-class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase):
- """
- The server MUST ignore the auto-delete field if the exchange already
- exists.
-
-
- """
-
-
-class DeleteMethodTicketFieldValidityRuleTests(TestBase):
- """
- The client MUST provide a valid access ticket giving "active" access
- rights to the exchange's access realm.
-
- Client creates access ticket with wrong access rights and attempts to use
- in this method.
- """
-
-
-class DeleteMethodExchangeFieldExistsRuleTests(TestBase):
- """
- The client MUST NOT attempt to delete an exchange that does not exist.
- """
-
-
-class HeadersExchangeTests(TestBase):
- """
- Tests for headers exchange functionality.
- """
- def setUp(self):
- TestBase.setUp(self)
- self.queue_declare(queue="q")
- self.q = self.consume("q")
-
- def myAssertPublishGet(self, headers):
- self.assertPublishGet(self.q, exchange="amq.match", properties=headers)
-
- def myBasicPublish(self, headers):
- self.channel.message_transfer(destination="amq.match", body="foobar", application_headers=headers)
-
- def testMatchAll(self):
- self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
-
- # None of these should match
- self.myBasicPublish({})
- self.myBasicPublish({"name":"barney"})
- self.myBasicPublish({"name":10})
- self.myBasicPublish({"name":"fred", "age":2})
- self.assertEmpty(self.q)
-
- def testMatchAny(self):
- self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred"})
- self.myAssertPublishGet({"name":"fred", "ignoreme":10})
- self.myAssertPublishGet({"ignoreme":10, "age":3})
-
- # Wont match
- self.myBasicPublish({})
- self.myBasicPublish({"irrelevant":0})
- self.assertEmpty(self.q)
-
-
-class MiscellaneousErrorsTests(TestBase):
- """
- Test some miscellaneous error conditions
- """
- def testTypeNotKnown(self):
- try:
- self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
- self.fail("Expected 503 for declaration of unknown exchange type.")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def testDifferentDeclaredType(self):
- self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
- try:
- self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
- self.fail("Expected 530 for redeclaration of exchange with different type.")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
- #cleanup
- other = self.connect()
- c2 = other.channel(1)
- c2.channel_open()
- c2.exchange_delete(exchange="test_different_declared_type_exchange")
-
diff --git a/qpid/python/tests/message.py b/qpid/python/tests/message.py
deleted file mode 100644
index d5f5d4dbc2..0000000000
--- a/qpid/python/tests/message.py
+++ /dev/null
@@ -1,659 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-from qpid.reference import Reference, ReferenceId
-
-class MessageTests(TestBase):
- """Tests for 'methods' on the amqp message 'class'"""
-
- def test_consume_no_local(self):
- """
- Test that the no_local flag is honoured in the consume method
- """
- channel = self.channel
- #setup, declare two queues:
- channel.queue_declare(queue="test-queue-1a", exclusive=True)
- channel.queue_declare(queue="test-queue-1b", exclusive=True)
- #establish two consumers one of which excludes delivery of locally sent messages
- channel.message_consume(destination="local_included", queue="test-queue-1a")
- channel.message_consume(destination="local_excluded", queue="test-queue-1b", no_local=True)
-
- #send a message
- channel.message_transfer(routing_key="test-queue-1a", body="consume_no_local")
- channel.message_transfer(routing_key="test-queue-1b", body="consume_no_local")
-
- #check the queues of the two consumers
- excluded = self.client.queue("local_excluded")
- included = self.client.queue("local_included")
- msg = included.get(timeout=1)
- self.assertEqual("consume_no_local", msg.body)
- try:
- excluded.get(timeout=1)
- self.fail("Received locally published message though no_local=true")
- except Empty: None
-
-
- def test_consume_exclusive(self):
- """
- Test that the exclusive flag is honoured in the consume method
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-2", exclusive=True)
-
- #check that an exclusive consumer prevents other consumer being created:
- channel.message_consume(destination="first", queue="test-queue-2", exclusive=True)
- try:
- channel.message_consume(destination="second", queue="test-queue-2")
- self.fail("Expected consume request to fail due to previous exclusive consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- #open new channel and cleanup last consumer:
- channel = self.client.channel(2)
- channel.channel_open()
-
- #check that an exclusive consumer cannot be created if a consumer already exists:
- channel.message_consume(destination="first", queue="test-queue-2")
- try:
- channel.message_consume(destination="second", queue="test-queue-2", exclusive=True)
- self.fail("Expected exclusive consume request to fail due to previous consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- def test_consume_queue_errors(self):
- """
- Test error conditions associated with the queue field of the consume method:
- """
- channel = self.channel
- try:
- #queue specified but doesn't exist:
- channel.message_consume(queue="invalid-queue")
- self.fail("Expected failure when consuming from non-existent queue")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- channel = self.client.channel(2)
- channel.channel_open()
- try:
- #queue not specified and none previously declared for channel:
- channel.message_consume(queue="")
- self.fail("Expected failure when consuming from unspecified queue")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- def test_consume_unique_consumers(self):
- """
- Ensure unique consumer tags are enforced
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-3", exclusive=True)
-
- #check that attempts to use duplicate tags are detected and prevented:
- channel.message_consume(destination="first", queue="test-queue-3")
- try:
- channel.message_consume(destination="first", queue="test-queue-3")
- self.fail("Expected consume request to fail due to non-unique tag")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- def test_cancel(self):
- """
- Test compliance of the basic.cancel method
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-4", exclusive=True)
- channel.message_consume(destination="my-consumer", queue="test-queue-4")
- channel.message_transfer(routing_key="test-queue-4", body="One")
-
- #cancel should stop messages being delivered
- channel.message_cancel(destination="my-consumer")
- channel.message_transfer(routing_key="test-queue-4", body="Two")
- myqueue = self.client.queue("my-consumer")
- msg = myqueue.get(timeout=1)
- self.assertEqual("One", msg.body)
- try:
- msg = myqueue.get(timeout=1)
- self.fail("Got message after cancellation: " + msg)
- except Empty: None
-
- #cancellation of non-existant consumers should be handled without error
- channel.message_cancel(destination="my-consumer")
- channel.message_cancel(destination="this-never-existed")
-
-
- def test_ack(self):
- """
- Test basic ack/recover behaviour
- """
- channel = self.channel
- channel.queue_declare(queue="test-ack-queue", exclusive=True)
-
- channel.message_consume(queue="test-ack-queue", destination="consumer_tag", no_ack=False)
- queue = self.client.queue("consumer_tag")
-
- channel.message_transfer(routing_key="test-ack-queue", body="One")
- channel.message_transfer(routing_key="test-ack-queue", body="Two")
- channel.message_transfer(routing_key="test-ack-queue", body="Three")
- channel.message_transfer(routing_key="test-ack-queue", body="Four")
- channel.message_transfer(routing_key="test-ack-queue", body="Five")
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.body)
- self.assertEqual("Two", msg2.body)
- self.assertEqual("Three", msg3.body)
- self.assertEqual("Four", msg4.body)
- self.assertEqual("Five", msg5.body)
-
- msg1.ok(batchoffset=1)#One and Two
- msg4.ok()
-
- channel.message_recover(requeue=False)
-
- msg3b = queue.get(timeout=1)
- msg5b = queue.get(timeout=1)
-
- self.assertEqual("Three", msg3b.body)
- self.assertEqual("Five", msg5b.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
- except Empty: None
-
- def test_recover_requeue(self):
- """
- Test requeing on recovery
- """
- channel = self.channel
- channel.queue_declare(queue="test-requeue", exclusive=True)
-
- channel.message_consume(queue="test-requeue", destination="consumer_tag", no_ack=False)
- queue = self.client.queue("consumer_tag")
-
- channel.message_transfer(routing_key="test-requeue", body="One")
- channel.message_transfer(routing_key="test-requeue", body="Two")
- channel.message_transfer(routing_key="test-requeue", body="Three")
- channel.message_transfer(routing_key="test-requeue", body="Four")
- channel.message_transfer(routing_key="test-requeue", body="Five")
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.body)
- self.assertEqual("Two", msg2.body)
- self.assertEqual("Three", msg3.body)
- self.assertEqual("Four", msg4.body)
- self.assertEqual("Five", msg5.body)
-
- msg1.ok(batchoffset=1) #One and Two
- msg4.ok() #Four
-
- channel.message_cancel(destination="consumer_tag")
- channel.message_consume(queue="test-requeue", destination="consumer_tag")
- queue2 = self.client.queue("consumer_tag")
-
- channel.message_recover(requeue=True)
-
- msg3b = queue2.get(timeout=1)
- msg5b = queue2.get(timeout=1)
-
- self.assertEqual("Three", msg3b.body)
- self.assertEqual("Five", msg5b.body)
-
- self.assertEqual(True, msg3b.redelivered)
- self.assertEqual(True, msg5b.redelivered)
-
- try:
- extra = queue2.get(timeout=1)
- self.fail("Got unexpected message in second queue: " + extra.body)
- except Empty: None
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message in original queue: " + extra.body)
- except Empty: None
-
-
- def test_qos_prefetch_count(self):
- """
- Test that the prefetch count specified is honoured
- """
- #setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-count", exclusive=True)
- subscription = channel.message_consume(queue="test-prefetch-count", destination="consumer_tag", no_ack=False)
- queue = self.client.queue("consumer_tag")
-
- #set prefetch to 5:
- channel.message_qos(prefetch_count=5)
-
- #publish 10 messages:
- for i in range(1, 11):
- channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i)
-
- #only 5 messages should have been delivered:
- for i in range(1, 6):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.body)
- except Empty: None
-
- #ack messages and check that the next set arrive ok:
- #todo: once batching is implmented, send a single response for all messages
- msg.ok(batchoffset=-4)#1-5
-
- for i in range(6, 11):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
-
- msg.ok(batchoffset=-4)#6-10
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.body)
- except Empty: None
-
-
-
- def test_qos_prefetch_size(self):
- """
- Test that the prefetch size specified is honoured
- """
- #setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-size", exclusive=True)
- subscription = channel.message_consume(queue="test-prefetch-size", destination="consumer_tag", no_ack=False)
- queue = self.client.queue("consumer_tag")
-
- #set prefetch to 50 bytes (each message is 9 or 10 bytes):
- channel.message_qos(prefetch_size=50)
-
- #publish 10 messages:
- for i in range(1, 11):
- channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i)
-
- #only 5 messages should have been delivered (i.e. 45 bytes worth):
- for i in range(1, 6):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.body)
- except Empty: None
-
- #ack messages and check that the next set arrive ok:
- msg.ok(batchoffset=-4)#1-5
-
- for i in range(6, 11):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
-
- msg.ok(batchoffset=-4)#6-10
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.body)
- except Empty: None
-
- #make sure that a single oversized message still gets delivered
- large = "abcdefghijklmnopqrstuvwxyz"
- large = large + "-" + large;
- channel.message_transfer(routing_key="test-prefetch-size", body=large)
- msg = queue.get(timeout=1)
- self.assertEqual(large, msg.body)
-
- def test_get(self):
- """
- Test message_get method
- """
- channel = self.channel
- channel.queue_declare(queue="test-get", exclusive=True)
-
- #publish some messages (no_ack=True)
- for i in range(1, 11):
- channel.message_transfer(routing_key="test-get", body="Message %d" % i)
-
- #use message_get to read back the messages, and check that we get an empty at the end
- for i in range(1, 11):
- tag = "queue %d" % i
- reply = channel.message_get(no_ack=True, queue="test-get", destination=tag)
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "ok")
- self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "get-empty")
-
- #repeat for no_ack=False
- for i in range(11, 21):
- channel.message_transfer(routing_key="test-get", body="Message %d" % i)
-
- for i in range(11, 21):
- tag = "queue %d" % i
- reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "ok")
- msg = self.client.queue(tag).get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
-
- if (i==13):
- msg.ok(batchoffset=-2)#11, 12 & 13
- if(i in [15, 17, 19]):
- msg.ok()
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "get-empty")
-
- #recover(requeue=True)
- channel.message_recover(requeue=True)
-
- #get the unacked messages again (14, 16, 18, 20)
- for i in [14, 16, 18, 20]:
- tag = "queue %d" % i
- reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "ok")
- self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
- reply.ok()
- #channel.message_ack(delivery_tag=reply.delivery_tag)
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "get-empty")
-
- channel.message_recover(requeue=True)
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "get-empty")
-
- def test_reference_simple(self):
- """
- Test basic ability to handle references
- """
- channel = self.channel
- channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_consume(queue="ref_queue", destination="c1")
- queue = self.client.queue("c1")
-
- refId = "myref"
- channel.message_open(reference=refId)
- channel.message_append(reference=refId, bytes="abcd")
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
- channel.synchronous = True
-
- channel.message_append(reference=refId, bytes="efgh")
- channel.message_append(reference=refId, bytes="ijkl")
- channel.message_close(reference=refId)
-
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- msg = queue.get(timeout=1)
- if isinstance(msg, Reference):
- #should we force broker to deliver as reference by frame
- #size limit? or test that separately? for compliance,
- #allowing either seems best for now...
- data = msg.get_complete()
- else:
- data = msg.body
- self.assertEquals("abcdefghijkl", data)
-
-
- def test_reference_large(self):
- """
- Test basic ability to handle references whose content exceeds max frame size
- """
- channel = self.channel
- self.queue_declare(queue="ref_queue")
-
- #generate a big data string (> max frame size of consumer):
- data = "0123456789"
- for i in range(0, 10):
- data += data
- #send it inline
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", body=data)
- channel.synchronous = True
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- #create a new connection for consumer, with specific max frame size (< data)
- other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0})
- ch2 = other.channel(1)
- ch2.channel_open()
- ch2.message_consume(queue="ref_queue", destination="c1")
- queue = other.queue("c1")
-
- msg = queue.get(timeout=1)
- self.assertTrue(isinstance(msg.body, ReferenceId))
- self.assertTrue(msg.reference)
- self.assertEquals(data, msg.reference.get_complete())
-
- def test_reference_completion(self):
- """
- Test that reference transfer are not deemed complete until
- closed (therefore are not acked or routed until that point)
- """
- channel = self.channel
- channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_consume(queue="ref_queue", destination="c1")
- queue = self.client.queue("c1")
-
- refId = "myref"
- channel.message_open(reference=refId)
- channel.message_append(reference=refId, bytes="abcd")
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
- channel.synchronous = True
-
- try:
- msg = queue.get(timeout=1)
- self.fail("Got unexpected message on queue: " + msg)
- except Empty: None
-
- self.assertTrue(not ack.is_complete())
-
- channel.message_close(reference=refId)
-
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- self.assertDataEquals(channel, queue.get(timeout=1), "abcd")
-
- def test_reference_multi_transfer(self):
- """
- Test that multiple transfer requests for the same reference are
- correctly handled.
- """
- channel = self.channel
- #declare and consume from two queues
- channel.queue_declare(queue="q-one", exclusive=True)
- channel.queue_declare(queue="q-two", exclusive=True)
- channel.message_consume(queue="q-one", destination="q-one")
- channel.message_consume(queue="q-two", destination="q-two")
- queue1 = self.client.queue("q-one")
- queue2 = self.client.queue("q-two")
-
- #transfer a single ref to both queues (in separate commands)
- channel.message_open(reference="my-ref")
- channel.synchronous = False
- ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref"))
- channel.message_append(reference="my-ref", bytes="my data")
- ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref"))
- channel.synchronous = True
- channel.message_close(reference="my-ref")
-
- #check that both queues have the message
- self.assertDataEquals(channel, queue1.get(timeout=1), "my data")
- self.assertDataEquals(channel, queue2.get(timeout=1), "my data")
- self.assertEmpty(queue1)
- self.assertEmpty(queue2)
-
- #transfer a single ref to the same queue twice (in separate commands)
- channel.message_open(reference="my-ref")
- channel.synchronous = False
- ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref"))
- channel.message_append(reference="my-ref", bytes="second message")
- ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref"))
- channel.synchronous = True
- channel.message_close(reference="my-ref")
-
- msg1 = queue1.get(timeout=1)
- msg2 = queue1.get(timeout=1)
- #order is undefined
- if msg1.message_id == "abc":
- self.assertEquals(msg2.message_id, "xyz")
- else:
- self.assertEquals(msg1.message_id, "xyz")
- self.assertEquals(msg2.message_id, "abc")
-
- #would be legal for the incoming messages to be transfered
- #inline or by reference in any combination
-
- if isinstance(msg1.body, ReferenceId):
- self.assertEquals("second message", msg1.reference.get_complete())
- if isinstance(msg2.body, ReferenceId):
- if msg1.body != msg2.body:
- self.assertEquals("second message", msg2.reference.get_complete())
- #else ok, as same ref as msg1
- else:
- self.assertEquals("second message", msg1.body)
- if isinstance(msg2.body, ReferenceId):
- self.assertEquals("second message", msg2.reference.get_complete())
- else:
- self.assertEquals("second message", msg2.body)
-
- self.assertEmpty(queue1)
-
- def test_reference_unopened_on_append_error(self):
- channel = self.channel
- try:
- channel.message_append(reference="unopened")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_reference_unopened_on_close_error(self):
- channel = self.channel
- try:
- channel.message_close(reference="unopened")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_reference_unopened_on_transfer_error(self):
- channel = self.channel
- try:
- channel.message_transfer(body=ReferenceId("unopened"))
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_reference_already_opened_error(self):
- channel = self.channel
- channel.message_open(reference="a")
- try:
- channel.message_open(reference="a")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_empty_reference(self):
- channel = self.channel
- channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_consume(queue="ref_queue", destination="c1")
- queue = self.client.queue("c1")
-
- refId = "myref"
- channel.message_open(reference=refId)
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId))
- channel.synchronous = True
- channel.message_close(reference=refId)
-
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- msg = queue.get(timeout=1)
- self.assertEquals(msg.message_id, "empty-msg")
- self.assertDataEquals(channel, msg, "")
-
- def test_reject(self):
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
-
- channel.message_consume(queue = "q", destination = "consumer")
- channel.message_transfer(routing_key = "q", body="blah, blah")
- msg = self.client.queue("consumer").get(timeout = 1)
- self.assertEquals(msg.body, "blah, blah")
- channel.message_cancel(destination = "consumer")
- msg.reject()
-
- channel.message_consume(queue = "q", destination = "checker")
- msg = self.client.queue("checker").get(timeout = 1)
- self.assertEquals(msg.body, "blah, blah")
-
- def test_checkpoint(self):
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
-
- channel.message_open(reference="my-ref")
- channel.message_append(reference="my-ref", bytes="abcdefgh")
- channel.message_append(reference="my-ref", bytes="ijklmnop")
- channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint")
- channel.channel_close()
-
- channel = self.client.channel(2)
- channel.channel_open()
- channel.message_consume(queue = "q", destination = "consumer")
- offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value
- self.assertEquals(offset, 16)
- channel.message_append(reference="my-ref", bytes="qrstuvwxyz")
- channel.synchronous = False
- channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref"))
- channel.synchronous = True
- channel.message_close(reference="my-ref")
-
- self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz")
- self.assertEmpty(self.client.queue("consumer"))
-
-
- def assertDataEquals(self, channel, msg, expected):
- if isinstance(msg.body, ReferenceId):
- data = msg.reference.get_complete()
- else:
- data = msg.body
- self.assertEquals("abcdefghijkl", data)
-
diff --git a/qpid/python/tests/queue.py b/qpid/python/tests/queue.py
deleted file mode 100644
index d85f04c4c2..0000000000
--- a/qpid/python/tests/queue.py
+++ /dev/null
@@ -1,255 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class QueueTests(TestBase):
- """Tests for 'methods' on the amqp queue 'class'"""
-
- def test_purge(self):
- """
- Test that the purge method removes messages from the queue
- """
- channel = self.channel
- #setup, declare a queue and add some messages to it:
- channel.exchange_declare(exchange="test-exchange", type="direct")
- channel.queue_declare(queue="test-queue", exclusive=True)
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- channel.message_transfer(destination="test-exchange", routing_key="key", body="one")
- channel.message_transfer(destination="test-exchange", routing_key="key", body="two")
- channel.message_transfer(destination="test-exchange", routing_key="key", body="three")
-
- #check that the queue now reports 3 messages:
- reply = channel.queue_declare(queue="test-queue")
- self.assertEqual(3, reply.message_count)
-
- #now do the purge, then test that three messages are purged and the count drops to 0
- reply = channel.queue_purge(queue="test-queue");
- self.assertEqual(3, reply.message_count)
- reply = channel.queue_declare(queue="test-queue")
- self.assertEqual(0, reply.message_count)
-
- #send a further message and consume it, ensuring that the other messages are really gone
- channel.message_transfer(destination="test-exchange", routing_key="key", content=Content("four"))
- channel.message_consume(queue="test-queue", destination="tag", no_ack=True)
- queue = self.client.queue("tag")
- msg = queue.get(timeout=1)
- self.assertEqual("four", msg.content.body)
-
- #check error conditions (use new channels):
- channel = self.client.channel(2)
- channel.channel_open()
- try:
- #queue specified but doesn't exist:
- channel.queue_purge(queue="invalid-queue")
- self.fail("Expected failure when purging non-existent queue")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- channel = self.client.channel(3)
- channel.channel_open()
- try:
- #queue not specified and none previously declared for channel:
- channel.queue_purge()
- self.fail("Expected failure when purging unspecified queue")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- #cleanup
- other = self.connect()
- channel = other.channel(1)
- channel.channel_open()
- channel.exchange_delete(exchange="test-exchange")
-
- def test_declare_exclusive(self):
- """
- Test that the exclusive field is honoured in queue.declare
- """
- # TestBase.setUp has already opened channel(1)
- c1 = self.channel
- # Here we open a second separate connection:
- other = self.connect()
- c2 = other.channel(1)
- c2.channel_open()
-
- #declare an exclusive queue:
- c1.queue_declare(queue="exclusive-queue", exclusive="True")
- try:
- #other connection should not be allowed to declare this:
- c2.queue_declare(queue="exclusive-queue", exclusive="True")
- self.fail("Expected second exclusive queue_declare to raise a channel exception")
- except Closed, e:
- self.assertChannelException(405, e.args[0])
-
-
- def test_declare_passive(self):
- """
- Test that the passive field is honoured in queue.declare
- """
- channel = self.channel
- #declare an exclusive queue:
- channel.queue_declare(queue="passive-queue-1", exclusive="True")
- channel.queue_declare(queue="passive-queue-1", passive="True")
- try:
- #other connection should not be allowed to declare this:
- channel.queue_declare(queue="passive-queue-2", passive="True")
- self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
- def test_bind(self):
- """
- Test various permutations of the queue.bind method
- """
- channel = self.channel
- channel.queue_declare(queue="queue-1", exclusive="True")
-
- #straightforward case, both exchange & queue exist so no errors expected:
- channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
-
- #bind the default queue for the channel (i.e. last one declared):
- channel.queue_bind(exchange="amq.direct", routing_key="key2")
-
- #use the queue name where neither routing key nor queue are specified:
- channel.queue_bind(exchange="amq.direct")
-
- #try and bind to non-existant exchange
- try:
- channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
- self.fail("Expected bind to non-existant exchange to fail")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- #need to reopen a channel:
- channel = self.client.channel(2)
- channel.channel_open()
-
- #try and bind non-existant queue:
- try:
- channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
- self.fail("Expected bind of non-existant queue to fail")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
- def test_delete_simple(self):
- """
- Test core queue deletion behaviour
- """
- channel = self.channel
-
- #straight-forward case:
- channel.queue_declare(queue="delete-me")
- channel.message_transfer(routing_key="delete-me", body="a")
- channel.message_transfer(routing_key="delete-me", body="b")
- channel.message_transfer(routing_key="delete-me", body="c")
- reply = channel.queue_delete(queue="delete-me")
- self.assertEqual(3, reply.message_count)
- #check that it has gone be declaring passively
- try:
- channel.queue_declare(queue="delete-me", passive="True")
- self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- #check attempted deletion of non-existant queue is handled correctly:
- channel = self.client.channel(2)
- channel.channel_open()
- try:
- channel.queue_delete(queue="i-dont-exist", if_empty="True")
- self.fail("Expected delete of non-existant queue to fail")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
-
- def test_delete_ifempty(self):
- """
- Test that if_empty field of queue_delete is honoured
- """
- channel = self.channel
-
- #create a queue and add a message to it (use default binding):
- channel.queue_declare(queue="delete-me-2")
- channel.queue_declare(queue="delete-me-2", passive="True")
- channel.message_transfer(routing_key="delete-me-2", body="message")
-
- #try to delete, but only if empty:
- try:
- channel.queue_delete(queue="delete-me-2", if_empty="True")
- self.fail("Expected delete if_empty to fail for non-empty queue")
- except Closed, e:
- self.assertChannelException(406, e.args[0])
-
- #need new channel now:
- channel = self.client.channel(2)
- channel.channel_open()
-
- #empty queue:
- channel.message_consume(destination="consumer_tag", queue="delete-me-2", no_ack=True)
- queue = self.client.queue("consumer_tag")
- msg = queue.get(timeout=1)
- self.assertEqual("message", msg.content.body)
- channel.message_cancel(destination="consumer_tag")
-
- #retry deletion on empty queue:
- channel.queue_delete(queue="delete-me-2", if_empty="True")
-
- #check that it has gone by declaring passively:
- try:
- channel.queue_declare(queue="delete-me-2", passive="True")
- self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- def test_delete_ifunused(self):
- """
- Test that if_unused field of queue_delete is honoured
- """
- channel = self.channel
-
- #create a queue and register a consumer:
- channel.queue_declare(queue="delete-me-3")
- channel.queue_declare(queue="delete-me-3", passive="True")
- channel.message_consume(destination="consumer_tag", queue="delete-me-3", no_ack=True)
-
- #need new channel now:
- channel2 = self.client.channel(2)
- channel2.channel_open()
- #try to delete, but only if empty:
- try:
- channel2.queue_delete(queue="delete-me-3", if_unused="True")
- self.fail("Expected delete if_unused to fail for queue with existing consumer")
- except Closed, e:
- self.assertChannelException(406, e.args[0])
-
-
- channel.message_cancel(destination="consumer_tag")
- channel.queue_delete(queue="delete-me-3", if_unused="True")
- #check that it has gone by declaring passively:
- try:
- channel.queue_declare(queue="delete-me-3", passive="True")
- self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
diff --git a/qpid/python/tests/testlib.py b/qpid/python/tests/testlib.py
deleted file mode 100644
index f345fbbd80..0000000000
--- a/qpid/python/tests/testlib.py
+++ /dev/null
@@ -1,66 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-#
-# Tests for the testlib itself.
-#
-
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-from Queue import Empty
-
-import sys
-from traceback import *
-
-def mytrace(frame, event, arg):
- print_stack(frame);
- print "===="
- return mytrace
-
-class TestBaseTest(TestBase):
- """Verify TestBase functions work as expected"""
-
- def testAssertEmptyPass(self):
- """Test assert empty works"""
- self.queue_declare(queue="empty")
- q = self.consume("empty")
- self.assertEmpty(q)
- try:
- q.get(timeout=1)
- self.fail("Queue is not empty.")
- except Empty: None # Ignore
-
- def testAssertEmptyFail(self):
- self.queue_declare(queue="full")
- q = self.consume("full")
- self.channel.message_transfer(routing_key="full", body="")
- try:
- self.assertEmpty(q);
- self.fail("assertEmpty did not assert on non-empty queue")
- except AssertionError: None # Ignore
-
- def testMessageProperties(self):
- """Verify properties are passed with message"""
- props={"x":1, "y":2}
- self.queue_declare(queue="q")
- q = self.consume("q")
- self.assertPublishGet(q, routing_key="q", properties=props)
-
-
-
diff --git a/qpid/python/tests/tx.py b/qpid/python/tests/tx.py
deleted file mode 100644
index 0f6b4f5cd1..0000000000
--- a/qpid/python/tests/tx.py
+++ /dev/null
@@ -1,188 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class TxTests(TestBase):
- """
- Tests for 'methods' on the amqp tx 'class'
- """
-
- def test_commit(self):
- """
- Test that commited publishes are delivered and commited acks are not re-delivered
- """
- channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c")
- channel.tx_commit()
-
- #check results
- for i in range(1, 5):
- msg = queue_c.get(timeout=1)
- self.assertEqual("TxMessage %d" % i, msg.body)
- msg.ok()
-
- msg = queue_b.get(timeout=1)
- self.assertEqual("TxMessage 6", msg.body)
- msg.ok()
-
- msg = queue_a.get(timeout=1)
- self.assertEqual("TxMessage 7", msg.body)
- msg.ok()
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
- except Empty: None
-
- #cleanup
- channel.tx_commit()
-
- def test_auto_rollback(self):
- """
- Test that a channel closed with an open transaction is effectively rolled back
- """
- channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
- except Empty: None
-
- channel.tx_rollback()
-
- #check results
- for i in range(1, 5):
- msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
-
- msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.body)
- msg.ok()
-
- msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.body)
- msg.ok()
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
- except Empty: None
-
- #cleanup
- channel.tx_commit()
-
- def test_rollback(self):
- """
- Test that rolled back publishes are not delivered and rolled back acks are re-delivered
- """
- channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
- except Empty: None
-
- channel.tx_rollback()
-
- #check results
- for i in range(1, 5):
- msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
-
- msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.body)
- msg.ok()
-
- msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.body)
- msg.ok()
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
- except Empty: None
-
- #cleanup
- channel.tx_commit()
-
- def perform_txn_work(self, channel, name_a, name_b, name_c):
- """
- Utility method that does some setup and some work under a transaction. Used for testing both
- commit and rollback
- """
- #setup:
- channel.queue_declare(queue=name_a, exclusive=True)
- channel.queue_declare(queue=name_b, exclusive=True)
- channel.queue_declare(queue=name_c, exclusive=True)
-
- key = "my_key_" + name_b
- topic = "my_topic_" + name_c
-
- channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key)
- channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
-
- for i in range(1, 5):
- channel.message_transfer(routing_key=name_a, body="Message %d" % i)
-
- channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6")
- channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7")
-
- channel.tx_select()
-
- #consume and ack messages
- channel.message_consume(queue=name_a, destination="sub_a", no_ack=False)
- queue_a = self.client.queue("sub_a")
- for i in range(1, 5):
- msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
-
- msg.ok(batchoffset=-3)
-
- channel.message_consume(queue=name_b, destination="sub_b", no_ack=False)
- queue_b = self.client.queue("sub_b")
- msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.body)
- msg.ok()
-
- sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False)
- queue_c = self.client.queue("sub_c")
- msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.body)
- msg.ok()
-
- #publish messages
- for i in range(1, 5):
- channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i)
-
- channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6")
- channel.message_transfer(routing_key=name_a, body="TxMessage 7")
-
- return queue_a, queue_b, queue_c