diff options
| author | Alan Conway <aconway@apache.org> | 2011-08-25 20:41:28 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2011-08-25 20:41:28 +0000 |
| commit | 2fdd2cc2ade41e213ae35818532574bbf40f4a00 (patch) | |
| tree | 42fb45022ea08fee157abf50713b452acf5eda5d /cpp/src/tests | |
| parent | 7f99badd1c330b3a6032b15a13aca1cde81274d3 (diff) | |
| download | qpid-python-2fdd2cc2ade41e213ae35818532574bbf40f4a00.tar.gz | |
QPID-3384: Enable DTX transactions in a cluster.
- Replicate DTX state to new members joining.
- Use cluster timer for DTX timeouts.
- Incidental: quote nulls in qpid::Msg messages (XIDs often have null characters)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1161742 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
| -rw-r--r-- | cpp/src/tests/brokertest.py | 4 | ||||
| -rw-r--r-- | cpp/src/tests/cluster_python_tests_failing.txt | 28 | ||||
| -rwxr-xr-x | cpp/src/tests/cluster_tests.py | 204 |
3 files changed, 203 insertions, 33 deletions
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index fd972b4394..7888f44c30 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -493,9 +493,7 @@ class BrokerTest(TestCase): return cluster def browse(self, session, queue, timeout=0): - """Assert that the contents of messages on queue (as retrieved - using session and timeout) exactly match the strings in - expect_contents""" + """Return a list with the contents of each message on queue.""" r = session.receiver("%s;{mode:browse}"%(queue)) try: contents = [] diff --git a/cpp/src/tests/cluster_python_tests_failing.txt b/cpp/src/tests/cluster_python_tests_failing.txt index 7ba8089946..f8639d7b59 100644 --- a/cpp/src/tests/cluster_python_tests_failing.txt +++ b/cpp/src/tests/cluster_python_tests_failing.txt @@ -1,32 +1,4 @@ qpid_tests.broker_0_10.management.ManagementTest.test_purge_queue qpid_tests.broker_0_10.management.ManagementTest.test_connection_close -qpid_tests.broker_0_10.dtx.DtxTests.test_bad_resume -qpid_tests.broker_0_10.dtx.DtxTests.test_commit_unknown -qpid_tests.broker_0_10.dtx.DtxTests.test_end -qpid_tests.broker_0_10.dtx.DtxTests.test_end_suspend_and_fail -qpid_tests.broker_0_10.dtx.DtxTests.test_end_unknown_xid -qpid_tests.broker_0_10.dtx.DtxTests.test_forget_xid_on_completion -qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout -qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout_unknown -qpid_tests.broker_0_10.dtx.DtxTests.test_implicit_end -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_not_ended -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_false -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_true -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_prepare_not_ended -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_rollback_not_ended -qpid_tests.broker_0_10.dtx.DtxTests.test_prepare_unknown -qpid_tests.broker_0_10.dtx.DtxTests.test_recover -qpid_tests.broker_0_10.dtx.DtxTests.test_rollback_unknown -qpid_tests.broker_0_10.dtx.DtxTests.test_select_required -qpid_tests.broker_0_10.dtx.DtxTests.test_set_timeout -qpid_tests.broker_0_10.dtx.DtxTests.test_simple_commit -qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_commit -qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_rollback -qpid_tests.broker_0_10.dtx.DtxTests.test_simple_rollback -qpid_tests.broker_0_10.dtx.DtxTests.test_start_already_known -qpid_tests.broker_0_10.dtx.DtxTests.test_start_join -qpid_tests.broker_0_10.dtx.DtxTests.test_start_join_and_resume -qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_resume -qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_start_end_resume qpid_tests.broker_0_10.message.MessageTests.test_ttl qpid_tests.broker_0_10.management.ManagementTest.test_broker_connectivity_oldAPI diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 807e9508c3..e67a691283 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -713,6 +713,204 @@ acl allow all all cluster.start() fetch(cluster[2]) +# Some utility code for transaction tests +XA_RBROLLBACK = 1 +XA_RBTIMEOUT = 2 +XA_OK = 0 +dtx_branch_counter = 0 + +class DtxTestFixture: + """Bundle together some common requirements for dtx tests.""" + def __init__(self, test, broker, name, exclusive=False): + self.test = test + self.broker = broker + self.name = name + # Use old API. DTX is not supported in messaging API. + self.connection = broker.connect_old() + self.session = self.connection.session(name, 1) # 1 second timeout + self.queue = self.session.queue_declare(name, exclusive=exclusive) + self.xid = self.session.xid(format=0, global_id=name) + self.session.dtx_select() + self.consumer = None + + def start(self): + self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid).status) + + def end(self): + self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid).status) + + def prepare(self): + self.test.assertEqual(XA_OK, self.session.dtx_prepare(xid=self.xid).status) + + def commit(self, one_phase=True): + self.test.assertEqual( + XA_OK, self.session.dtx_commit(xid=self.xid, one_phase=one_phase).status) + + def rollback(self): + self.test.assertEqual(XA_OK, self.session.dtx_rollback(xid=self.xid).status) + + def send(self, messages): + for m in messages: + dp=self.session.delivery_properties(routing_key=self.name) + mp=self.session.message_properties() + self.session.message_transfer(message=qpid.datatypes.Message(dp, mp, m)) + + def accept(self): + """Accept 1 message from queue""" + consumer_tag="%s-consumer"%(self.name) + self.session.message_subscribe(queue=self.name, destination=consumer_tag) + self.session.message_flow(unit = self.session.credit_unit.message, value = 1, destination = consumer_tag) + self.session.message_flow(unit = self.session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag) + msg = self.session.incoming(consumer_tag).get(timeout=1) + self.session.message_cancel(destination=consumer_tag) + self.session.message_accept(qpid.datatypes.RangedSet(msg.id)) + return msg + + + def verify(self, cluster, messages): + for b in cluster: + self.test.assert_browse(b.connect().session(), self.name, messages) + + +class DtxTests(BrokerTest): + + def test_dtx_update(self): + """Verify that DTX transaction state is updated to a new broker. + Start a collection of transactions, then add a new cluster member, + then verify they commit/rollback correctly on the new broker.""" + + # Note: multiple test have been bundled into one to avoid the need to start/stop + # multiple brokers per test. + + cluster=self.cluster(1) + + # Transaction that will be open when new member joins, then committed. + t1 = DtxTestFixture(self, cluster[0], "t1") + t1.start() + t1.send(["1", "2"]) + t1.verify(cluster, []) # Not visible outside of transaction + + # Transaction that will be open when new member joins, then rolled back. + t2 = DtxTestFixture(self, cluster[0], "t2") + t2.start() + t2.send(["1", "2"]) + + # Transaction that will be prepared when new member joins, then committed. + t3 = DtxTestFixture(self, cluster[0], "t3") + t3.start() + t3.send(["1", "2"]) + t3.end() + t3.prepare() + t1.verify(cluster, []) # Not visible outside of transaction + + # Transaction that will be prepared when new member joins, then rolled back. + t4 = DtxTestFixture(self, cluster[0], "t4") + t4.start() + t4.send(["1", "2"]) + t4.end() + t4.prepare() + + # Transaction using an exclusive queue + t5 = DtxTestFixture(self, cluster[0], "t5", exclusive=True) + t5.start() + t5.send(["1", "2"]) + + # Accept messages in a transaction before/after join then commit + t6 = DtxTestFixture(self, cluster[0], "t6") + t6.send(["a","b","c"]) + t6.start() + t6.verify(cluster, ["a","b","c"]) + self.assertEqual(t6.accept().body, "a"); + t6.verify(cluster, ["b","c"]) + + # Accept messages in a transaction before/after join then roll back + t7 = DtxTestFixture(self, cluster[0], "t7") + t7.send(["a","b","c"]) + t7.start() + t7.verify(cluster, ["a","b","c"]) + self.assertEqual(t7.accept().body, "a"); + t7.verify(cluster, ["b","c"]) + + # Start new member + cluster.start() + + # Commit t1 + t1.send(["3","4"]) + t1.verify(cluster, []) + t1.end() + t1.commit(one_phase=True) + t1.verify(cluster, ["1","2","3","4"]) + + # Rollback t2 + t2.send(["3","4"]) + t2.verify(cluster, []) + t2.end() + t2.rollback() + t2.verify(cluster, []) + + # Commit t3 + t3.verify(cluster, []) + t3.commit(one_phase=False) + t3.verify(cluster, ["1","2"]) + + # Rollback t4 + t4.verify(cluster, []) + t4.rollback() + t4.verify(cluster, []) + + # Commit t5 + t5.send(["3","4"]) + t5.verify(cluster, []) + t5.end() + t5.commit(one_phase=True) + t5.verify(cluster, ["1","2","3","4"]) + + # Commit t7 + t6.verify(cluster, ["b", "c"]) + self.assertEqual(t6.accept().body, "b"); + t6.verify(cluster, ["c"]) + t6.end() + t6.commit(one_phase=True) + t6.verify(cluster, ["c"]) + t6.session.close() # Make sure they're not requeued by the session. + t6.verify(cluster, ["c"]) + + # Rollback t7 + t7.verify(cluster, ["b", "c"]) + self.assertEqual(t7.accept().body, "b"); + t7.verify(cluster, ["c"]) + t7.end() + t7.rollback() + t7.verify(cluster, ["a", "b", "c"]) + + +class TxTests(BrokerTest): + + def test_tx_update(self): + """Verify that transaction state is updated to a new broker""" + + def make_message(session, body=None, key=None, id=None): + dp=session.delivery_properties(routing_key=key) + mp=session.message_properties(correlation_id=id) + return qpid.datatypes.Message(dp, mp, body) + + cluster=self.cluster(1) + # Use old API. TX is not supported in messaging API. + c = cluster[0].connect_old() + s = c.session("tx-session", 1) + s.queue_declare(queue="q") + # Start transaction + s.tx_select() + s.message_transfer(message=make_message(s, "1", "q")) + # Start new member mid-transaction + cluster.start() + # Do more work + s.message_transfer(message=make_message(s, "2", "q")) + # Commit the transaction and verify the results. + s.tx_commit() + for b in cluster: self.assert_browse(b.connect().session(), "q", ["1","2"]) + + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): @@ -1001,6 +1199,8 @@ class LongTests(BrokerTest): logger = logging.getLogger() log_level = logger.getEffectiveLevel() logger.setLevel(logging.ERROR) + sender = None + receiver = None try: # Start sender and receiver threads receiver = Receiver(cluster[0], "q;{create:always}") @@ -1031,8 +1231,8 @@ class LongTests(BrokerTest): finally: # Detach to avoid slow reconnect attempts during shut-down if test fails. - sender.connection.detach() - receiver.connection.detach() + if sender: sender.connection.detach() + if receiver: receiver.connection.detach() logger.setLevel(log_level) class StoreTests(BrokerTest): |
