summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py33
-rw-r--r--qpid/cpp/src/tests/qpid-txtest2.cpp22
3 files changed, 36 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index e5e696439b..767d857630 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -78,7 +78,7 @@ void SessionImpl::checkAborted()
void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&)
{
if (aborted) {
- throw TransactionAborted("Transaction implicitly aborted");
+ throw TransactionAborted("Transaction aborted due to transport failure");
}
}
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 1e870c55a8..87d5c2a72d 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1318,13 +1318,14 @@ def open_read(name):
class TransactionTests(HaBrokerTest):
- def tx_simple_setup(self, broker):
+ def tx_simple_setup(self, cluster, broker=0):
"""Start a transaction, remove messages from queue a, add messages to queue b"""
- c = broker.connect(protocol=self.tx_protocol)
+ c = cluster.connect(broker, protocol=self.tx_protocol)
# Send messages to a, no transaction.
sa = c.session().sender("a;{create:always,node:{durable:true}}")
tx_msgs = ["x","y","z"]
for m in tx_msgs: sa.send(qm.Message(content=m, durable=True))
+ sa.close()
# Receive messages from a, in transaction.
tx = c.session(transactional=True)
@@ -1339,6 +1340,7 @@ class TransactionTests(HaBrokerTest):
for tx_m,m in zip(tx_msgs2, msgs):
txs.send(tx_m);
sb.send(m)
+ sb.close()
return tx
def tx_subscriptions(self, broker):
@@ -1348,7 +1350,7 @@ class TransactionTests(HaBrokerTest):
def test_tx_simple_commit(self):
cluster = HaCluster(self, 2, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.sync()
tx_queues = cluster[0].agent.tx_queues()
@@ -1394,7 +1396,7 @@ class TransactionTests(HaBrokerTest):
def test_tx_simple_rollback(self):
cluster = HaCluster(self, 2, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.sync()
tx_queues = cluster[0].agent.tx_queues()
tx.acknowledge()
@@ -1415,22 +1417,27 @@ class TransactionTests(HaBrokerTest):
def test_tx_simple_failover(self):
cluster = HaCluster(self, 3, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.sync()
tx_queues = cluster[0].agent.tx_queues()
tx.acknowledge()
- cluster.bounce(0) # Should cause roll-back
- cluster[0].wait_status("ready") # Restarted.
- cluster[1].wait_status("active") # Promoted.
- cluster[2].wait_status("ready") # Failed over.
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+ try:
+ cluster.bounce(0) # Should cause roll-back
+ tx.connection.session() # Wait for reconnect
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ self.assertRaises(qm.TransactionAborted, tx.sync)
+ self.assertRaises(qm.TransactionAborted, tx.commit)
+ tx.connection.close()
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ finally: l.restore()
def test_tx_no_backups(self):
"""Test the special case of a TX where there are no backups"""
# Test commit
cluster = HaCluster(self, 1, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.acknowledge()
tx.commit()
tx.sync()
@@ -1440,7 +1447,7 @@ class TransactionTests(HaBrokerTest):
# Test rollback
cluster = HaCluster(self, 1, test_store=True)
- tx = self.tx_simple_setup(cluster[0])
+ tx = self.tx_simple_setup(cluster)
tx.sync()
tx_queues = cluster[0].agent.tx_queues()
tx.acknowledge()
@@ -1498,7 +1505,7 @@ class TransactionTests(HaBrokerTest):
"""Verify that TXs blocked in commit don't deadlock."""
cluster = HaCluster(self, 2, args=["--worker-threads=2"], test_store=True)
n = 10 # Number of concurrent transactions
- sessions = [cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)]
+ sessions = [cluster.connect(0, protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)]
# Have the store delay the response for 10s
for s in sessions:
sn = s.sender("qq;{create:always,node:{durable:true}}")
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp
index cdd263a081..a744d07a12 100644
--- a/qpid/cpp/src/tests/qpid-txtest2.cpp
+++ b/qpid/cpp/src/tests/qpid-txtest2.cpp
@@ -88,6 +88,7 @@ struct Options : public qpid::Options {
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
+
bool parse(int argc, char** argv)
{
try {
@@ -109,9 +110,11 @@ struct Options : public qpid::Options {
std::cout << *this << std::endl << std::endl
<< "Transactionally moves messages between queues" << std::endl;
return false;
- } else {
- return true;
}
+ if (totalMsgCount < msgsPerTx) {
+ totalMsgCount = msgsPerTx; // Must have at least msgsPerTx total messages.
+ }
+ return true;
} catch (const std::exception& e) {
std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
return false;
@@ -158,6 +161,7 @@ struct Client
virtual ~Client()
{
try {
+ session.sync();
session.close();
connection.close();
} catch(const std::exception& e) {
@@ -177,12 +181,14 @@ struct Transfer : public TransactionalClient, public Runnable
const std::string target;
const std::string source;
Thread thread;
+ bool failed;
- Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from) {}
+ Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from), failed(false) {}
void run()
{
try {
+
Sender sender(session.createSender(target));
Receiver receiver(session.createReceiver(source));
receiver.setCapacity(opts.capacity);
@@ -211,7 +217,8 @@ struct Transfer : public TransactionalClient, public Runnable
sender.close();
receiver.close();
} catch(const std::exception& e) {
- std::cout << "Transfer interrupted: " << e.what() << std::endl;
+ failed = true;
+ QPID_LOG(error, "Transfer " << source << " to " << target << " interrupted: " << e.what());
}
}
};
@@ -263,9 +270,11 @@ struct Controller : public Client
agents.back().thread = Thread(agents.back());
}
- for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) {
+ for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++)
i->thread.join();
- }
+ for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++)
+ if (i->failed)
+ throw std::runtime_error("Transfer agents failed");
}
int check()
@@ -285,7 +294,6 @@ struct Controller : public Client
receiver.close();
if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl;
}
-
sort(ids.begin(), ids.end());
sort(drained.begin(), drained.end());