diff options
| author | Alan Conway <aconway@apache.org> | 2014-07-18 18:18:42 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2014-07-18 18:18:42 +0000 |
| commit | cc8050efba878ea8615afb18b065760105e25fe7 (patch) | |
| tree | abea502915c40258dac9d3f82f8ca1ac0e88374f | |
| parent | 0e259111b19f2972933b9fb80070b1c4872c450e (diff) | |
| download | qpid-python-cc8050efba878ea8615afb18b065760105e25fe7.tar.gz | |
QPID-5888: transaction should always be aborted on failover
C++ and python clients were attempting to continue the transation transparently
after failover which is in correct. They were re-sending messages in the
transaction but there is no way to re-do transactional receives. The transaction
must be aborted.
The C++ and python clients have been modified to kill a transactional session
with a TransactionAborted exception if there is a failover.
Note the Java client already behaves correctly but not identically.
It defers raising an exception until commit rather than failing
immediately on failover, and the session can still be used.
The following commits are involved:
r1611349 QPID-5887: revised approach to implict abort
r1610959 QPID-5887: allow qpid-txtest2 to be run by make test
r1610958 QPID-5887: fix to new txtest2, acknowledge messages in the check phase to ensure queues remain drained for any subsequent runs
r1609748 QPID-5887: abort transactional session on failover; added equivalent of txtest using messaging API
This commit does the following:
- Update ha_tests.py tx_simpler_failover test to expect transaction aborted.
- Minor improvements to qpid-txtest2
- Fix native (non-swig) python client.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1611748 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 2 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 33 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/qpid-txtest2.cpp | 22 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging/driver.py | 6 |
4 files changed, 42 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index e5e696439b..767d857630 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -78,7 +78,7 @@ void SessionImpl::checkAborted() void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&) { if (aborted) { - throw TransactionAborted("Transaction implicitly aborted"); + throw TransactionAborted("Transaction aborted due to transport failure"); } } diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 1e870c55a8..87d5c2a72d 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1318,13 +1318,14 @@ def open_read(name): class TransactionTests(HaBrokerTest): - def tx_simple_setup(self, broker): + def tx_simple_setup(self, cluster, broker=0): """Start a transaction, remove messages from queue a, add messages to queue b""" - c = broker.connect(protocol=self.tx_protocol) + c = cluster.connect(broker, protocol=self.tx_protocol) # Send messages to a, no transaction. sa = c.session().sender("a;{create:always,node:{durable:true}}") tx_msgs = ["x","y","z"] for m in tx_msgs: sa.send(qm.Message(content=m, durable=True)) + sa.close() # Receive messages from a, in transaction. tx = c.session(transactional=True) @@ -1339,6 +1340,7 @@ class TransactionTests(HaBrokerTest): for tx_m,m in zip(tx_msgs2, msgs): txs.send(tx_m); sb.send(m) + sb.close() return tx def tx_subscriptions(self, broker): @@ -1348,7 +1350,7 @@ class TransactionTests(HaBrokerTest): def test_tx_simple_commit(self): cluster = HaCluster(self, 2, test_store=True) - tx = self.tx_simple_setup(cluster[0]) + tx = self.tx_simple_setup(cluster) tx.sync() tx_queues = cluster[0].agent.tx_queues() @@ -1394,7 +1396,7 @@ class TransactionTests(HaBrokerTest): def test_tx_simple_rollback(self): cluster = HaCluster(self, 2, test_store=True) - tx = self.tx_simple_setup(cluster[0]) + tx = self.tx_simple_setup(cluster) tx.sync() tx_queues = cluster[0].agent.tx_queues() tx.acknowledge() @@ -1415,22 +1417,27 @@ class TransactionTests(HaBrokerTest): def test_tx_simple_failover(self): cluster = HaCluster(self, 3, test_store=True) - tx = self.tx_simple_setup(cluster[0]) + tx = self.tx_simple_setup(cluster) tx.sync() tx_queues = cluster[0].agent.tx_queues() tx.acknowledge() - cluster.bounce(0) # Should cause roll-back - cluster[0].wait_status("ready") # Restarted. - cluster[1].wait_status("active") # Promoted. - cluster[2].wait_status("ready") # Failed over. - for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. + try: + cluster.bounce(0) # Should cause roll-back + tx.connection.session() # Wait for reconnect + for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + self.assertRaises(qm.TransactionAborted, tx.sync) + self.assertRaises(qm.TransactionAborted, tx.commit) + tx.connection.close() + for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + finally: l.restore() def test_tx_no_backups(self): """Test the special case of a TX where there are no backups""" # Test commit cluster = HaCluster(self, 1, test_store=True) - tx = self.tx_simple_setup(cluster[0]) + tx = self.tx_simple_setup(cluster) tx.acknowledge() tx.commit() tx.sync() @@ -1440,7 +1447,7 @@ class TransactionTests(HaBrokerTest): # Test rollback cluster = HaCluster(self, 1, test_store=True) - tx = self.tx_simple_setup(cluster[0]) + tx = self.tx_simple_setup(cluster) tx.sync() tx_queues = cluster[0].agent.tx_queues() tx.acknowledge() @@ -1498,7 +1505,7 @@ class TransactionTests(HaBrokerTest): """Verify that TXs blocked in commit don't deadlock.""" cluster = HaCluster(self, 2, args=["--worker-threads=2"], test_store=True) n = 10 # Number of concurrent transactions - sessions = [cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)] + sessions = [cluster.connect(0, protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)] # Have the store delay the response for 10s for s in sessions: sn = s.sender("qq;{create:always,node:{durable:true}}") diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp index cdd263a081..a744d07a12 100644 --- a/qpid/cpp/src/tests/qpid-txtest2.cpp +++ b/qpid/cpp/src/tests/qpid-txtest2.cpp @@ -88,6 +88,7 @@ struct Options : public qpid::Options { ("help", qpid::optValue(help), "print this usage statement"); add(log); } + bool parse(int argc, char** argv) { try { @@ -109,9 +110,11 @@ struct Options : public qpid::Options { std::cout << *this << std::endl << std::endl << "Transactionally moves messages between queues" << std::endl; return false; - } else { - return true; } + if (totalMsgCount < msgsPerTx) { + totalMsgCount = msgsPerTx; // Must have at least msgsPerTx total messages. + } + return true; } catch (const std::exception& e) { std::cerr << *this << std::endl << std::endl << e.what() << std::endl; return false; @@ -158,6 +161,7 @@ struct Client virtual ~Client() { try { + session.sync(); session.close(); connection.close(); } catch(const std::exception& e) { @@ -177,12 +181,14 @@ struct Transfer : public TransactionalClient, public Runnable const std::string target; const std::string source; Thread thread; + bool failed; - Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from) {} + Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from), failed(false) {} void run() { try { + Sender sender(session.createSender(target)); Receiver receiver(session.createReceiver(source)); receiver.setCapacity(opts.capacity); @@ -211,7 +217,8 @@ struct Transfer : public TransactionalClient, public Runnable sender.close(); receiver.close(); } catch(const std::exception& e) { - std::cout << "Transfer interrupted: " << e.what() << std::endl; + failed = true; + QPID_LOG(error, "Transfer " << source << " to " << target << " interrupted: " << e.what()); } } }; @@ -263,9 +270,11 @@ struct Controller : public Client agents.back().thread = Thread(agents.back()); } - for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) { + for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) i->thread.join(); - } + for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) + if (i->failed) + throw std::runtime_error("Transfer agents failed"); } int check() @@ -285,7 +294,6 @@ struct Controller : public Client receiver.close(); if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl; } - sort(ids.begin(), ids.end()); sort(drained.begin(), drained.end()); diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index 6f1d0fba7d..6110befc69 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -676,6 +676,12 @@ class Engine: def close(self, e=None): self._reset() + # We cannot re-establish transactional sessions, they must be aborted. + # We could re-do transactional enqueues, but not dequeues. + for ssn in self.connection.sessions.values(): + if ssn.transactional: + ssn.error = TransactionAborted("Transaction aborted due to transport failure") + ssn.closed = True if e: self.connection.error = e self._status = CLOSED |
