summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-07-18 18:18:42 +0000
committerAlan Conway <aconway@apache.org>2014-07-18 18:18:42 +0000
commitcc8050efba878ea8615afb18b065760105e25fe7 (patch)
treeabea502915c40258dac9d3f82f8ca1ac0e88374f /qpid/cpp/src/tests/ha_tests.py
parent0e259111b19f2972933b9fb80070b1c4872c450e (diff)
downloadqpid-python-cc8050efba878ea8615afb18b065760105e25fe7.tar.gz
QPID-5888: transaction should always be aborted on failover
C++ and python clients were attempting to continue the transation transparently after failover which is in correct. They were re-sending messages in the transaction but there is no way to re-do transactional receives. The transaction must be aborted. The C++ and python clients have been modified to kill a transactional session with a TransactionAborted exception if there is a failover. Note the Java client already behaves correctly but not identically. It defers raising an exception until commit rather than failing immediately on failover, and the session can still be used. The following commits are involved: r1611349 QPID-5887: revised approach to implict abort r1610959 QPID-5887: allow qpid-txtest2 to be run by make test r1610958 QPID-5887: fix to new txtest2, acknowledge messages in the check phase to ensure queues remain drained for any subsequent runs r1609748 QPID-5887: abort transactional session on failover; added equivalent of txtest using messaging API This commit does the following: - Update ha_tests.py tx_simpler_failover test to expect transaction aborted. - Minor improvements to qpid-txtest2 - Fix native (non-swig) python client. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1611748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py33
1 files changed, 20 insertions, 13 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 1e870c55a8..87d5c2a72d 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1318,13 +1318,14 @@ def open_read(name):
class TransactionTests(HaBrokerTest):
- def tx_simple_setup(self, broker):
+ def tx_simple_setup(self, cluster, broker=0):
"""Start a transaction, remove messages from queue a, add messages to queue b"""
- c = broker.connect(protocol=self.tx_protocol)
+ c = cluster.connect(broker, protocol=self.tx_protocol)
# Send messages to a, no transaction.
sa = c.session().sender("a;{create:always,node:{durable:true}}")
tx_msgs = ["x","y","z"]
for m in tx_msgs: sa.send(qm.Message(content=m, durable=True))
+ sa.close()
# Receive messages from a, in transaction.
tx = c.session(transactional=True)
@@ -1339,6 +1340,7 @@ class TransactionTests(HaBrokerTest):
for tx_m,m in zip(tx_msgs2, msgs):
txs.send(tx_m);
sb.send(m)
+ sb.close()
return tx
def tx_subscriptions(self, broker):
@@ -1348,7 +1350,7 @@ class TransactionTests(HaBrokerTest):
def test_tx_simple_commit(self):
cluster = HaCluster(self, 2, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.sync()
tx_queues = cluster[0].agent.tx_queues()
@@ -1394,7 +1396,7 @@ class TransactionTests(HaBrokerTest):
def test_tx_simple_rollback(self):
cluster = HaCluster(self, 2, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.sync()
tx_queues = cluster[0].agent.tx_queues()
tx.acknowledge()
@@ -1415,22 +1417,27 @@ class TransactionTests(HaBrokerTest):
def test_tx_simple_failover(self):
cluster = HaCluster(self, 3, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.sync()
tx_queues = cluster[0].agent.tx_queues()
tx.acknowledge()
- cluster.bounce(0) # Should cause roll-back
- cluster[0].wait_status("ready") # Restarted.
- cluster[1].wait_status("active") # Promoted.
- cluster[2].wait_status("ready") # Failed over.
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+ try:
+ cluster.bounce(0) # Should cause roll-back
+ tx.connection.session() # Wait for reconnect
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ self.assertRaises(qm.TransactionAborted, tx.sync)
+ self.assertRaises(qm.TransactionAborted, tx.commit)
+ tx.connection.close()
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ finally: l.restore()
def test_tx_no_backups(self):
"""Test the special case of a TX where there are no backups"""
# Test commit
cluster = HaCluster(self, 1, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.acknowledge()
tx.commit()
tx.sync()
@@ -1440,7 +1447,7 @@ class TransactionTests(HaBrokerTest):
# Test rollback
cluster = HaCluster(self, 1, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.sync()
tx_queues = cluster[0].agent.tx_queues()
tx.acknowledge()
@@ -1498,7 +1505,7 @@ class TransactionTests(HaBrokerTest):
"""Verify that TXs blocked in commit don't deadlock."""
cluster = HaCluster(self, 2, args=["--worker-threads=2"], test_store=True)
n = 10 # Number of concurrent transactions
- sessions = [cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)]
+ sessions = [cluster.connect(0, protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)]
# Have the store delay the response for 10s
for s in sessions:
sn = s.sender("qq;{create:always,node:{durable:true}}")