summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests')
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py4
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py66
-rw-r--r--qpid/cpp/src/tests/qpid-txtest2.cpp12
3 files changed, 69 insertions, 13 deletions
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 9adad45ed4..1c131e7872 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -350,7 +350,9 @@ class HaCluster(object):
def connect(self, i, **kwargs):
"""Connect with reconnect_urls"""
- return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs)
+ c = self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs)
+ self.test.teardown_add(c) # Clean up
+ return c
def kill(self, i, promote_next=True, final=True):
"""Kill broker i, promote broker i+1"""
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index e1864725d2..1567bfd177 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1349,7 +1349,7 @@ class TransactionTests(HaBrokerTest):
if q.startswith("qpid.ha-tx")]
def test_tx_simple_commit(self):
- cluster = HaCluster(self, 2, test_store=True)
+ cluster = HaCluster(self, 2, test_store=True, wait=True)
tx = self.tx_simple_setup(cluster)
tx.sync()
tx_queues = cluster[0].agent.tx_queues()
@@ -1373,6 +1373,9 @@ class TransactionTests(HaBrokerTest):
self.assertEqual(['0', '1', '2', 'x', 'y', 'z'], [m.content for m in ri])
r.session.acknowledge()
for b in cluster: b.assert_browse_backup("b", [], msg=b)
+ c.close()
+ tx.connection.close()
+
def check_enq_deq(self, cluster, queue, expect):
for b in cluster:
@@ -1455,6 +1458,7 @@ class TransactionTests(HaBrokerTest):
tx.rollback()
tx.close() # For clean test.
for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ tx.connection.close()
def assert_simple_rollback_outcome(self, b, tx_queues):
b.assert_browse_backup("a", ["x","y","z"], msg=b)
@@ -1467,7 +1471,27 @@ class TransactionTests(HaBrokerTest):
self.assertEqual(open_read(b.store_log), expect, msg=b)
self.assert_tx_clean(b)
+ def test_tx_simple_failure(self):
+ """Verify we throw TransactionAborted if there is a store error during a transaction"""
+ cluster = HaCluster(self, 3, test_store=True)
+ tx = self.tx_simple_setup(cluster)
+ tx.sync()
+ tx_queues = cluster[0].agent.tx_queues()
+ tx.acknowledge()
+ l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+ try:
+ cluster.bounce(0) # Should cause roll-back
+ tx.connection.session() # Wait for reconnect
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ self.assertRaises(qm.TransactionAborted, tx.sync)
+ self.assertRaises(qm.TransactionAborted, tx.commit)
+ try: tx.connection.close()
+ except qm.TransactionAborted: pass # Occasionally get exception on close.
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ finally: l.restore()
+
def test_tx_simple_failover(self):
+ """Verify we throw TransactionAborted if there is a fail-over during a transaction"""
cluster = HaCluster(self, 3, test_store=True)
tx = self.tx_simple_setup(cluster)
tx.sync()
@@ -1485,6 +1509,35 @@ class TransactionTests(HaBrokerTest):
for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
finally: l.restore()
+ def test_tx_unknown_failover(self):
+ """Verify we throw TransactionUnknown if there is a failure during commit"""
+ cluster = HaCluster(self, 3, test_store=True)
+ tx = self.tx_simple_setup(cluster)
+ tx.sync()
+ tx_queues = cluster[0].agent.tx_queues()
+ tx.acknowledge()
+ l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+ try:
+ os.kill(cluster[2].pid, signal.SIGSTOP) # Delay prepare response
+ class CommitThread(Thread):
+ def run(self):
+ try: tx.commit()
+ except Exception, e:
+ self.error = e
+ t = CommitThread()
+ t.start() # Commit in progress
+ t.join(timeout=0.01)
+ self.assertTrue(t.is_alive())
+ cluster.bounce(0)
+ os.kill(cluster[2].pid, signal.SIGCONT)
+ t.join()
+ try: raise t.error
+ except qm.TransactionUnknown: pass
+ for b in cluster: self.assert_tx_clean(b)
+ try: tx.connection.close()
+ except TransactionUnknown: pass # Occasionally get exception on close.
+ finally: l.restore()
+
def test_tx_no_backups(self):
"""Test the special case of a TX where there are no backups"""
@@ -1509,17 +1562,14 @@ class TransactionTests(HaBrokerTest):
tx.close()
self.assert_simple_rollback_outcome(cluster[0], tx_queues)
- def assert_commit_raises(self, tx):
- def commit_sync(): tx.commit(); tx.sync()
- self.assertRaises(Exception, commit_sync)
-
def test_tx_backup_fail(self):
cluster = HaCluster(self, 2, test_store=True, s_args=[[],["--test-store-name=bang"]])
c = cluster[0].connect(protocol=self.tx_protocol)
tx = c.session(transactional=True)
s = tx.sender("q;{create:always,node:{durable:true}}")
for m in ["foo","TEST_STORE_DO bang: throw","bar"]: s.send(qm.Message(m, durable=True))
- self.assert_commit_raises(tx)
+ def commit_sync(): tx.commit(); tx.sync()
+ self.assertRaises(qm.TransactionAborted, commit_sync)
for b in cluster: b.assert_browse_backup("q", [])
self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<abort tx=1>\n")
@@ -1538,7 +1588,7 @@ class TransactionTests(HaBrokerTest):
cluster[1].kill(final=False)
s.send("b")
tx.commit()
- tx.close()
+ tx.connection.close()
for b in [cluster[0],cluster[2]]:
self.assert_tx_clean(b)
b.assert_browse_backup("q", ["a","b"], msg=b)
@@ -1548,7 +1598,7 @@ class TransactionTests(HaBrokerTest):
s.send("foo")
cluster.restart(1) # Not a part of the current transaction.
tx.commit()
- tx.close()
+ tx.connection.close()
for b in cluster: self.assert_tx_clean(b)
# The new member is not in the tx but receives the results normal replication.
for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b)
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp
index d64f13d9c5..6e2f81726e 100644
--- a/qpid/cpp/src/tests/qpid-txtest2.cpp
+++ b/qpid/cpp/src/tests/qpid-txtest2.cpp
@@ -66,7 +66,7 @@ struct Options : public qpid::Options {
Options() : help(false), init(true), transfer(true), check(true),
size(256), durable(true), queues(2),
- base("tx-test2"), msgsPerTx(1), txCount(5), totalMsgCount(10),
+ base("tx"), msgsPerTx(1), txCount(5), totalMsgCount(10),
capacity(1000), url("localhost"), port(0), quiet(false)
{
addOptions()
@@ -140,8 +140,10 @@ std::string generateData(uint size)
void generateSet(const std::string& base, uint count, StringSet& collection)
{
for (uint i = 0; i < count; i++) {
+ std::ostringstream digits;
+ digits << count;
std::ostringstream out;
- out << base << "-" << (i+1);
+ out << base << "-" << std::setw(digits.str().size()) << std::setfill('0') << (i+1);
collection.push_back(out.str());
}
}
@@ -193,6 +195,8 @@ struct Transfer : public TransactionalClient, public Runnable
Receiver receiver(session.createReceiver(source));
receiver.setCapacity(opts.capacity);
for (uint t = 0; t < opts.txCount;) {
+ std::ostringstream id;
+ id << source << ">" << target << ":" << t+1;
try {
for (uint m = 0; m < opts.msgsPerTx; m++) {
Message msg = receiver.fetch(Duration::SECOND*30);
@@ -205,9 +209,9 @@ struct Transfer : public TransactionalClient, public Runnable
}
session.commit();
t++;
- if (!opts.quiet) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl;
+ if (!opts.quiet) std::cout << "Transaction " << id.str() << " of " << opts.txCount << " committed successfully" << std::endl;
} catch (const TransactionAborted&) {
- std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl;
+ std::cout << "Transaction " << id.str() << " of " << opts.txCount << " was aborted and will be retried" << std::endl;
session = connection.createTransactionalSession();
sender = session.createSender(target);
receiver = session.createReceiver(source);