summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-07-18 18:18:42 +0000
committerAlan Conway <aconway@apache.org>2014-07-18 18:18:42 +0000
commitcc8050efba878ea8615afb18b065760105e25fe7 (patch)
treeabea502915c40258dac9d3f82f8ca1ac0e88374f
parent0e259111b19f2972933b9fb80070b1c4872c450e (diff)
downloadqpid-python-cc8050efba878ea8615afb18b065760105e25fe7.tar.gz
QPID-5888: transaction should always be aborted on failover
C++ and python clients were attempting to continue the transation transparently after failover which is in correct. They were re-sending messages in the transaction but there is no way to re-do transactional receives. The transaction must be aborted. The C++ and python clients have been modified to kill a transactional session with a TransactionAborted exception if there is a failover. Note the Java client already behaves correctly but not identically. It defers raising an exception until commit rather than failing immediately on failover, and the session can still be used. The following commits are involved: r1611349 QPID-5887: revised approach to implict abort r1610959 QPID-5887: allow qpid-txtest2 to be run by make test r1610958 QPID-5887: fix to new txtest2, acknowledge messages in the check phase to ensure queues remain drained for any subsequent runs r1609748 QPID-5887: abort transactional session on failover; added equivalent of txtest using messaging API This commit does the following: - Update ha_tests.py tx_simpler_failover test to expect transaction aborted. - Minor improvements to qpid-txtest2 - Fix native (non-swig) python client. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1611748 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py33
-rw-r--r--qpid/cpp/src/tests/qpid-txtest2.cpp22
-rw-r--r--qpid/python/qpid/messaging/driver.py6
4 files changed, 42 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index e5e696439b..767d857630 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -78,7 +78,7 @@ void SessionImpl::checkAborted()
void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&)
{
if (aborted) {
- throw TransactionAborted("Transaction implicitly aborted");
+ throw TransactionAborted("Transaction aborted due to transport failure");
}
}
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 1e870c55a8..87d5c2a72d 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1318,13 +1318,14 @@ def open_read(name):
class TransactionTests(HaBrokerTest):
- def tx_simple_setup(self, broker):
+ def tx_simple_setup(self, cluster, broker=0):
"""Start a transaction, remove messages from queue a, add messages to queue b"""
- c = broker.connect(protocol=self.tx_protocol)
+ c = cluster.connect(broker, protocol=self.tx_protocol)
# Send messages to a, no transaction.
sa = c.session().sender("a;{create:always,node:{durable:true}}")
tx_msgs = ["x","y","z"]
for m in tx_msgs: sa.send(qm.Message(content=m, durable=True))
+ sa.close()
# Receive messages from a, in transaction.
tx = c.session(transactional=True)
@@ -1339,6 +1340,7 @@ class TransactionTests(HaBrokerTest):
for tx_m,m in zip(tx_msgs2, msgs):
txs.send(tx_m);
sb.send(m)
+ sb.close()
return tx
def tx_subscriptions(self, broker):
@@ -1348,7 +1350,7 @@ class TransactionTests(HaBrokerTest):
def test_tx_simple_commit(self):
cluster = HaCluster(self, 2, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.sync()
tx_queues = cluster[0].agent.tx_queues()
@@ -1394,7 +1396,7 @@ class TransactionTests(HaBrokerTest):
def test_tx_simple_rollback(self):
cluster = HaCluster(self, 2, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.sync()
tx_queues = cluster[0].agent.tx_queues()
tx.acknowledge()
@@ -1415,22 +1417,27 @@ class TransactionTests(HaBrokerTest):
def test_tx_simple_failover(self):
cluster = HaCluster(self, 3, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.sync()
tx_queues = cluster[0].agent.tx_queues()
tx.acknowledge()
- cluster.bounce(0) # Should cause roll-back
- cluster[0].wait_status("ready") # Restarted.
- cluster[1].wait_status("active") # Promoted.
- cluster[2].wait_status("ready") # Failed over.
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+ try:
+ cluster.bounce(0) # Should cause roll-back
+ tx.connection.session() # Wait for reconnect
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ self.assertRaises(qm.TransactionAborted, tx.sync)
+ self.assertRaises(qm.TransactionAborted, tx.commit)
+ tx.connection.close()
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ finally: l.restore()
def test_tx_no_backups(self):
"""Test the special case of a TX where there are no backups"""
# Test commit
cluster = HaCluster(self, 1, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.acknowledge()
tx.commit()
tx.sync()
@@ -1440,7 +1447,7 @@ class TransactionTests(HaBrokerTest):
# Test rollback
cluster = HaCluster(self, 1, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.sync()
tx_queues = cluster[0].agent.tx_queues()
tx.acknowledge()
@@ -1498,7 +1505,7 @@ class TransactionTests(HaBrokerTest):
"""Verify that TXs blocked in commit don't deadlock."""
cluster = HaCluster(self, 2, args=["--worker-threads=2"], test_store=True)
n = 10 # Number of concurrent transactions
- sessions = [cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)]
+ sessions = [cluster.connect(0, protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)]
# Have the store delay the response for 10s
for s in sessions:
sn = s.sender("qq;{create:always,node:{durable:true}}")
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp
index cdd263a081..a744d07a12 100644
--- a/qpid/cpp/src/tests/qpid-txtest2.cpp
+++ b/qpid/cpp/src/tests/qpid-txtest2.cpp
@@ -88,6 +88,7 @@ struct Options : public qpid::Options {
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
+
bool parse(int argc, char** argv)
{
try {
@@ -109,9 +110,11 @@ struct Options : public qpid::Options {
std::cout << *this << std::endl << std::endl
<< "Transactionally moves messages between queues" << std::endl;
return false;
- } else {
- return true;
}
+ if (totalMsgCount < msgsPerTx) {
+ totalMsgCount = msgsPerTx; // Must have at least msgsPerTx total messages.
+ }
+ return true;
} catch (const std::exception& e) {
std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
return false;
@@ -158,6 +161,7 @@ struct Client
virtual ~Client()
{
try {
+ session.sync();
session.close();
connection.close();
} catch(const std::exception& e) {
@@ -177,12 +181,14 @@ struct Transfer : public TransactionalClient, public Runnable
const std::string target;
const std::string source;
Thread thread;
+ bool failed;
- Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from) {}
+ Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from), failed(false) {}
void run()
{
try {
+
Sender sender(session.createSender(target));
Receiver receiver(session.createReceiver(source));
receiver.setCapacity(opts.capacity);
@@ -211,7 +217,8 @@ struct Transfer : public TransactionalClient, public Runnable
sender.close();
receiver.close();
} catch(const std::exception& e) {
- std::cout << "Transfer interrupted: " << e.what() << std::endl;
+ failed = true;
+ QPID_LOG(error, "Transfer " << source << " to " << target << " interrupted: " << e.what());
}
}
};
@@ -263,9 +270,11 @@ struct Controller : public Client
agents.back().thread = Thread(agents.back());
}
- for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) {
+ for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++)
i->thread.join();
- }
+ for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++)
+ if (i->failed)
+ throw std::runtime_error("Transfer agents failed");
}
int check()
@@ -285,7 +294,6 @@ struct Controller : public Client
receiver.close();
if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl;
}
-
sort(ids.begin(), ids.end());
sort(drained.begin(), drained.end());
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index 6f1d0fba7d..6110befc69 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -676,6 +676,12 @@ class Engine:
def close(self, e=None):
self._reset()
+ # We cannot re-establish transactional sessions, they must be aborted.
+ # We could re-do transactional enqueues, but not dequeues.
+ for ssn in self.connection.sessions.values():
+ if ssn.transactional:
+ ssn.error = TransactionAborted("Transaction aborted due to transport failure")
+ ssn.closed = True
if e:
self.connection.error = e
self._status = CLOSED