diff options
| author | Alan Conway <aconway@apache.org> | 2007-03-21 01:26:37 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-03-21 01:26:37 +0000 |
| commit | ae01787ce90ee7cd1303ce7769328cd242e00f6e (patch) | |
| tree | 72df2f2f98c1f2cdc117debb2c6e1f9141bf1df7 /python/tests_0-9/tx.py | |
| parent | 26aa43e242082422647240054e16c003a54d0df9 (diff) | |
| download | qpid-python-ae01787ce90ee7cd1303ce7769328cd242e00f6e.tar.gz | |
* cpp-0-9: svn copy of 0-9 branch cpp, will rename to cpp on next update.
* cpp-0-9/gentools: independent copy of gentools for cpp.
Temporary measure till /java and /gentools are at 0-9
* python/tests_0-9: tests from 0-9 branch.
* python/qpid/testlib.py: Use tests_0-9 with 0-9 spec (default is still 0-8)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@520690 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests_0-9/tx.py')
| -rw-r--r-- | python/tests_0-9/tx.py | 188 |
1 files changed, 188 insertions, 0 deletions
diff --git a/python/tests_0-9/tx.py b/python/tests_0-9/tx.py new file mode 100644 index 0000000000..0f6b4f5cd1 --- /dev/null +++ b/python/tests_0-9/tx.py @@ -0,0 +1,188 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class TxTests(TestBase): + """ + Tests for 'methods' on the amqp tx 'class' + """ + + def test_commit(self): + """ + Test that commited publishes are delivered and commited acks are not re-delivered + """ + channel = self.channel + queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c") + channel.tx_commit() + + #check results + for i in range(1, 5): + msg = queue_c.get(timeout=1) + self.assertEqual("TxMessage %d" % i, msg.body) + msg.ok() + + msg = queue_b.get(timeout=1) + self.assertEqual("TxMessage 6", msg.body) + msg.ok() + + msg = queue_a.get(timeout=1) + self.assertEqual("TxMessage 7", msg.body) + msg.ok() + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + #cleanup + channel.tx_commit() + + def test_auto_rollback(self): + """ + Test that a channel closed with an open transaction is effectively rolled back + """ + channel = self.channel + queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + channel.tx_rollback() + + #check results + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + msg.ok() + + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.body) + msg.ok() + + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.body) + msg.ok() + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + #cleanup + channel.tx_commit() + + def test_rollback(self): + """ + Test that rolled back publishes are not delivered and rolled back acks are re-delivered + """ + channel = self.channel + queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + channel.tx_rollback() + + #check results + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + msg.ok() + + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.body) + msg.ok() + + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.body) + msg.ok() + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + #cleanup + channel.tx_commit() + + def perform_txn_work(self, channel, name_a, name_b, name_c): + """ + Utility method that does some setup and some work under a transaction. Used for testing both + commit and rollback + """ + #setup: + channel.queue_declare(queue=name_a, exclusive=True) + channel.queue_declare(queue=name_b, exclusive=True) + channel.queue_declare(queue=name_c, exclusive=True) + + key = "my_key_" + name_b + topic = "my_topic_" + name_c + + channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key) + channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) + + for i in range(1, 5): + channel.message_transfer(routing_key=name_a, body="Message %d" % i) + + channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6") + channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7") + + channel.tx_select() + + #consume and ack messages + channel.message_consume(queue=name_a, destination="sub_a", no_ack=False) + queue_a = self.client.queue("sub_a") + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + + msg.ok(batchoffset=-3) + + channel.message_consume(queue=name_b, destination="sub_b", no_ack=False) + queue_b = self.client.queue("sub_b") + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.body) + msg.ok() + + sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False) + queue_c = self.client.queue("sub_c") + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.body) + msg.ok() + + #publish messages + for i in range(1, 5): + channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i) + + channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6") + channel.message_transfer(routing_key=name_a, body="TxMessage 7") + + return queue_a, queue_b, queue_c |
