summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-08-25 20:41:28 +0000
committerAlan Conway <aconway@apache.org>2011-08-25 20:41:28 +0000
commit2fdd2cc2ade41e213ae35818532574bbf40f4a00 (patch)
tree42fb45022ea08fee157abf50713b452acf5eda5d /cpp/src/tests
parent7f99badd1c330b3a6032b15a13aca1cde81274d3 (diff)
downloadqpid-python-2fdd2cc2ade41e213ae35818532574bbf40f4a00.tar.gz
QPID-3384: Enable DTX transactions in a cluster.
- Replicate DTX state to new members joining. - Use cluster timer for DTX timeouts. - Incidental: quote nulls in qpid::Msg messages (XIDs often have null characters) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1161742 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/brokertest.py4
-rw-r--r--cpp/src/tests/cluster_python_tests_failing.txt28
-rwxr-xr-xcpp/src/tests/cluster_tests.py204
3 files changed, 203 insertions, 33 deletions
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py
index fd972b4394..7888f44c30 100644
--- a/cpp/src/tests/brokertest.py
+++ b/cpp/src/tests/brokertest.py
@@ -493,9 +493,7 @@ class BrokerTest(TestCase):
return cluster
def browse(self, session, queue, timeout=0):
- """Assert that the contents of messages on queue (as retrieved
- using session and timeout) exactly match the strings in
- expect_contents"""
+ """Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
try:
contents = []
diff --git a/cpp/src/tests/cluster_python_tests_failing.txt b/cpp/src/tests/cluster_python_tests_failing.txt
index 7ba8089946..f8639d7b59 100644
--- a/cpp/src/tests/cluster_python_tests_failing.txt
+++ b/cpp/src/tests/cluster_python_tests_failing.txt
@@ -1,32 +1,4 @@
qpid_tests.broker_0_10.management.ManagementTest.test_purge_queue
qpid_tests.broker_0_10.management.ManagementTest.test_connection_close
-qpid_tests.broker_0_10.dtx.DtxTests.test_bad_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_commit_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_end
-qpid_tests.broker_0_10.dtx.DtxTests.test_end_suspend_and_fail
-qpid_tests.broker_0_10.dtx.DtxTests.test_end_unknown_xid
-qpid_tests.broker_0_10.dtx.DtxTests.test_forget_xid_on_completion
-qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout
-qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_implicit_end
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_false
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_true
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_prepare_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_rollback_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_prepare_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_recover
-qpid_tests.broker_0_10.dtx.DtxTests.test_rollback_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_select_required
-qpid_tests.broker_0_10.dtx.DtxTests.test_set_timeout
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_commit
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_commit
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_rollback
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_rollback
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_already_known
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_join
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_join_and_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_start_end_resume
qpid_tests.broker_0_10.message.MessageTests.test_ttl
qpid_tests.broker_0_10.management.ManagementTest.test_broker_connectivity_oldAPI
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 807e9508c3..e67a691283 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -713,6 +713,204 @@ acl allow all all
cluster.start()
fetch(cluster[2])
+# Some utility code for transaction tests
+XA_RBROLLBACK = 1
+XA_RBTIMEOUT = 2
+XA_OK = 0
+dtx_branch_counter = 0
+
+class DtxTestFixture:
+ """Bundle together some common requirements for dtx tests."""
+ def __init__(self, test, broker, name, exclusive=False):
+ self.test = test
+ self.broker = broker
+ self.name = name
+ # Use old API. DTX is not supported in messaging API.
+ self.connection = broker.connect_old()
+ self.session = self.connection.session(name, 1) # 1 second timeout
+ self.queue = self.session.queue_declare(name, exclusive=exclusive)
+ self.xid = self.session.xid(format=0, global_id=name)
+ self.session.dtx_select()
+ self.consumer = None
+
+ def start(self):
+ self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid).status)
+
+ def end(self):
+ self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid).status)
+
+ def prepare(self):
+ self.test.assertEqual(XA_OK, self.session.dtx_prepare(xid=self.xid).status)
+
+ def commit(self, one_phase=True):
+ self.test.assertEqual(
+ XA_OK, self.session.dtx_commit(xid=self.xid, one_phase=one_phase).status)
+
+ def rollback(self):
+ self.test.assertEqual(XA_OK, self.session.dtx_rollback(xid=self.xid).status)
+
+ def send(self, messages):
+ for m in messages:
+ dp=self.session.delivery_properties(routing_key=self.name)
+ mp=self.session.message_properties()
+ self.session.message_transfer(message=qpid.datatypes.Message(dp, mp, m))
+
+ def accept(self):
+ """Accept 1 message from queue"""
+ consumer_tag="%s-consumer"%(self.name)
+ self.session.message_subscribe(queue=self.name, destination=consumer_tag)
+ self.session.message_flow(unit = self.session.credit_unit.message, value = 1, destination = consumer_tag)
+ self.session.message_flow(unit = self.session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
+ msg = self.session.incoming(consumer_tag).get(timeout=1)
+ self.session.message_cancel(destination=consumer_tag)
+ self.session.message_accept(qpid.datatypes.RangedSet(msg.id))
+ return msg
+
+
+ def verify(self, cluster, messages):
+ for b in cluster:
+ self.test.assert_browse(b.connect().session(), self.name, messages)
+
+
+class DtxTests(BrokerTest):
+
+ def test_dtx_update(self):
+ """Verify that DTX transaction state is updated to a new broker.
+ Start a collection of transactions, then add a new cluster member,
+ then verify they commit/rollback correctly on the new broker."""
+
+ # Note: multiple test have been bundled into one to avoid the need to start/stop
+ # multiple brokers per test.
+
+ cluster=self.cluster(1)
+
+ # Transaction that will be open when new member joins, then committed.
+ t1 = DtxTestFixture(self, cluster[0], "t1")
+ t1.start()
+ t1.send(["1", "2"])
+ t1.verify(cluster, []) # Not visible outside of transaction
+
+ # Transaction that will be open when new member joins, then rolled back.
+ t2 = DtxTestFixture(self, cluster[0], "t2")
+ t2.start()
+ t2.send(["1", "2"])
+
+ # Transaction that will be prepared when new member joins, then committed.
+ t3 = DtxTestFixture(self, cluster[0], "t3")
+ t3.start()
+ t3.send(["1", "2"])
+ t3.end()
+ t3.prepare()
+ t1.verify(cluster, []) # Not visible outside of transaction
+
+ # Transaction that will be prepared when new member joins, then rolled back.
+ t4 = DtxTestFixture(self, cluster[0], "t4")
+ t4.start()
+ t4.send(["1", "2"])
+ t4.end()
+ t4.prepare()
+
+ # Transaction using an exclusive queue
+ t5 = DtxTestFixture(self, cluster[0], "t5", exclusive=True)
+ t5.start()
+ t5.send(["1", "2"])
+
+ # Accept messages in a transaction before/after join then commit
+ t6 = DtxTestFixture(self, cluster[0], "t6")
+ t6.send(["a","b","c"])
+ t6.start()
+ t6.verify(cluster, ["a","b","c"])
+ self.assertEqual(t6.accept().body, "a");
+ t6.verify(cluster, ["b","c"])
+
+ # Accept messages in a transaction before/after join then roll back
+ t7 = DtxTestFixture(self, cluster[0], "t7")
+ t7.send(["a","b","c"])
+ t7.start()
+ t7.verify(cluster, ["a","b","c"])
+ self.assertEqual(t7.accept().body, "a");
+ t7.verify(cluster, ["b","c"])
+
+ # Start new member
+ cluster.start()
+
+ # Commit t1
+ t1.send(["3","4"])
+ t1.verify(cluster, [])
+ t1.end()
+ t1.commit(one_phase=True)
+ t1.verify(cluster, ["1","2","3","4"])
+
+ # Rollback t2
+ t2.send(["3","4"])
+ t2.verify(cluster, [])
+ t2.end()
+ t2.rollback()
+ t2.verify(cluster, [])
+
+ # Commit t3
+ t3.verify(cluster, [])
+ t3.commit(one_phase=False)
+ t3.verify(cluster, ["1","2"])
+
+ # Rollback t4
+ t4.verify(cluster, [])
+ t4.rollback()
+ t4.verify(cluster, [])
+
+ # Commit t5
+ t5.send(["3","4"])
+ t5.verify(cluster, [])
+ t5.end()
+ t5.commit(one_phase=True)
+ t5.verify(cluster, ["1","2","3","4"])
+
+ # Commit t7
+ t6.verify(cluster, ["b", "c"])
+ self.assertEqual(t6.accept().body, "b");
+ t6.verify(cluster, ["c"])
+ t6.end()
+ t6.commit(one_phase=True)
+ t6.verify(cluster, ["c"])
+ t6.session.close() # Make sure they're not requeued by the session.
+ t6.verify(cluster, ["c"])
+
+ # Rollback t7
+ t7.verify(cluster, ["b", "c"])
+ self.assertEqual(t7.accept().body, "b");
+ t7.verify(cluster, ["c"])
+ t7.end()
+ t7.rollback()
+ t7.verify(cluster, ["a", "b", "c"])
+
+
+class TxTests(BrokerTest):
+
+ def test_tx_update(self):
+ """Verify that transaction state is updated to a new broker"""
+
+ def make_message(session, body=None, key=None, id=None):
+ dp=session.delivery_properties(routing_key=key)
+ mp=session.message_properties(correlation_id=id)
+ return qpid.datatypes.Message(dp, mp, body)
+
+ cluster=self.cluster(1)
+ # Use old API. TX is not supported in messaging API.
+ c = cluster[0].connect_old()
+ s = c.session("tx-session", 1)
+ s.queue_declare(queue="q")
+ # Start transaction
+ s.tx_select()
+ s.message_transfer(message=make_message(s, "1", "q"))
+ # Start new member mid-transaction
+ cluster.start()
+ # Do more work
+ s.message_transfer(message=make_message(s, "2", "q"))
+ # Commit the transaction and verify the results.
+ s.tx_commit()
+ for b in cluster: self.assert_browse(b.connect().session(), "q", ["1","2"])
+
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -1001,6 +1199,8 @@ class LongTests(BrokerTest):
logger = logging.getLogger()
log_level = logger.getEffectiveLevel()
logger.setLevel(logging.ERROR)
+ sender = None
+ receiver = None
try:
# Start sender and receiver threads
receiver = Receiver(cluster[0], "q;{create:always}")
@@ -1031,8 +1231,8 @@ class LongTests(BrokerTest):
finally:
# Detach to avoid slow reconnect attempts during shut-down if test fails.
- sender.connection.detach()
- receiver.connection.detach()
+ if sender: sender.connection.detach()
+ if receiver: receiver.connection.detach()
logger.setLevel(log_level)
class StoreTests(BrokerTest):