diff options
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 2 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 33 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/qpid-txtest2.cpp | 22 |
3 files changed, 36 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index e5e696439b..767d857630 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -78,7 +78,7 @@ void SessionImpl::checkAborted() void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&) { if (aborted) { - throw TransactionAborted("Transaction implicitly aborted"); + throw TransactionAborted("Transaction aborted due to transport failure"); } } diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 1e870c55a8..87d5c2a72d 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1318,13 +1318,14 @@ def open_read(name): class TransactionTests(HaBrokerTest): - def tx_simple_setup(self, broker): + def tx_simple_setup(self, cluster, broker=0): """Start a transaction, remove messages from queue a, add messages to queue b""" - c = broker.connect(protocol=self.tx_protocol) + c = cluster.connect(broker, protocol=self.tx_protocol) # Send messages to a, no transaction. sa = c.session().sender("a;{create:always,node:{durable:true}}") tx_msgs = ["x","y","z"] for m in tx_msgs: sa.send(qm.Message(content=m, durable=True)) + sa.close() # Receive messages from a, in transaction. tx = c.session(transactional=True) @@ -1339,6 +1340,7 @@ class TransactionTests(HaBrokerTest): for tx_m,m in zip(tx_msgs2, msgs): txs.send(tx_m); sb.send(m) + sb.close() return tx def tx_subscriptions(self, broker): @@ -1348,7 +1350,7 @@ class TransactionTests(HaBrokerTest): def test_tx_simple_commit(self): cluster = HaCluster(self, 2, test_store=True) - tx = self.tx_simple_setup(cluster[0]) + tx = self.tx_simple_setup(cluster) tx.sync() tx_queues = cluster[0].agent.tx_queues() @@ -1394,7 +1396,7 @@ class TransactionTests(HaBrokerTest): def test_tx_simple_rollback(self): cluster = HaCluster(self, 2, test_store=True) - tx = self.tx_simple_setup(cluster[0]) + tx = self.tx_simple_setup(cluster) tx.sync() tx_queues = cluster[0].agent.tx_queues() tx.acknowledge() @@ -1415,22 +1417,27 @@ class TransactionTests(HaBrokerTest): def test_tx_simple_failover(self): cluster = HaCluster(self, 3, test_store=True) - tx = self.tx_simple_setup(cluster[0]) + tx = self.tx_simple_setup(cluster) tx.sync() tx_queues = cluster[0].agent.tx_queues() tx.acknowledge() - cluster.bounce(0) # Should cause roll-back - cluster[0].wait_status("ready") # Restarted. - cluster[1].wait_status("active") # Promoted. - cluster[2].wait_status("ready") # Failed over. - for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. + try: + cluster.bounce(0) # Should cause roll-back + tx.connection.session() # Wait for reconnect + for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + self.assertRaises(qm.TransactionAborted, tx.sync) + self.assertRaises(qm.TransactionAborted, tx.commit) + tx.connection.close() + for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + finally: l.restore() def test_tx_no_backups(self): """Test the special case of a TX where there are no backups""" # Test commit cluster = HaCluster(self, 1, test_store=True) - tx = self.tx_simple_setup(cluster[0]) + tx = self.tx_simple_setup(cluster) tx.acknowledge() tx.commit() tx.sync() @@ -1440,7 +1447,7 @@ class TransactionTests(HaBrokerTest): # Test rollback cluster = HaCluster(self, 1, test_store=True) - tx = self.tx_simple_setup(cluster[0]) + tx = self.tx_simple_setup(cluster) tx.sync() tx_queues = cluster[0].agent.tx_queues() tx.acknowledge() @@ -1498,7 +1505,7 @@ class TransactionTests(HaBrokerTest): """Verify that TXs blocked in commit don't deadlock.""" cluster = HaCluster(self, 2, args=["--worker-threads=2"], test_store=True) n = 10 # Number of concurrent transactions - sessions = [cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)] + sessions = [cluster.connect(0, protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)] # Have the store delay the response for 10s for s in sessions: sn = s.sender("qq;{create:always,node:{durable:true}}") diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp index cdd263a081..a744d07a12 100644 --- a/qpid/cpp/src/tests/qpid-txtest2.cpp +++ b/qpid/cpp/src/tests/qpid-txtest2.cpp @@ -88,6 +88,7 @@ struct Options : public qpid::Options { ("help", qpid::optValue(help), "print this usage statement"); add(log); } + bool parse(int argc, char** argv) { try { @@ -109,9 +110,11 @@ struct Options : public qpid::Options { std::cout << *this << std::endl << std::endl << "Transactionally moves messages between queues" << std::endl; return false; - } else { - return true; } + if (totalMsgCount < msgsPerTx) { + totalMsgCount = msgsPerTx; // Must have at least msgsPerTx total messages. + } + return true; } catch (const std::exception& e) { std::cerr << *this << std::endl << std::endl << e.what() << std::endl; return false; @@ -158,6 +161,7 @@ struct Client virtual ~Client() { try { + session.sync(); session.close(); connection.close(); } catch(const std::exception& e) { @@ -177,12 +181,14 @@ struct Transfer : public TransactionalClient, public Runnable const std::string target; const std::string source; Thread thread; + bool failed; - Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from) {} + Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from), failed(false) {} void run() { try { + Sender sender(session.createSender(target)); Receiver receiver(session.createReceiver(source)); receiver.setCapacity(opts.capacity); @@ -211,7 +217,8 @@ struct Transfer : public TransactionalClient, public Runnable sender.close(); receiver.close(); } catch(const std::exception& e) { - std::cout << "Transfer interrupted: " << e.what() << std::endl; + failed = true; + QPID_LOG(error, "Transfer " << source << " to " << target << " interrupted: " << e.what()); } } }; @@ -263,9 +270,11 @@ struct Controller : public Client agents.back().thread = Thread(agents.back()); } - for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) { + for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) i->thread.join(); - } + for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) + if (i->failed) + throw std::runtime_error("Transfer agents failed"); } int check() @@ -285,7 +294,6 @@ struct Controller : public Client receiver.close(); if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl; } - sort(ids.begin(), ids.end()); sort(drained.begin(), drained.end()); |
