diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-08-11 15:40:19 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-08-11 15:40:19 +0000 |
| commit | e419dd6f187b5d135701f17f4b4382ece95068e5 (patch) | |
| tree | 7c895aa61cbb9b926f3a0c6cecbc17e4e1989944 /qpid/python/tests_0-9 | |
| parent | f92d8f621534d0ae704101f8a38110fedbff39af (diff) | |
| download | qpid-python-e419dd6f187b5d135701f17f4b4382ece95068e5.tar.gz | |
- removed old and redundent tests
- removed old test harness in favor of qpid-python-test
- modified qpid-python-test to support "skipped" tests, these are
tests that failed due to an anticipated environmental reason such
as the broker is not running or it is the wrong version
- modified the qpid-python-test harness to exit with appropriate
error codes based on the test results
- modified the python clients to report version mismatches rather
than framing errors
- made qpid_config provide variables for 0-8, 0-9, and 0-10 versions
of the spec
- modified the 0-10 client to directly codegen classes
- added new 0-10 framing layer based on push parsing rather than pull
parsing
- added numerous framing tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@803168 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/tests_0-9')
| -rw-r--r-- | qpid/python/tests_0-9/__init__.py | 2 | ||||
| -rw-r--r-- | qpid/python/tests_0-9/basic.py | 396 | ||||
| -rw-r--r-- | qpid/python/tests_0-9/broker.py | 133 | ||||
| -rw-r--r-- | qpid/python/tests_0-9/dtx.py | 587 | ||||
| -rw-r--r-- | qpid/python/tests_0-9/example.py | 94 | ||||
| -rw-r--r-- | qpid/python/tests_0-9/exchange.py | 327 | ||||
| -rw-r--r-- | qpid/python/tests_0-9/execution.py | 29 | ||||
| -rw-r--r-- | qpid/python/tests_0-9/message.py | 657 | ||||
| -rw-r--r-- | qpid/python/tests_0-9/query.py | 2 | ||||
| -rw-r--r-- | qpid/python/tests_0-9/queue.py | 261 | ||||
| -rw-r--r-- | qpid/python/tests_0-9/testlib.py | 66 | ||||
| -rw-r--r-- | qpid/python/tests_0-9/tx.py | 188 |
12 files changed, 19 insertions, 2723 deletions
diff --git a/qpid/python/tests_0-9/__init__.py b/qpid/python/tests_0-9/__init__.py index 9a09d2d04f..d9f2ed7dbb 100644 --- a/qpid/python/tests_0-9/__init__.py +++ b/qpid/python/tests_0-9/__init__.py @@ -18,3 +18,5 @@ # specific language governing permissions and limitations # under the License. # + +import query, queue diff --git a/qpid/python/tests_0-9/basic.py b/qpid/python/tests_0-9/basic.py deleted file mode 100644 index 607ba26343..0000000000 --- a/qpid/python/tests_0-9/basic.py +++ /dev/null @@ -1,396 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class BasicTests(TestBase): - """Tests for 'methods' on the amqp basic 'class'""" - - def test_consume_no_local(self): - """ - Test that the no_local flag is honoured in the consume method - """ - channel = self.channel - #setup, declare two queues: - channel.queue_declare(queue="test-queue-1a", exclusive=True) - channel.queue_declare(queue="test-queue-1b", exclusive=True) - #establish two consumers one of which excludes delivery of locally sent messages - channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a") - channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True) - - #send a message - channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local")) - channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local")) - - #check the queues of the two consumers - excluded = self.client.queue("local_excluded") - included = self.client.queue("local_included") - msg = included.get(timeout=1) - self.assertEqual("consume_no_local", msg.content.body) - try: - excluded.get(timeout=1) - self.fail("Received locally published message though no_local=true") - except Empty: None - - - def test_consume_exclusive(self): - """ - Test that the exclusive flag is honoured in the consume method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-2", exclusive=True) - - #check that an exclusive consumer prevents other consumer being created: - channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True) - try: - channel.basic_consume(consumer_tag="second", queue="test-queue-2") - self.fail("Expected consume request to fail due to previous exclusive consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - #open new channel and cleanup last consumer: - channel = self.client.channel(2) - channel.channel_open() - - #check that an exclusive consumer cannot be created if a consumer already exists: - channel.basic_consume(consumer_tag="first", queue="test-queue-2") - try: - channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True) - self.fail("Expected exclusive consume request to fail due to previous consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - def test_consume_queue_errors(self): - """ - Test error conditions associated with the queue field of the consume method: - """ - channel = self.channel - try: - #queue specified but doesn't exist: - channel.basic_consume(queue="invalid-queue") - self.fail("Expected failure when consuming from non-existent queue") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - channel = self.client.channel(2) - channel.channel_open() - try: - #queue not specified and none previously declared for channel: - channel.basic_consume(queue="") - self.fail("Expected failure when consuming from unspecified queue") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_consume_unique_consumers(self): - """ - Ensure unique consumer tags are enforced - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-3", exclusive=True) - - #check that attempts to use duplicate tags are detected and prevented: - channel.basic_consume(consumer_tag="first", queue="test-queue-3") - try: - channel.basic_consume(consumer_tag="first", queue="test-queue-3") - self.fail("Expected consume request to fail due to non-unique tag") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_cancel(self): - """ - Test compliance of the basic.cancel method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-4", exclusive=True) - channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4") - channel.basic_publish(routing_key="test-queue-4", content=Content("One")) - - myqueue = self.client.queue("my-consumer") - msg = myqueue.get(timeout=1) - self.assertEqual("One", msg.content.body) - - #cancel should stop messages being delivered - channel.basic_cancel(consumer_tag="my-consumer") - channel.basic_publish(routing_key="test-queue-4", content=Content("Two")) - try: - msg = myqueue.get(timeout=1) - self.fail("Got message after cancellation: " + msg) - except Empty: None - - #cancellation of non-existant consumers should be handled without error - channel.basic_cancel(consumer_tag="my-consumer") - channel.basic_cancel(consumer_tag="this-never-existed") - - - def test_ack(self): - """ - Test basic ack/recover behaviour - """ - channel = self.channel - channel.queue_declare(queue="test-ack-queue", exclusive=True) - - reply = channel.basic_consume(queue="test-ack-queue", no_ack=False) - queue = self.client.queue(reply.consumer_tag) - - channel.basic_publish(routing_key="test-ack-queue", content=Content("One")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Two")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Three")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Four")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Five")) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.content.body) - self.assertEqual("Two", msg2.content.body) - self.assertEqual("Three", msg3.content.body) - self.assertEqual("Four", msg4.content.body) - self.assertEqual("Five", msg5.content.body) - - channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two - channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four - - channel.basic_recover(requeue=False) - - msg3b = queue.get(timeout=1) - msg5b = queue.get(timeout=1) - - self.assertEqual("Three", msg3b.content.body) - self.assertEqual("Five", msg5b.content.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - def test_recover_requeue(self): - """ - Test requeing on recovery - """ - channel = self.channel - channel.queue_declare(queue="test-requeue", exclusive=True) - - subscription = channel.basic_consume(queue="test-requeue", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - channel.basic_publish(routing_key="test-requeue", content=Content("One")) - channel.basic_publish(routing_key="test-requeue", content=Content("Two")) - channel.basic_publish(routing_key="test-requeue", content=Content("Three")) - channel.basic_publish(routing_key="test-requeue", content=Content("Four")) - channel.basic_publish(routing_key="test-requeue", content=Content("Five")) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.content.body) - self.assertEqual("Two", msg2.content.body) - self.assertEqual("Three", msg3.content.body) - self.assertEqual("Four", msg4.content.body) - self.assertEqual("Five", msg5.content.body) - - channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two - channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four - - channel.basic_cancel(consumer_tag=subscription.consumer_tag) - - channel.basic_recover(requeue=True) - - subscription2 = channel.basic_consume(queue="test-requeue") - queue2 = self.client.queue(subscription2.consumer_tag) - - msg3b = queue2.get(timeout=1) - msg5b = queue2.get(timeout=1) - - self.assertEqual("Three", msg3b.content.body) - self.assertEqual("Five", msg5b.content.body) - - self.assertEqual(True, msg3b.redelivered) - self.assertEqual(True, msg5b.redelivered) - - try: - extra = queue2.get(timeout=1) - self.fail("Got unexpected message in second queue: " + extra.content.body) - except Empty: None - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message in original queue: " + extra.content.body) - except Empty: None - - - def test_qos_prefetch_count(self): - """ - Test that the prefetch count specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-count", exclusive=True) - subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - #set prefetch to 5: - channel.basic_qos(prefetch_count=5) - - #publish 10 messages: - for i in range(1, 11): - channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i)) - - #only 5 messages should have been delivered: - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.content.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.content.body) - except Empty: None - - - - def test_qos_prefetch_size(self): - """ - Test that the prefetch size specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-size", exclusive=True) - subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - #set prefetch to 50 bytes (each message is 9 or 10 bytes): - channel.basic_qos(prefetch_size=50) - - #publish 10 messages: - for i in range(1, 11): - channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i)) - - #only 5 messages should have been delivered (i.e. 45 bytes worth): - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.content.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.content.body) - except Empty: None - - #make sure that a single oversized message still gets delivered - large = "abcdefghijklmnopqrstuvwxyz" - large = large + "-" + large; - channel.basic_publish(routing_key="test-prefetch-size", content=Content(large)) - msg = queue.get(timeout=1) - self.assertEqual(large, msg.content.body) - - def test_get(self): - """ - Test basic_get method - """ - channel = self.channel - channel.queue_declare(queue="test-get", exclusive=True) - - #publish some messages (no_ack=True) - for i in range(1, 11): - channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) - - #use basic_get to read back the messages, and check that we get an empty at the end - for i in range(1, 11): - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - #repeat for no_ack=False - for i in range(11, 21): - channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) - - for i in range(11, 21): - reply = channel.basic_get(no_ack=False) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - if(i == 13): - channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True) - if(i in [15, 17, 19]): - channel.basic_ack(delivery_tag=reply.delivery_tag) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - #recover(requeue=True) - channel.basic_recover(requeue=True) - - #get the unacked messages again (14, 16, 18, 20) - for i in [14, 16, 18, 20]: - reply = channel.basic_get(no_ack=False) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - channel.basic_ack(delivery_tag=reply.delivery_tag) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - channel.basic_recover(requeue=True) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") diff --git a/qpid/python/tests_0-9/broker.py b/qpid/python/tests_0-9/broker.py deleted file mode 100644 index 03b4132d3e..0000000000 --- a/qpid/python/tests_0-9/broker.py +++ /dev/null @@ -1,133 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class BrokerTests(TestBase): - """Tests for basic Broker functionality""" - - def test_ack_and_no_ack(self): - """ - First, this test tries to receive a message with a no-ack - consumer. Second, this test tries to explicitly receive and - acknowledge a message with an acknowledging consumer. - """ - ch = self.channel - self.queue_declare(ch, queue = "myqueue") - - # No ack consumer - ctag = "tag1" - ch.message_consume(queue = "myqueue", destination = ctag, no_ack = True) - body = "test no-ack" - ch.message_transfer(routing_key = "myqueue", body = body) - msg = self.client.queue(ctag).get(timeout = 5) - self.assert_(msg.body == body) - - # Acknowledging consumer - self.queue_declare(ch, queue = "otherqueue") - ctag = "tag2" - ch.message_consume(queue = "otherqueue", destination = ctag, no_ack = False) - body = "test ack" - ch.message_transfer(routing_key = "otherqueue", body = body) - msg = self.client.queue(ctag).get(timeout = 5) - msg.ok() - self.assert_(msg.body == body) - - def test_simple_delivery_immediate(self): - """ - Test simple message delivery where consume is issued before publish - """ - channel = self.channel - self.exchange_declare(channel, exchange="test-exchange", type="direct") - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - consumer_tag = "tag1" - channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True) - queue = self.client.queue(consumer_tag) - - body = "Immediate Delivery" - channel.message_transfer(destination="test-exchange", routing_key="key", body=body, immediate=True) - msg = queue.get(timeout=5) - self.assert_(msg.body == body) - - # TODO: Ensure we fail if immediate=True and there's no consumer. - - - def test_simple_delivery_queued(self): - """ - Test basic message delivery where publish is issued before consume - (i.e. requires queueing of the message) - """ - channel = self.channel - self.exchange_declare(channel, exchange="test-exchange", type="direct") - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - body = "Queued Delivery" - channel.message_transfer(destination="test-exchange", routing_key="key", body=body) - - consumer_tag = "tag1" - channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True) - queue = self.client.queue(consumer_tag) - msg = queue.get(timeout=5) - self.assert_(msg.body == body) - - def test_invalid_channel(self): - channel = self.client.channel(200) - try: - channel.queue_declare(exclusive=True) - self.fail("Expected error on queue_declare for invalid channel") - except Closed, e: - self.assertConnectionException(504, e.args[0]) - - def test_closed_channel(self): - channel = self.client.channel(200) - channel.channel_open() - channel.channel_close() - try: - channel.queue_declare(exclusive=True) - self.fail("Expected error on queue_declare for closed channel") - except Closed, e: - if isinstance(e.args[0], str): self.fail(e) - self.assertConnectionException(504, e.args[0]) - - def test_ping_pong(self): - channel = self.channel - reply = channel.channel_ping() - self.assertEqual(reply.method.klass.name, "channel") - self.assertEqual(reply.method.name, "ok") - #todo: provide a way to get notified of incoming pongs... - - def test_channel_flow(self): - channel = self.channel - channel.queue_declare(queue="flow_test_queue", exclusive=True) - channel.message_consume(destination="my-tag", queue="flow_test_queue") - incoming = self.client.queue("my-tag") - - channel.channel_flow(active=False) - channel.message_transfer(routing_key="flow_test_queue", body="abcdefghijklmnopqrstuvwxyz") - try: - incoming.get(timeout=1) - self.fail("Received message when flow turned off.") - except Empty: None - - channel.channel_flow(active=True) - msg = incoming.get(timeout=1) - self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.body) diff --git a/qpid/python/tests_0-9/dtx.py b/qpid/python/tests_0-9/dtx.py deleted file mode 100644 index bc268f4129..0000000000 --- a/qpid/python/tests_0-9/dtx.py +++ /dev/null @@ -1,587 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase -from struct import pack, unpack -from time import sleep - -class DtxTests(TestBase): - """ - Tests for the amqp dtx related classes. - - Tests of the form test_simple_xxx test the basic transactional - behaviour. The approach here is to 'swap' a message from one queue - to another by consuming and re-publishing in the same - transaction. That transaction is then completed in different ways - and the appropriate result verified. - - The other tests enforce more specific rules and behaviour on a - per-method or per-field basis. - """ - - XA_RBROLLBACK = 1 - XA_RBTIMEOUT = 2 - XA_OK = 8 - - def test_simple_commit(self): - """ - Test basic one-phase commit behaviour. - """ - channel = self.channel - tx = self.xid("my-xid") - self.txswap(tx, "commit") - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #commit - self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).flags) - - #check result - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(1, "queue-b") - self.assertMessageId("commit", "queue-b") - - def test_simple_prepare_commit(self): - """ - Test basic two-phase commit behaviour. - """ - channel = self.channel - tx = self.xid("my-xid") - self.txswap(tx, "prepare-commit") - - #prepare - self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags) - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #commit - self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).flags) - - #check result - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(1, "queue-b") - self.assertMessageId("prepare-commit", "queue-b") - - - def test_simple_rollback(self): - """ - Test basic rollback behaviour. - """ - channel = self.channel - tx = self.xid("my-xid") - self.txswap(tx, "rollback") - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #rollback - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) - - #check result - self.assertMessageCount(1, "queue-a") - self.assertMessageCount(0, "queue-b") - self.assertMessageId("rollback", "queue-a") - - def test_simple_prepare_rollback(self): - """ - Test basic rollback behaviour after the transaction has been prepared. - """ - channel = self.channel - tx = self.xid("my-xid") - self.txswap(tx, "prepare-rollback") - - #prepare - self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags) - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #rollback - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) - - #check result - self.assertMessageCount(1, "queue-a") - self.assertMessageCount(0, "queue-b") - self.assertMessageId("prepare-rollback", "queue-a") - - def test_select_required(self): - """ - check that an error is flagged if select is not issued before - start or end - """ - channel = self.channel - tx = self.xid("dummy") - try: - channel.dtx_demarcation_start(xid=tx) - - #if we get here we have failed, but need to do some cleanup: - channel.dtx_demarcation_end(xid=tx) - channel.dtx_coordination_rollback(xid=tx) - self.fail("Channel not selected for use with dtx, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_start_already_known(self): - """ - Verify that an attempt to start an association with a - transaction that is already known is not allowed (unless the - join flag is set). - """ - #create two channels on different connection & select them for use with dtx: - channel1 = self.channel - channel1.dtx_demarcation_select() - - other = self.connect() - channel2 = other.channel(1) - channel2.channel_open() - channel2.dtx_demarcation_select() - - #create a xid - tx = self.xid("dummy") - #start work on one channel under that xid: - channel1.dtx_demarcation_start(xid=tx) - #then start on the other without the join set - failed = False - try: - channel2.dtx_demarcation_start(xid=tx) - except Closed, e: - failed = True - error = e - - #cleanup: - if not failed: - channel2.dtx_demarcation_end(xid=tx) - other.close() - channel1.dtx_demarcation_end(xid=tx) - channel1.dtx_coordination_rollback(xid=tx) - - #verification: - if failed: self.assertConnectionException(503, e.args[0]) - else: self.fail("Xid already known, expected exception!") - - def test_forget_xid_on_completion(self): - """ - Verify that a xid is 'forgotten' - and can therefore be used - again - once it is completed. - """ - channel = self.channel - #do some transactional work & complete the transaction - self.test_simple_commit() - - #start association for the same xid as the previously completed txn - tx = self.xid("my-xid") - channel.dtx_demarcation_start(xid=tx) - channel.dtx_demarcation_end(xid=tx) - channel.dtx_coordination_rollback(xid=tx) - - def test_start_join_and_resume(self): - """ - Ensure the correct error is signalled when both the join and - resume flags are set on starting an association between a - channel and a transcation. - """ - channel = self.channel - channel.dtx_demarcation_select() - tx = self.xid("dummy") - try: - channel.dtx_demarcation_start(xid=tx, join=True, resume=True) - #failed, but need some cleanup: - channel.dtx_demarcation_end(xid=tx) - channel.dtx_coordination_rollback(xid=tx) - self.fail("Join and resume both set, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_start_join(self): - """ - Verify 'join' behaviour, where a channel is associated with a - transaction that is already associated with another channel. - """ - #create two channels & select them for use with dtx: - channel1 = self.channel - channel1.dtx_demarcation_select() - - channel2 = self.client.channel(2) - channel2.channel_open() - channel2.dtx_demarcation_select() - - #setup - channel1.queue_declare(queue="one", exclusive=True) - channel1.queue_declare(queue="two", exclusive=True) - channel1.message_transfer(routing_key="one", message_id="a", body="DtxMessage") - channel1.message_transfer(routing_key="two", message_id="b", body="DtxMessage") - - #create a xid - tx = self.xid("dummy") - #start work on one channel under that xid: - channel1.dtx_demarcation_start(xid=tx) - #then start on the other with the join flag set - channel2.dtx_demarcation_start(xid=tx, join=True) - - #do work through each channel - self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two' - self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one' - - #mark end on both channels - channel1.dtx_demarcation_end(xid=tx) - channel2.dtx_demarcation_end(xid=tx) - - #commit and check - channel1.dtx_coordination_commit(xid=tx, one_phase=True) - self.assertMessageCount(1, "one") - self.assertMessageCount(1, "two") - self.assertMessageId("a", "two") - self.assertMessageId("b", "one") - - - def test_suspend_resume(self): - """ - Test suspension and resumption of an association - """ - channel = self.channel - channel.dtx_demarcation_select() - - #setup - channel.queue_declare(queue="one", exclusive=True) - channel.queue_declare(queue="two", exclusive=True) - channel.message_transfer(routing_key="one", message_id="a", body="DtxMessage") - channel.message_transfer(routing_key="two", message_id="b", body="DtxMessage") - - tx = self.xid("dummy") - - channel.dtx_demarcation_start(xid=tx) - self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two' - channel.dtx_demarcation_end(xid=tx, suspend=True) - - channel.dtx_demarcation_start(xid=tx, resume=True) - self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one' - channel.dtx_demarcation_end(xid=tx) - - #commit and check - channel.dtx_coordination_commit(xid=tx, one_phase=True) - self.assertMessageCount(1, "one") - self.assertMessageCount(1, "two") - self.assertMessageId("a", "two") - self.assertMessageId("b", "one") - - def test_end_suspend_and_fail(self): - """ - Verify that the correct error is signalled if the suspend and - fail flag are both set when disassociating a transaction from - the channel - """ - channel = self.channel - channel.dtx_demarcation_select() - tx = self.xid("suspend_and_fail") - channel.dtx_demarcation_start(xid=tx) - try: - channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True) - self.fail("Suspend and fail both set, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - #cleanup - other = self.connect() - channel = other.channel(1) - channel.channel_open() - channel.dtx_coordination_rollback(xid=tx) - channel.channel_close() - other.close() - - - def test_end_unknown_xid(self): - """ - Verifies that the correct exception is thrown when an attempt - is made to end the association for a xid not previously - associated with the channel - """ - channel = self.channel - channel.dtx_demarcation_select() - tx = self.xid("unknown-xid") - try: - channel.dtx_demarcation_end(xid=tx) - self.fail("Attempted to end association with unknown xid, expected exception!") - except Closed, e: - #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming... - self.assertConnectionException(503, e.args[0]) - - def test_end(self): - """ - Verify that the association is terminated by end and subsequent - operations are non-transactional - """ - channel = self.client.channel(2) - channel.channel_open() - channel.queue_declare(queue="tx-queue", exclusive=True) - - #publish a message under a transaction - channel.dtx_demarcation_select() - tx = self.xid("dummy") - channel.dtx_demarcation_start(xid=tx) - channel.message_transfer(routing_key="tx-queue", message_id="one", body="DtxMessage") - channel.dtx_demarcation_end(xid=tx) - - #now that association with txn is ended, publish another message - channel.message_transfer(routing_key="tx-queue", message_id="two", body="DtxMessage") - - #check the second message is available, but not the first - self.assertMessageCount(1, "tx-queue") - channel.message_consume(queue="tx-queue", destination="results", no_ack=False) - msg = self.client.queue("results").get(timeout=1) - self.assertEqual("two", msg.message_id) - channel.message_cancel(destination="results") - #ack the message then close the channel - msg.ok() - channel.channel_close() - - channel = self.channel - #commit the transaction and check that the first message (and - #only the first message) is then delivered - channel.dtx_coordination_commit(xid=tx, one_phase=True) - self.assertMessageCount(1, "tx-queue") - self.assertMessageId("one", "tx-queue") - - def test_invalid_commit_one_phase_true(self): - """ - Test that a commit with one_phase = True is rejected if the - transaction in question has already been prepared. - """ - other = self.connect() - tester = other.channel(1) - tester.channel_open() - tester.queue_declare(queue="dummy", exclusive=True) - tester.dtx_demarcation_select() - tx = self.xid("dummy") - tester.dtx_demarcation_start(xid=tx) - tester.message_transfer(routing_key="dummy", body="whatever") - tester.dtx_demarcation_end(xid=tx) - tester.dtx_coordination_prepare(xid=tx) - failed = False - try: - tester.dtx_coordination_commit(xid=tx, one_phase=True) - except Closed, e: - failed = True - error = e - - if failed: - self.channel.dtx_coordination_rollback(xid=tx) - self.assertConnectionException(503, e.args[0]) - else: - tester.channel_close() - other.close() - self.fail("Invalid use of one_phase=True, expected exception!") - - def test_invalid_commit_one_phase_false(self): - """ - Test that a commit with one_phase = False is rejected if the - transaction in question has not yet been prepared. - """ - """ - Test that a commit with one_phase = True is rejected if the - transaction in question has already been prepared. - """ - other = self.connect() - tester = other.channel(1) - tester.channel_open() - tester.queue_declare(queue="dummy", exclusive=True) - tester.dtx_demarcation_select() - tx = self.xid("dummy") - tester.dtx_demarcation_start(xid=tx) - tester.message_transfer(routing_key="dummy", body="whatever") - tester.dtx_demarcation_end(xid=tx) - failed = False - try: - tester.dtx_coordination_commit(xid=tx, one_phase=False) - except Closed, e: - failed = True - error = e - - if failed: - self.channel.dtx_coordination_rollback(xid=tx) - self.assertConnectionException(503, e.args[0]) - else: - tester.channel_close() - other.close() - self.fail("Invalid use of one_phase=False, expected exception!") - - def test_implicit_end(self): - """ - Test that an association is implicitly ended when the channel - is closed (whether by exception or explicit client request) - and the transaction in question is marked as rollback only. - """ - channel1 = self.channel - channel2 = self.client.channel(2) - channel2.channel_open() - - #setup: - channel2.queue_declare(queue="dummy", exclusive=True) - channel2.message_transfer(routing_key="dummy", body="whatever") - tx = self.xid("dummy") - - channel2.dtx_demarcation_select() - channel2.dtx_demarcation_start(xid=tx) - channel2.message_get(queue="dummy", destination="dummy") - self.client.queue("dummy").get(timeout=1).ok() - channel2.message_transfer(routing_key="dummy", body="whatever") - channel2.channel_close() - - self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).flags) - channel1.dtx_coordination_rollback(xid=tx) - - def test_get_timeout(self): - """ - Check that get-timeout returns the correct value, (and that a - transaction with a timeout can complete normally) - """ - channel = self.channel - tx = self.xid("dummy") - - channel.dtx_demarcation_select() - channel.dtx_demarcation_start(xid=tx) - self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout) - channel.dtx_coordination_set_timeout(xid=tx, timeout=60) - self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout) - self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).flags) - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) - - def test_set_timeout(self): - """ - Test the timeout of a transaction results in the expected - behaviour - """ - #open new channel to allow self.channel to be used in checking te queue - channel = self.client.channel(2) - channel.channel_open() - #setup: - tx = self.xid("dummy") - channel.queue_declare(queue="queue-a", exclusive=True) - channel.queue_declare(queue="queue-b", exclusive=True) - channel.message_transfer(routing_key="queue-a", message_id="timeout", body="DtxMessage") - - channel.dtx_demarcation_select() - channel.dtx_demarcation_start(xid=tx) - self.swap(channel, "queue-a", "queue-b") - channel.dtx_coordination_set_timeout(xid=tx, timeout=2) - sleep(3) - #check that the work has been rolled back already - self.assertMessageCount(1, "queue-a") - self.assertMessageCount(0, "queue-b") - self.assertMessageId("timeout", "queue-a") - #check the correct codes are returned when we try to complete the txn - self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).flags) - self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).flags) - - - - def test_recover(self): - """ - Test basic recover behaviour - """ - channel = self.channel - - channel.dtx_demarcation_select() - channel.queue_declare(queue="dummy", exclusive=True) - - prepared = [] - for i in range(1, 10): - tx = self.xid("tx%s" % (i)) - channel.dtx_demarcation_start(xid=tx) - channel.message_transfer(routing_key="dummy", body="message%s" % (i)) - channel.dtx_demarcation_end(xid=tx) - if i in [2, 5, 6, 8]: - channel.dtx_coordination_prepare(xid=tx) - prepared.append(tx) - else: - channel.dtx_coordination_rollback(xid=tx) - - indoubt = channel.dtx_coordination_recover().xids - #convert indoubt table to a list of xids (note: this will change for 0-10) - data = indoubt["xids"] - xids = [] - pos = 0 - while pos < len(data): - size = unpack("!B", data[pos])[0] - start = pos + 1 - end = start + size - xid = data[start:end] - xids.append(xid) - pos = end - - #rollback the prepared transactions returned by recover - for x in xids: - channel.dtx_coordination_rollback(xid=x) - - #validate against the expected list of prepared transactions - actual = set(xids) - expected = set(prepared) - intersection = actual.intersection(expected) - - if intersection != expected: - missing = expected.difference(actual) - extra = actual.difference(expected) - for x in missing: - channel.dtx_coordination_rollback(xid=x) - self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) - - def xid(self, txid, branchqual = ''): - return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual - - def txswap(self, tx, id): - channel = self.channel - #declare two queues: - channel.queue_declare(queue="queue-a", exclusive=True) - channel.queue_declare(queue="queue-b", exclusive=True) - #put message with specified id on one queue: - channel.message_transfer(routing_key="queue-a", message_id=id, body="DtxMessage") - - #start the transaction: - channel.dtx_demarcation_select() - self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).flags) - - #'swap' the message from one queue to the other, under that transaction: - self.swap(self.channel, "queue-a", "queue-b") - - #mark the end of the transactional work: - self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).flags) - - def swap(self, channel, src, dest): - #consume from src: - channel.message_get(destination="temp-swap", queue=src) - msg = self.client.queue("temp-swap").get(timeout=1) - msg.ok(); - - #re-publish to dest - channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body) - - def assertMessageCount(self, expected, queue): - self.assertEqual(expected, self.channel.queue_declare(queue=queue, passive=True).message_count) - - def assertMessageId(self, expected, queue): - self.channel.message_consume(queue=queue, destination="results", no_ack=True) - self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id) - self.channel.message_cancel(destination="results") diff --git a/qpid/python/tests_0-9/example.py b/qpid/python/tests_0-9/example.py deleted file mode 100644 index 7ab4cc7d0a..0000000000 --- a/qpid/python/tests_0-9/example.py +++ /dev/null @@ -1,94 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class ExampleTest (TestBase): - """ - An example Qpid test, illustrating the unittest frameowkr and the - python Qpid client. The test class must inherit TestCase. The - test code uses the Qpid client to interact with a qpid broker and - verify it behaves as expected. - """ - - def test_example(self): - """ - An example test. Note that test functions must start with 'test_' - to be recognized by the test framework. - """ - - # By inheriting TestBase, self.client is automatically connected - # and self.channel is automatically opened as channel(1) - # Other channel methods mimic the protocol. - channel = self.channel - - # Now we can send regular commands. If you want to see what the method - # arguments mean or what other commands are available, you can use the - # python builtin help() method. For example: - #help(chan) - #help(chan.exchange_declare) - - # If you want browse the available protocol methods without being - # connected to a live server you can use the amqp-doc utility: - # - # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>] - # - # Options: - # -e, --regexp use regex instead of glob when matching - - # Now that we know what commands are available we can use them to - # interact with the server. - - # Here we use ordinal arguments. - self.exchange_declare(channel, 0, "test", "direct") - - # Here we use keyword arguments. - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test", routing_key="key") - - # Call Channel.basic_consume to register as a consumer. - # All the protocol methods return a message object. The message object - # has fields corresponding to the reply method fields, plus a content - # field that is filled if the reply includes content. In this case the - # interesting field is the consumer_tag. - channel.message_consume(queue="test-queue", destination="consumer_tag") - - # We can use the Client.queue(...) method to access the queue - # corresponding to our consumer_tag. - queue = self.client.queue("consumer_tag") - - # Now lets publish a message and see if our consumer gets it. To do - # this we need to import the Content class. - body = "Hello World!" - channel.message_transfer(destination="test", - routing_key="key", - body = body) - - # Now we'll wait for the message to arrive. We can use the timeout - # argument in case the server hangs. By default queue.get() will wait - # until a message arrives or the connection to the server dies. - msg = queue.get(timeout=10) - - # And check that we got the right response with assertEqual - self.assertEqual(body, msg.body) - - # Now acknowledge the message. - msg.ok() - diff --git a/qpid/python/tests_0-9/exchange.py b/qpid/python/tests_0-9/exchange.py deleted file mode 100644 index 3a47ffff8c..0000000000 --- a/qpid/python/tests_0-9/exchange.py +++ /dev/null @@ -1,327 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Tests for exchange behaviour. - -Test classes ending in 'RuleTests' are derived from rules in amqp.xml. -""" - -import Queue, logging -from qpid.testlib import TestBase -from qpid.content import Content -from qpid.client import Closed - - -class StandardExchangeVerifier: - """Verifies standard exchange behavior. - - Used as base class for classes that test standard exchanges.""" - - def verifyDirectExchange(self, ex): - """Verify that ex behaves like a direct exchange.""" - self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex, routing_key="k") - self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") - try: - self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") - self.fail("Expected Empty exception") - except Queue.Empty: None # Expected - - def verifyFanOutExchange(self, ex): - """Verify that ex behaves like a fanout exchange.""" - self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex) - self.queue_declare(queue="p") - self.channel.queue_bind(queue="p", exchange=ex) - for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) - - def verifyTopicExchange(self, ex): - """Verify that ex behaves like a topic exchange""" - self.queue_declare(queue="a") - self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*") - q = self.consume("a") - self.assertPublishGet(q, ex, "a.b.x") - self.assertPublishGet(q, ex, "a.x.b.x") - self.assertPublishGet(q, ex, "a.x.x.b.x") - # Shouldn't match - self.channel.message_transfer(destination=ex, routing_key="a.b", body="") - self.channel.message_transfer(destination=ex, routing_key="a.b.x.y", body="") - self.channel.message_transfer(destination=ex, routing_key="x.a.b.x", body="") - self.channel.message_transfer(destination=ex, routing_key="a.b", body="") - self.assert_(q.empty()) - - def verifyHeadersExchange(self, ex): - """Verify that ex is a headers exchange""" - self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) - q = self.consume("q") - headers = {"name":"fred", "age":3} - self.assertPublishGet(q, exchange=ex, properties=headers) - self.channel.message_transfer(destination=ex, body="") # No headers, won't deliver - self.assertEmpty(q); - - -class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): - """ - The server SHOULD implement these standard exchange types: topic, headers. - - Client attempts to declare an exchange with each of these standard types. - """ - - def testDirect(self): - """Declare and test a direct exchange""" - self.exchange_declare(0, exchange="d", type="direct") - self.verifyDirectExchange("d") - - def testFanout(self): - """Declare and test a fanout exchange""" - self.exchange_declare(0, exchange="f", type="fanout") - self.verifyFanOutExchange("f") - - def testTopic(self): - """Declare and test a topic exchange""" - self.exchange_declare(0, exchange="t", type="topic") - self.verifyTopicExchange("t") - - def testHeaders(self): - """Declare and test a headers exchange""" - self.exchange_declare(0, exchange="h", type="headers") - self.verifyHeadersExchange("h") - - -class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): - """ - The server MUST, in each virtual host, pre-declare an exchange instance - for each standard exchange type that it implements, where the name of the - exchange instance is amq. followed by the exchange type name. - - Client creates a temporary queue and attempts to bind to each required - exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if - those types are defined). - """ - def testAmqDirect(self): self.verifyDirectExchange("amq.direct") - - def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout") - - def testAmqTopic(self): self.verifyTopicExchange("amq.topic") - - def testAmqMatch(self): self.verifyHeadersExchange("amq.match") - -class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): - """ - The server MUST predeclare a direct exchange to act as the default exchange - for content Publish methods and for default queue bindings. - - Client checks that the default exchange is active by specifying a queue - binding with no exchange name, and publishing a message with a suitable - routing key but without specifying the exchange name, then ensuring that - the message arrives in the queue correctly. - """ - def testDefaultExchange(self): - # Test automatic binding by queue name. - self.queue_declare(queue="d") - self.assertPublishConsume(queue="d", routing_key="d") - # Test explicit bind to default queue - self.verifyDirectExchange("") - - -# TODO aconway 2006-09-27: Fill in empty tests: - -class DefaultAccessRuleTests(TestBase): - """ - The server MUST NOT allow clients to access the default exchange except - by specifying an empty exchange name in the Queue.Bind and content Publish - methods. - """ - -class ExtensionsRuleTests(TestBase): - """ - The server MAY implement other exchange types as wanted. - """ - - -class DeclareMethodMinimumRuleTests(TestBase): - """ - The server SHOULD support a minimum of 16 exchanges per virtual host and - ideally, impose no limit except as defined by available resources. - - The client creates as many exchanges as it can until the server reports - an error; the number of exchanges successfuly created must be at least - sixteen. - """ - - -class DeclareMethodTicketFieldValidityRuleTests(TestBase): - """ - The client MUST provide a valid access ticket giving "active" access to - the realm in which the exchange exists or will be created, or "passive" - access if the if-exists flag is set. - - Client creates access ticket with wrong access rights and attempts to use - in this method. - """ - - -class DeclareMethodExchangeFieldReservedRuleTests(TestBase): - """ - Exchange names starting with "amq." are reserved for predeclared and - standardised exchanges. The client MUST NOT attempt to create an exchange - starting with "amq.". - - - """ - - -class DeclareMethodTypeFieldTypedRuleTests(TestBase): - """ - Exchanges cannot be redeclared with different types. The client MUST not - attempt to redeclare an existing exchange with a different type than used - in the original Exchange.Declare method. - - - """ - - -class DeclareMethodTypeFieldSupportRuleTests(TestBase): - """ - The client MUST NOT attempt to create an exchange with a type that the - server does not support. - - - """ - - -class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase): - """ - If set, and the exchange does not already exist, the server MUST raise a - channel exception with reply code 404 (not found). - """ - def test(self): - try: - self.channel.exchange_declare(exchange="humpty_dumpty", passive=True) - self.fail("Expected 404 for passive declaration of unknown exchange.") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - -class DeclareMethodDurableFieldSupportRuleTests(TestBase): - """ - The server MUST support both durable and transient exchanges. - - - """ - - -class DeclareMethodDurableFieldStickyRuleTests(TestBase): - """ - The server MUST ignore the durable field if the exchange already exists. - - - """ - - -class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase): - """ - The server MUST ignore the auto-delete field if the exchange already - exists. - - - """ - - -class DeleteMethodTicketFieldValidityRuleTests(TestBase): - """ - The client MUST provide a valid access ticket giving "active" access - rights to the exchange's access realm. - - Client creates access ticket with wrong access rights and attempts to use - in this method. - """ - - -class DeleteMethodExchangeFieldExistsRuleTests(TestBase): - """ - The client MUST NOT attempt to delete an exchange that does not exist. - """ - - -class HeadersExchangeTests(TestBase): - """ - Tests for headers exchange functionality. - """ - def setUp(self): - TestBase.setUp(self) - self.queue_declare(queue="q") - self.q = self.consume("q") - - def myAssertPublishGet(self, headers): - self.assertPublishGet(self.q, exchange="amq.match", properties=headers) - - def myBasicPublish(self, headers): - self.channel.message_transfer(destination="amq.match", body="foobar", application_headers=headers) - - def testMatchAll(self): - self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"}) - - # None of these should match - self.myBasicPublish({}) - self.myBasicPublish({"name":"barney"}) - self.myBasicPublish({"name":10}) - self.myBasicPublish({"name":"fred", "age":2}) - self.assertEmpty(self.q) - - def testMatchAny(self): - self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred"}) - self.myAssertPublishGet({"name":"fred", "ignoreme":10}) - self.myAssertPublishGet({"ignoreme":10, "age":3}) - - # Wont match - self.myBasicPublish({}) - self.myBasicPublish({"irrelevant":0}) - self.assertEmpty(self.q) - - -class MiscellaneousErrorsTests(TestBase): - """ - Test some miscellaneous error conditions - """ - def testTypeNotKnown(self): - try: - self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") - self.fail("Expected 503 for declaration of unknown exchange type.") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def testDifferentDeclaredType(self): - self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") - try: - self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") - self.fail("Expected 530 for redeclaration of exchange with different type.") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - #cleanup - other = self.connect() - c2 = other.channel(1) - c2.channel_open() - c2.exchange_delete(exchange="test_different_declared_type_exchange") - diff --git a/qpid/python/tests_0-9/execution.py b/qpid/python/tests_0-9/execution.py deleted file mode 100644 index f2facfe42b..0000000000 --- a/qpid/python/tests_0-9/execution.py +++ /dev/null @@ -1,29 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class ExecutionTests (TestBase): - def test_flush(self): - channel = self.channel - for i in [1, 2, 3]: - channel.basic_publish() - channel.execution_flush() - assert(channel.completion.wait(channel.completion.command_id, timeout=1)) diff --git a/qpid/python/tests_0-9/message.py b/qpid/python/tests_0-9/message.py deleted file mode 100644 index b25016e680..0000000000 --- a/qpid/python/tests_0-9/message.py +++ /dev/null @@ -1,657 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase -from qpid.reference import Reference, ReferenceId - -class MessageTests(TestBase): - """Tests for 'methods' on the amqp message 'class'""" - - def test_consume_no_local(self): - """ - Test that the no_local flag is honoured in the consume method - """ - channel = self.channel - #setup, declare two queues: - channel.queue_declare(queue="test-queue-1a", exclusive=True) - channel.queue_declare(queue="test-queue-1b", exclusive=True) - #establish two consumers one of which excludes delivery of locally sent messages - channel.message_consume(destination="local_included", queue="test-queue-1a") - channel.message_consume(destination="local_excluded", queue="test-queue-1b", no_local=True) - - #send a message - channel.message_transfer(routing_key="test-queue-1a", body="consume_no_local") - channel.message_transfer(routing_key="test-queue-1b", body="consume_no_local") - - #check the queues of the two consumers - excluded = self.client.queue("local_excluded") - included = self.client.queue("local_included") - msg = included.get(timeout=1) - self.assertEqual("consume_no_local", msg.body) - try: - excluded.get(timeout=1) - self.fail("Received locally published message though no_local=true") - except Empty: None - - - def test_consume_exclusive(self): - """ - Test that the exclusive flag is honoured in the consume method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-2", exclusive=True) - - #check that an exclusive consumer prevents other consumer being created: - channel.message_consume(destination="first", queue="test-queue-2", exclusive=True) - try: - channel.message_consume(destination="second", queue="test-queue-2") - self.fail("Expected consume request to fail due to previous exclusive consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - #open new channel and cleanup last consumer: - channel = self.client.channel(2) - channel.channel_open() - - #check that an exclusive consumer cannot be created if a consumer already exists: - channel.message_consume(destination="first", queue="test-queue-2") - try: - channel.message_consume(destination="second", queue="test-queue-2", exclusive=True) - self.fail("Expected exclusive consume request to fail due to previous consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - def test_consume_queue_errors(self): - """ - Test error conditions associated with the queue field of the consume method: - """ - channel = self.channel - try: - #queue specified but doesn't exist: - channel.message_consume(queue="invalid-queue") - self.fail("Expected failure when consuming from non-existent queue") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - channel = self.client.channel(2) - channel.channel_open() - try: - #queue not specified and none previously declared for channel: - channel.message_consume(queue="") - self.fail("Expected failure when consuming from unspecified queue") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_consume_unique_consumers(self): - """ - Ensure unique consumer tags are enforced - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-3", exclusive=True) - - #check that attempts to use duplicate tags are detected and prevented: - channel.message_consume(destination="first", queue="test-queue-3") - try: - channel.message_consume(destination="first", queue="test-queue-3") - self.fail("Expected consume request to fail due to non-unique tag") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_cancel(self): - """ - Test compliance of the basic.cancel method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-4", exclusive=True) - channel.message_consume(destination="my-consumer", queue="test-queue-4") - channel.message_transfer(routing_key="test-queue-4", body="One") - - #cancel should stop messages being delivered - channel.message_cancel(destination="my-consumer") - channel.message_transfer(routing_key="test-queue-4", body="Two") - myqueue = self.client.queue("my-consumer") - msg = myqueue.get(timeout=1) - self.assertEqual("One", msg.body) - try: - msg = myqueue.get(timeout=1) - self.fail("Got message after cancellation: " + msg) - except Empty: None - - #cancellation of non-existant consumers should be handled without error - channel.message_cancel(destination="my-consumer") - channel.message_cancel(destination="this-never-existed") - - - def test_ack(self): - """ - Test basic ack/recover behaviour - """ - channel = self.channel - channel.queue_declare(queue="test-ack-queue", exclusive=True) - - channel.message_consume(queue="test-ack-queue", destination="consumer_tag", no_ack=False) - queue = self.client.queue("consumer_tag") - - channel.message_transfer(routing_key="test-ack-queue", body="One") - channel.message_transfer(routing_key="test-ack-queue", body="Two") - channel.message_transfer(routing_key="test-ack-queue", body="Three") - channel.message_transfer(routing_key="test-ack-queue", body="Four") - channel.message_transfer(routing_key="test-ack-queue", body="Five") - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.body) - self.assertEqual("Two", msg2.body) - self.assertEqual("Three", msg3.body) - self.assertEqual("Four", msg4.body) - self.assertEqual("Five", msg5.body) - - msg1.ok(batchoffset=1)#One and Two - msg4.ok() - - channel.message_recover(requeue=False) - - msg3b = queue.get(timeout=1) - msg5b = queue.get(timeout=1) - - self.assertEqual("Three", msg3b.body) - self.assertEqual("Five", msg5b.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - def test_recover_requeue(self): - """ - Test requeing on recovery - """ - channel = self.channel - channel.queue_declare(queue="test-requeue", exclusive=True) - - channel.message_consume(queue="test-requeue", destination="consumer_tag", no_ack=False) - queue = self.client.queue("consumer_tag") - - channel.message_transfer(routing_key="test-requeue", body="One") - channel.message_transfer(routing_key="test-requeue", body="Two") - channel.message_transfer(routing_key="test-requeue", body="Three") - channel.message_transfer(routing_key="test-requeue", body="Four") - channel.message_transfer(routing_key="test-requeue", body="Five") - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.body) - self.assertEqual("Two", msg2.body) - self.assertEqual("Three", msg3.body) - self.assertEqual("Four", msg4.body) - self.assertEqual("Five", msg5.body) - - msg1.ok(batchoffset=1) #One and Two - msg4.ok() #Four - - channel.message_cancel(destination="consumer_tag") - - #publish a new message - channel.message_transfer(routing_key="test-requeue", body="Six") - #requeue unacked messages (Three and Five) - channel.message_recover(requeue=True) - - channel.message_consume(queue="test-requeue", destination="consumer_tag") - queue2 = self.client.queue("consumer_tag") - - msg3b = queue2.get(timeout=1) - msg5b = queue2.get(timeout=1) - - self.assertEqual("Three", msg3b.body) - self.assertEqual("Five", msg5b.body) - - self.assertEqual(True, msg3b.redelivered) - self.assertEqual(True, msg5b.redelivered) - - self.assertEqual("Six", queue2.get(timeout=1).body) - - try: - extra = queue2.get(timeout=1) - self.fail("Got unexpected message in second queue: " + extra.body) - except Empty: None - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message in original queue: " + extra.body) - except Empty: None - - - def test_qos_prefetch_count(self): - """ - Test that the prefetch count specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-count", exclusive=True) - subscription = channel.message_consume(queue="test-prefetch-count", destination="consumer_tag", no_ack=False) - queue = self.client.queue("consumer_tag") - - #set prefetch to 5: - channel.message_qos(prefetch_count=5) - - #publish 10 messages: - for i in range(1, 11): - channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i) - - #only 5 messages should have been delivered: - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - #todo: once batching is implmented, send a single response for all messages - msg.ok(batchoffset=-4)#1-5 - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - - msg.ok(batchoffset=-4)#6-10 - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.body) - except Empty: None - - - - def test_qos_prefetch_size(self): - """ - Test that the prefetch size specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-size", exclusive=True) - subscription = channel.message_consume(queue="test-prefetch-size", destination="consumer_tag", no_ack=False) - queue = self.client.queue("consumer_tag") - - #set prefetch to 50 bytes (each message is 9 or 10 bytes): - channel.message_qos(prefetch_size=50) - - #publish 10 messages: - for i in range(1, 11): - channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i) - - #only 5 messages should have been delivered (i.e. 45 bytes worth): - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - msg.ok(batchoffset=-4)#1-5 - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - - msg.ok(batchoffset=-4)#6-10 - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.body) - except Empty: None - - #make sure that a single oversized message still gets delivered - large = "abcdefghijklmnopqrstuvwxyz" - large = large + "-" + large; - channel.message_transfer(routing_key="test-prefetch-size", body=large) - msg = queue.get(timeout=1) - self.assertEqual(large, msg.body) - - def test_get(self): - """ - Test message_get method - """ - channel = self.channel - channel.queue_declare(queue="test-get", exclusive=True) - - #publish some messages (no_ack=True) - for i in range(1, 11): - channel.message_transfer(routing_key="test-get", body="Message %d" % i) - - #use message_get to read back the messages, and check that we get an empty at the end - for i in range(1, 11): - tag = "queue %d" % i - reply = channel.message_get(no_ack=True, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - #repeat for no_ack=False - for i in range(11, 21): - channel.message_transfer(routing_key="test-get", body="Message %d" % i) - - for i in range(11, 21): - tag = "queue %d" % i - reply = channel.message_get(no_ack=False, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - msg = self.client.queue(tag).get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - - if (i==13): - msg.ok(batchoffset=-2)#11, 12 & 13 - if(i in [15, 17, 19]): - msg.ok() - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - #recover(requeue=True) - channel.message_recover(requeue=True) - - #get the unacked messages again (14, 16, 18, 20) - for i in [14, 16, 18, 20]: - tag = "queue %d" % i - reply = channel.message_get(no_ack=False, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - msg = self.client.queue(tag).get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - msg.ok() - #channel.message_ack(delivery_tag=reply.delivery_tag) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - channel.message_recover(requeue=True) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - def test_reference_simple(self): - """ - Test basic ability to handle references - """ - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_consume(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.message_append(reference=refId, bytes="abcd") - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId)) - channel.synchronous = True - - channel.message_append(reference=refId, bytes="efgh") - channel.message_append(reference=refId, bytes="ijkl") - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl") - - - def test_reference_large(self): - """ - Test basic ability to handle references whose content exceeds max frame size - """ - channel = self.channel - self.queue_declare(queue="ref_queue") - - #generate a big data string (> max frame size of consumer): - data = "0123456789" - for i in range(0, 10): - data += data - #send it inline - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=data) - channel.synchronous = True - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - #create a new connection for consumer, with specific max frame size (< data) - other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0}) - ch2 = other.channel(1) - ch2.channel_open() - ch2.message_consume(queue="ref_queue", destination="c1") - queue = other.queue("c1") - - msg = queue.get(timeout=1) - self.assertTrue(isinstance(msg.body, ReferenceId)) - self.assertTrue(msg.reference) - self.assertEquals(data, msg.reference.get_complete()) - - def test_reference_completion(self): - """ - Test that reference transfer are not deemed complete until - closed (therefore are not acked or routed until that point) - """ - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_consume(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.message_append(reference=refId, bytes="abcd") - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId)) - channel.synchronous = True - - try: - msg = queue.get(timeout=1) - self.fail("Got unexpected message on queue: " + msg) - except Empty: None - - self.assertTrue(not ack.is_complete()) - - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - self.assertDataEquals(channel, queue.get(timeout=1), "abcd") - - def test_reference_multi_transfer(self): - """ - Test that multiple transfer requests for the same reference are - correctly handled. - """ - channel = self.channel - #declare and consume from two queues - channel.queue_declare(queue="q-one", exclusive=True) - channel.queue_declare(queue="q-two", exclusive=True) - channel.message_consume(queue="q-one", destination="q-one") - channel.message_consume(queue="q-two", destination="q-two") - queue1 = self.client.queue("q-one") - queue2 = self.client.queue("q-two") - - #transfer a single ref to both queues (in separate commands) - channel.message_open(reference="my-ref") - channel.synchronous = False - ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref")) - channel.message_append(reference="my-ref", bytes="my data") - ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - #check that both queues have the message - self.assertDataEquals(channel, queue1.get(timeout=1), "my data") - self.assertDataEquals(channel, queue2.get(timeout=1), "my data") - self.assertEmpty(queue1) - self.assertEmpty(queue2) - - #transfer a single ref to the same queue twice (in separate commands) - channel.message_open(reference="my-ref") - channel.synchronous = False - ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref")) - channel.message_append(reference="my-ref", bytes="second message") - ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - msg1 = queue1.get(timeout=1) - msg2 = queue1.get(timeout=1) - #order is undefined - if msg1.message_id == "abc": - self.assertEquals(msg2.message_id, "xyz") - else: - self.assertEquals(msg1.message_id, "xyz") - self.assertEquals(msg2.message_id, "abc") - - #would be legal for the incoming messages to be transfered - #inline or by reference in any combination - - if isinstance(msg1.body, ReferenceId): - self.assertEquals("second message", msg1.reference.get_complete()) - if isinstance(msg2.body, ReferenceId): - if msg1.body != msg2.body: - self.assertEquals("second message", msg2.reference.get_complete()) - #else ok, as same ref as msg1 - else: - self.assertEquals("second message", msg1.body) - if isinstance(msg2.body, ReferenceId): - self.assertEquals("second message", msg2.reference.get_complete()) - else: - self.assertEquals("second message", msg2.body) - - self.assertEmpty(queue1) - - def test_reference_unopened_on_append_error(self): - channel = self.channel - try: - channel.message_append(reference="unopened") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_unopened_on_close_error(self): - channel = self.channel - try: - channel.message_close(reference="unopened") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_unopened_on_transfer_error(self): - channel = self.channel - try: - channel.message_transfer(body=ReferenceId("unopened")) - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_already_opened_error(self): - channel = self.channel - channel.message_open(reference="a") - try: - channel.message_open(reference="a") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_empty_reference(self): - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_consume(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId)) - channel.synchronous = True - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - msg = queue.get(timeout=1) - self.assertEquals(msg.message_id, "empty-msg") - self.assertDataEquals(channel, msg, "") - - def test_reject(self): - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True) - - channel.message_consume(queue = "q", destination = "consumer") - channel.message_transfer(routing_key = "q", body="blah, blah") - msg = self.client.queue("consumer").get(timeout = 1) - self.assertEquals(msg.body, "blah, blah") - channel.message_cancel(destination = "consumer") - msg.reject() - - channel.message_consume(queue = "q", destination = "checker") - msg = self.client.queue("checker").get(timeout = 1) - self.assertEquals(msg.body, "blah, blah") - - def test_checkpoint(self): - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True) - - channel.message_open(reference="my-ref") - channel.message_append(reference="my-ref", bytes="abcdefgh") - channel.message_append(reference="my-ref", bytes="ijklmnop") - channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint") - channel.channel_close() - - channel = self.client.channel(2) - channel.channel_open() - channel.message_consume(queue = "q", destination = "consumer") - offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value - self.assertTrue(offset<=16) - channel.message_append(reference="my-ref", bytes="qrstuvwxyz") - channel.synchronous = False - channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz") - self.assertEmpty(self.client.queue("consumer")) - - - def assertDataEquals(self, channel, msg, expected): - if isinstance(msg.body, ReferenceId): - data = msg.reference.get_complete() - else: - data = msg.body - self.assertEquals(expected, data) diff --git a/qpid/python/tests_0-9/query.py b/qpid/python/tests_0-9/query.py index c2e08c003c..cb66d079e5 100644 --- a/qpid/python/tests_0-9/query.py +++ b/qpid/python/tests_0-9/query.py @@ -19,7 +19,7 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase class QueryTests(TestBase): """Tests for various query methods introduced in 0-10 and available in 0-9 for preview""" diff --git a/qpid/python/tests_0-9/queue.py b/qpid/python/tests_0-9/queue.py index e7fe0b3ed4..de1153307c 100644 --- a/qpid/python/tests_0-9/queue.py +++ b/qpid/python/tests_0-9/queue.py @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,137 +19,11 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase class QueueTests(TestBase): """Tests for 'methods' on the amqp queue 'class'""" - def test_purge(self): - """ - Test that the purge method removes messages from the queue - """ - channel = self.channel - #setup, declare a queue and add some messages to it: - channel.exchange_declare(exchange="test-exchange", type="direct") - channel.queue_declare(queue="test-queue", exclusive=True) - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - channel.message_transfer(destination="test-exchange", routing_key="key", body="one") - channel.message_transfer(destination="test-exchange", routing_key="key", body="two") - channel.message_transfer(destination="test-exchange", routing_key="key", body="three") - - #check that the queue now reports 3 messages: - reply = channel.queue_declare(queue="test-queue") - self.assertEqual(3, reply.message_count) - - #now do the purge, then test that three messages are purged and the count drops to 0 - reply = channel.queue_purge(queue="test-queue"); - self.assertEqual(3, reply.message_count) - reply = channel.queue_declare(queue="test-queue") - self.assertEqual(0, reply.message_count) - - #send a further message and consume it, ensuring that the other messages are really gone - channel.message_transfer(destination="test-exchange", routing_key="key", body="four") - channel.message_consume(queue="test-queue", destination="tag", no_ack=True) - queue = self.client.queue("tag") - msg = queue.get(timeout=1) - self.assertEqual("four", msg.body) - - #check error conditions (use new channels): - channel = self.client.channel(2) - channel.channel_open() - try: - #queue specified but doesn't exist: - channel.queue_purge(queue="invalid-queue") - self.fail("Expected failure when purging non-existent queue") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - channel = self.client.channel(3) - channel.channel_open() - try: - #queue not specified and none previously declared for channel: - channel.queue_purge() - self.fail("Expected failure when purging unspecified queue") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - #cleanup - other = self.connect() - channel = other.channel(1) - channel.channel_open() - channel.exchange_delete(exchange="test-exchange") - - def test_declare_exclusive(self): - """ - Test that the exclusive field is honoured in queue.declare - """ - # TestBase.setUp has already opened channel(1) - c1 = self.channel - # Here we open a second separate connection: - other = self.connect() - c2 = other.channel(1) - c2.channel_open() - - #declare an exclusive queue: - c1.queue_declare(queue="exclusive-queue", exclusive="True") - try: - #other connection should not be allowed to declare this: - c2.queue_declare(queue="exclusive-queue", exclusive="True") - self.fail("Expected second exclusive queue_declare to raise a channel exception") - except Closed, e: - self.assertChannelException(405, e.args[0]) - - - def test_declare_passive(self): - """ - Test that the passive field is honoured in queue.declare - """ - channel = self.channel - #declare an exclusive queue: - channel.queue_declare(queue="passive-queue-1", exclusive="True") - channel.queue_declare(queue="passive-queue-1", passive="True") - try: - #other connection should not be allowed to declare this: - channel.queue_declare(queue="passive-queue-2", passive="True") - self.fail("Expected passive declaration of non-existant queue to raise a channel exception") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - - def test_bind(self): - """ - Test various permutations of the queue.bind method - """ - channel = self.channel - channel.queue_declare(queue="queue-1", exclusive="True") - - #straightforward case, both exchange & queue exist so no errors expected: - channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") - - #bind the default queue for the channel (i.e. last one declared): - channel.queue_bind(exchange="amq.direct", routing_key="key2") - - #use the queue name where neither routing key nor queue are specified: - channel.queue_bind(exchange="amq.direct") - - #try and bind to non-existant exchange - try: - channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") - self.fail("Expected bind to non-existant exchange to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - #need to reopen a channel: - channel = self.client.channel(2) - channel.channel_open() - - #try and bind non-existant queue: - try: - channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") - self.fail("Expected bind of non-existant queue to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) - def test_unbind_direct(self): self.unbind_test(exchange="amq.direct", routing_key="key") @@ -165,12 +39,12 @@ class QueueTests(TestBase): def unbind_test(self, exchange, routing_key="", args=None, headers={}): #bind two queues and consume from them channel = self.channel - + channel.queue_declare(queue="queue-1", exclusive="True") channel.queue_declare(queue="queue-2", exclusive="True") - channel.message_consume(queue="queue-1", destination="queue-1", no_ack=True) - channel.message_consume(queue="queue-2", destination="queue-2", no_ack=True) + channel.basic_consume(queue="queue-1", consumer_tag="queue-1", no_ack=True) + channel.basic_consume(queue="queue-2", consumer_tag="queue-2", no_ack=True) queue1 = self.client.queue("queue-1") queue2 = self.client.queue("queue-2") @@ -179,130 +53,29 @@ class QueueTests(TestBase): channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) #send a message that will match both bindings - channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="one") - + channel.basic_publish(exchange=exchange, routing_key=routing_key, + content=Content("one", properties={"headers": headers})) + #unbind first queue channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) - + #send another message - channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="two") + channel.basic_publish(exchange=exchange, routing_key=routing_key, + content=Content("two", properties={"headers": headers})) #check one queue has both messages and the other has only one - self.assertEquals("one", queue1.get(timeout=1).body) + self.assertEquals("one", queue1.get(timeout=1).content.body) try: msg = queue1.get(timeout=1) self.fail("Got extra message: %s" % msg.body) except Empty: pass - self.assertEquals("one", queue2.get(timeout=1).body) - self.assertEquals("two", queue2.get(timeout=1).body) + self.assertEquals("one", queue2.get(timeout=1).content.body) + self.assertEquals("two", queue2.get(timeout=1).content.body) try: msg = queue2.get(timeout=1) self.fail("Got extra message: " + msg) - except Empty: pass - - - def test_delete_simple(self): - """ - Test core queue deletion behaviour - """ - channel = self.channel - - #straight-forward case: - channel.queue_declare(queue="delete-me") - channel.message_transfer(routing_key="delete-me", body="a") - channel.message_transfer(routing_key="delete-me", body="b") - channel.message_transfer(routing_key="delete-me", body="c") - reply = channel.queue_delete(queue="delete-me") - self.assertEqual(3, reply.message_count) - #check that it has gone be declaring passively - try: - channel.queue_declare(queue="delete-me", passive="True") - self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - #check attempted deletion of non-existant queue is handled correctly: - channel = self.client.channel(2) - channel.channel_open() - try: - channel.queue_delete(queue="i-dont-exist", if_empty="True") - self.fail("Expected delete of non-existant queue to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - - - def test_delete_ifempty(self): - """ - Test that if_empty field of queue_delete is honoured - """ - channel = self.channel - - #create a queue and add a message to it (use default binding): - channel.queue_declare(queue="delete-me-2") - channel.queue_declare(queue="delete-me-2", passive="True") - channel.message_transfer(routing_key="delete-me-2", body="message") - - #try to delete, but only if empty: - try: - channel.queue_delete(queue="delete-me-2", if_empty="True") - self.fail("Expected delete if_empty to fail for non-empty queue") - except Closed, e: - self.assertChannelException(406, e.args[0]) - - #need new channel now: - channel = self.client.channel(2) - channel.channel_open() - - #empty queue: - channel.message_consume(destination="consumer_tag", queue="delete-me-2", no_ack=True) - queue = self.client.queue("consumer_tag") - msg = queue.get(timeout=1) - self.assertEqual("message", msg.body) - channel.message_cancel(destination="consumer_tag") - - #retry deletion on empty queue: - channel.queue_delete(queue="delete-me-2", if_empty="True") - - #check that it has gone by declaring passively: - try: - channel.queue_declare(queue="delete-me-2", passive="True") - self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - def test_delete_ifunused(self): - """ - Test that if_unused field of queue_delete is honoured - """ - channel = self.channel - - #create a queue and register a consumer: - channel.queue_declare(queue="delete-me-3") - channel.queue_declare(queue="delete-me-3", passive="True") - channel.message_consume(destination="consumer_tag", queue="delete-me-3", no_ack=True) - - #need new channel now: - channel2 = self.client.channel(2) - channel2.channel_open() - #try to delete, but only if empty: - try: - channel2.queue_delete(queue="delete-me-3", if_unused="True") - self.fail("Expected delete if_unused to fail for queue with existing consumer") - except Closed, e: - self.assertChannelException(406, e.args[0]) - - - channel.message_cancel(destination="consumer_tag") - channel.queue_delete(queue="delete-me-3", if_unused="True") - #check that it has gone by declaring passively: - try: - channel.queue_declare(queue="delete-me-3", passive="True") - self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) - + except Empty: pass def test_autodelete_shared(self): """ @@ -336,5 +109,3 @@ class QueueTests(TestBase): self.fail("Expected queue to have been deleted") except Closed, e: self.assertChannelException(404, e.args[0]) - - diff --git a/qpid/python/tests_0-9/testlib.py b/qpid/python/tests_0-9/testlib.py deleted file mode 100644 index f345fbbd80..0000000000 --- a/qpid/python/tests_0-9/testlib.py +++ /dev/null @@ -1,66 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# -# Tests for the testlib itself. -# - -from qpid.content import Content -from qpid.testlib import testrunner, TestBase -from Queue import Empty - -import sys -from traceback import * - -def mytrace(frame, event, arg): - print_stack(frame); - print "====" - return mytrace - -class TestBaseTest(TestBase): - """Verify TestBase functions work as expected""" - - def testAssertEmptyPass(self): - """Test assert empty works""" - self.queue_declare(queue="empty") - q = self.consume("empty") - self.assertEmpty(q) - try: - q.get(timeout=1) - self.fail("Queue is not empty.") - except Empty: None # Ignore - - def testAssertEmptyFail(self): - self.queue_declare(queue="full") - q = self.consume("full") - self.channel.message_transfer(routing_key="full", body="") - try: - self.assertEmpty(q); - self.fail("assertEmpty did not assert on non-empty queue") - except AssertionError: None # Ignore - - def testMessageProperties(self): - """Verify properties are passed with message""" - props={"x":1, "y":2} - self.queue_declare(queue="q") - q = self.consume("q") - self.assertPublishGet(q, routing_key="q", properties=props) - - - diff --git a/qpid/python/tests_0-9/tx.py b/qpid/python/tests_0-9/tx.py deleted file mode 100644 index 0f6b4f5cd1..0000000000 --- a/qpid/python/tests_0-9/tx.py +++ /dev/null @@ -1,188 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class TxTests(TestBase): - """ - Tests for 'methods' on the amqp tx 'class' - """ - - def test_commit(self): - """ - Test that commited publishes are delivered and commited acks are not re-delivered - """ - channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c") - channel.tx_commit() - - #check results - for i in range(1, 5): - msg = queue_c.get(timeout=1) - self.assertEqual("TxMessage %d" % i, msg.body) - msg.ok() - - msg = queue_b.get(timeout=1) - self.assertEqual("TxMessage 6", msg.body) - msg.ok() - - msg = queue_a.get(timeout=1) - self.assertEqual("TxMessage 7", msg.body) - msg.ok() - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - #cleanup - channel.tx_commit() - - def test_auto_rollback(self): - """ - Test that a channel closed with an open transaction is effectively rolled back - """ - channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - channel.tx_rollback() - - #check results - for i in range(1, 5): - msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - msg.ok() - - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) - msg.ok() - - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) - msg.ok() - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - #cleanup - channel.tx_commit() - - def test_rollback(self): - """ - Test that rolled back publishes are not delivered and rolled back acks are re-delivered - """ - channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - channel.tx_rollback() - - #check results - for i in range(1, 5): - msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - msg.ok() - - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) - msg.ok() - - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) - msg.ok() - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - #cleanup - channel.tx_commit() - - def perform_txn_work(self, channel, name_a, name_b, name_c): - """ - Utility method that does some setup and some work under a transaction. Used for testing both - commit and rollback - """ - #setup: - channel.queue_declare(queue=name_a, exclusive=True) - channel.queue_declare(queue=name_b, exclusive=True) - channel.queue_declare(queue=name_c, exclusive=True) - - key = "my_key_" + name_b - topic = "my_topic_" + name_c - - channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key) - channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) - - for i in range(1, 5): - channel.message_transfer(routing_key=name_a, body="Message %d" % i) - - channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6") - channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7") - - channel.tx_select() - - #consume and ack messages - channel.message_consume(queue=name_a, destination="sub_a", no_ack=False) - queue_a = self.client.queue("sub_a") - for i in range(1, 5): - msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - - msg.ok(batchoffset=-3) - - channel.message_consume(queue=name_b, destination="sub_b", no_ack=False) - queue_b = self.client.queue("sub_b") - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) - msg.ok() - - sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False) - queue_c = self.client.queue("sub_c") - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) - msg.ok() - - #publish messages - for i in range(1, 5): - channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i) - - channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6") - channel.message_transfer(routing_key=name_a, body="TxMessage 7") - - return queue_a, queue_b, queue_c |
