diff options
Diffstat (limited to 'qpid/cpp/src/tests')
| -rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 4 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 66 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/qpid-txtest2.cpp | 12 |
3 files changed, 69 insertions, 13 deletions
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 9adad45ed4..1c131e7872 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -350,7 +350,9 @@ class HaCluster(object): def connect(self, i, **kwargs): """Connect with reconnect_urls""" - return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs) + c = self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs) + self.test.teardown_add(c) # Clean up + return c def kill(self, i, promote_next=True, final=True): """Kill broker i, promote broker i+1""" diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index e1864725d2..1567bfd177 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1349,7 +1349,7 @@ class TransactionTests(HaBrokerTest): if q.startswith("qpid.ha-tx")] def test_tx_simple_commit(self): - cluster = HaCluster(self, 2, test_store=True) + cluster = HaCluster(self, 2, test_store=True, wait=True) tx = self.tx_simple_setup(cluster) tx.sync() tx_queues = cluster[0].agent.tx_queues() @@ -1373,6 +1373,9 @@ class TransactionTests(HaBrokerTest): self.assertEqual(['0', '1', '2', 'x', 'y', 'z'], [m.content for m in ri]) r.session.acknowledge() for b in cluster: b.assert_browse_backup("b", [], msg=b) + c.close() + tx.connection.close() + def check_enq_deq(self, cluster, queue, expect): for b in cluster: @@ -1455,6 +1458,7 @@ class TransactionTests(HaBrokerTest): tx.rollback() tx.close() # For clean test. for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + tx.connection.close() def assert_simple_rollback_outcome(self, b, tx_queues): b.assert_browse_backup("a", ["x","y","z"], msg=b) @@ -1467,7 +1471,27 @@ class TransactionTests(HaBrokerTest): self.assertEqual(open_read(b.store_log), expect, msg=b) self.assert_tx_clean(b) + def test_tx_simple_failure(self): + """Verify we throw TransactionAborted if there is a store error during a transaction""" + cluster = HaCluster(self, 3, test_store=True) + tx = self.tx_simple_setup(cluster) + tx.sync() + tx_queues = cluster[0].agent.tx_queues() + tx.acknowledge() + l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. + try: + cluster.bounce(0) # Should cause roll-back + tx.connection.session() # Wait for reconnect + for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + self.assertRaises(qm.TransactionAborted, tx.sync) + self.assertRaises(qm.TransactionAborted, tx.commit) + try: tx.connection.close() + except qm.TransactionAborted: pass # Occasionally get exception on close. + for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + finally: l.restore() + def test_tx_simple_failover(self): + """Verify we throw TransactionAborted if there is a fail-over during a transaction""" cluster = HaCluster(self, 3, test_store=True) tx = self.tx_simple_setup(cluster) tx.sync() @@ -1485,6 +1509,35 @@ class TransactionTests(HaBrokerTest): for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) finally: l.restore() + def test_tx_unknown_failover(self): + """Verify we throw TransactionUnknown if there is a failure during commit""" + cluster = HaCluster(self, 3, test_store=True) + tx = self.tx_simple_setup(cluster) + tx.sync() + tx_queues = cluster[0].agent.tx_queues() + tx.acknowledge() + l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. + try: + os.kill(cluster[2].pid, signal.SIGSTOP) # Delay prepare response + class CommitThread(Thread): + def run(self): + try: tx.commit() + except Exception, e: + self.error = e + t = CommitThread() + t.start() # Commit in progress + t.join(timeout=0.01) + self.assertTrue(t.is_alive()) + cluster.bounce(0) + os.kill(cluster[2].pid, signal.SIGCONT) + t.join() + try: raise t.error + except qm.TransactionUnknown: pass + for b in cluster: self.assert_tx_clean(b) + try: tx.connection.close() + except TransactionUnknown: pass # Occasionally get exception on close. + finally: l.restore() + def test_tx_no_backups(self): """Test the special case of a TX where there are no backups""" @@ -1509,17 +1562,14 @@ class TransactionTests(HaBrokerTest): tx.close() self.assert_simple_rollback_outcome(cluster[0], tx_queues) - def assert_commit_raises(self, tx): - def commit_sync(): tx.commit(); tx.sync() - self.assertRaises(Exception, commit_sync) - def test_tx_backup_fail(self): cluster = HaCluster(self, 2, test_store=True, s_args=[[],["--test-store-name=bang"]]) c = cluster[0].connect(protocol=self.tx_protocol) tx = c.session(transactional=True) s = tx.sender("q;{create:always,node:{durable:true}}") for m in ["foo","TEST_STORE_DO bang: throw","bar"]: s.send(qm.Message(m, durable=True)) - self.assert_commit_raises(tx) + def commit_sync(): tx.commit(); tx.sync() + self.assertRaises(qm.TransactionAborted, commit_sync) for b in cluster: b.assert_browse_backup("q", []) self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n") self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<abort tx=1>\n") @@ -1538,7 +1588,7 @@ class TransactionTests(HaBrokerTest): cluster[1].kill(final=False) s.send("b") tx.commit() - tx.close() + tx.connection.close() for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b) b.assert_browse_backup("q", ["a","b"], msg=b) @@ -1548,7 +1598,7 @@ class TransactionTests(HaBrokerTest): s.send("foo") cluster.restart(1) # Not a part of the current transaction. tx.commit() - tx.close() + tx.connection.close() for b in cluster: self.assert_tx_clean(b) # The new member is not in the tx but receives the results normal replication. for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b) diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp index d64f13d9c5..6e2f81726e 100644 --- a/qpid/cpp/src/tests/qpid-txtest2.cpp +++ b/qpid/cpp/src/tests/qpid-txtest2.cpp @@ -66,7 +66,7 @@ struct Options : public qpid::Options { Options() : help(false), init(true), transfer(true), check(true), size(256), durable(true), queues(2), - base("tx-test2"), msgsPerTx(1), txCount(5), totalMsgCount(10), + base("tx"), msgsPerTx(1), txCount(5), totalMsgCount(10), capacity(1000), url("localhost"), port(0), quiet(false) { addOptions() @@ -140,8 +140,10 @@ std::string generateData(uint size) void generateSet(const std::string& base, uint count, StringSet& collection) { for (uint i = 0; i < count; i++) { + std::ostringstream digits; + digits << count; std::ostringstream out; - out << base << "-" << (i+1); + out << base << "-" << std::setw(digits.str().size()) << std::setfill('0') << (i+1); collection.push_back(out.str()); } } @@ -193,6 +195,8 @@ struct Transfer : public TransactionalClient, public Runnable Receiver receiver(session.createReceiver(source)); receiver.setCapacity(opts.capacity); for (uint t = 0; t < opts.txCount;) { + std::ostringstream id; + id << source << ">" << target << ":" << t+1; try { for (uint m = 0; m < opts.msgsPerTx; m++) { Message msg = receiver.fetch(Duration::SECOND*30); @@ -205,9 +209,9 @@ struct Transfer : public TransactionalClient, public Runnable } session.commit(); t++; - if (!opts.quiet) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl; + if (!opts.quiet) std::cout << "Transaction " << id.str() << " of " << opts.txCount << " committed successfully" << std::endl; } catch (const TransactionAborted&) { - std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl; + std::cout << "Transaction " << id.str() << " of " << opts.txCount << " was aborted and will be retried" << std::endl; session = connection.createTransactionalSession(); sender = session.createSender(target); receiver = session.createReceiver(source); |
