From f6ed9e1b6a770caa5889f670cba0c74ab1c82357 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 17 Oct 2006 15:29:28 +0000 Subject: Added test for simple commit and rollback. Updated 'failing' lists Fix for amqp methods with no arguments git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464943 13f79535-47bb-0310-9956-ffa450edef68 --- python/cpp_failing.txt | 2 + python/java_failing.txt | 16 ++++++ python/qpid/spec.py | 2 + python/tests/tx.py | 143 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 163 insertions(+) create mode 100644 python/tests/tx.py (limited to 'python') diff --git a/python/cpp_failing.txt b/python/cpp_failing.txt index e69de29bb2..167fc9cce5 100644 --- a/python/cpp_failing.txt +++ b/python/cpp_failing.txt @@ -0,0 +1,2 @@ +tests.tx.TxTests.test_commit +tests.tx.TxTests.test_rollback diff --git a/python/java_failing.txt b/python/java_failing.txt index 2e61363817..a752bc53c0 100644 --- a/python/java_failing.txt +++ b/python/java_failing.txt @@ -1,9 +1,23 @@ +tests.basic.BasicTests.test_cancel tests.basic.BasicTests.test_consume_exclusive tests.basic.BasicTests.test_consume_no_local tests.basic.BasicTests.test_consume_queue_errors tests.basic.BasicTests.test_consume_unique_consumers +tests.basic.BasicTests.test_get +tests.basic.BasicTests.test_qos_prefetch_size +tests.basic.BasicTests.test_recover_requeue +tests.exchange.ExchangeTests +tests.exchange.DefaultExchangeRuleTests.testDefaultExchange +tests.exchange.HeadersExchangeTests.testMatchAll +tests.exchange.HeadersExchangeTests.testMatchAny +tests.exchange.RecommendedTypesRuleTests.testDirect tests.exchange.RecommendedTypesRuleTests.testFanout +tests.exchange.RecommendedTypesRuleTests.testHeaders +tests.exchange.RecommendedTypesRuleTests.testTopic +tests.exchange.RequiredInstancesRuleTests.testAmqDirect tests.exchange.RequiredInstancesRuleTests.testAmqFanOut +tests.exchange.RequiredInstancesRuleTests.testAmqMatch +tests.exchange.RequiredInstancesRuleTests.testAmqTopic tests.queue.QueueTests.test_declare_exclusive tests.queue.QueueTests.test_declare_passive tests.queue.QueueTests.test_delete_ifempty @@ -11,3 +25,5 @@ tests.queue.QueueTests.test_delete_ifunused tests.queue.QueueTests.test_delete_simple tests.queue.QueueTests.test_purge tests.queue.QueueTests.test_bind +tests.testlib.TestBaseTest.testMessageProperties +tests.broker.BrokerTests.test_invalid_channel diff --git a/python/qpid/spec.py b/python/qpid/spec.py index 70e09aa1e9..b270c45e13 100644 --- a/python/qpid/spec.py +++ b/python/qpid/spec.py @@ -204,6 +204,8 @@ class Method(Metadata): code += " return self.invoke(%s" % Method.METHOD if argnames: code += ", (%s,)" % argnames + else: + code += ", ()" if self.content: code += ", content" code += ")" diff --git a/python/tests/tx.py b/python/tests/tx.py new file mode 100644 index 0000000000..b1030bf602 --- /dev/null +++ b/python/tests/tx.py @@ -0,0 +1,143 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class TxTests(TestBase): + """ + Tests for 'methods' on the amqp tx 'class' + """ + + def test_commit(self): + """ + Test that commited publishes are delivered and commited acks are not re-delivered + """ + channel = self.channel + queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c") + channel.tx_commit() + + #check results + for i in range(1, 5): + msg = queue_c.get(timeout=1) + self.assertEqual("TxMessage %d" % i, msg.content.body) + + msg = queue_b.get(timeout=1) + self.assertEqual("TxMessage 6", msg.content.body) + + msg = queue_a.get(timeout=1) + self.assertEqual("TxMessage 7", msg.content.body) + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + #cleanup + channel.basic_ack(delivery_tag=0, multiple=True) + channel.tx_commit() + + def test_rollback(self): + """ + Test that rolled back publishes are not delivered and rolled back acks are re-delivered + """ + channel = self.channel + queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + channel.tx_rollback() + + #check results + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.content.body) + + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.content.body) + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + #cleanup + channel.basic_ack(delivery_tag=0, multiple=True) + channel.tx_commit() + + def perform_txn_work(self, channel, name_a, name_b, name_c): + """ + Utility method that does some setup and some work under a transaction. Used for testing both + commit and rollback + """ + #setup: + channel.queue_declare(queue=name_a, exclusive=True) + channel.queue_declare(queue=name_b, exclusive=True) + channel.queue_declare(queue=name_c, exclusive=True) + + key = "my_key_" + name_b + topic = "my_topic_" + name_c + + channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key) + channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) + + for i in range(1, 5): + channel.basic_publish(routing_key=name_a, content=Content("Message %d" % i)) + + channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("Message 6")) + channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("Message 7")) + + + channel.tx_select() + + #consume and ack messages + sub_a = channel.basic_consume(queue=name_a, no_ack=False) + queue_a = self.client.queue(sub_a.consumer_tag) + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + sub_b = channel.basic_consume(queue=name_b, no_ack=False) + queue_b = self.client.queue(sub_b.consumer_tag) + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.content.body) + channel.basic_ack(delivery_tag=msg.delivery_tag) + + sub_c = channel.basic_consume(queue=name_c, no_ack=False) + queue_c = self.client.queue(sub_c.consumer_tag) + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.content.body) + channel.basic_ack(delivery_tag=msg.delivery_tag) + + #publish messages + for i in range(1, 5): + channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("TxMessage %d" % i)) + + channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("TxMessage 6")) + channel.basic_publish(routing_key=name_a, content=Content("TxMessage 7")) + + return queue_a, queue_b, queue_c -- cgit v1.2.1