summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/libqpidmessaging-api-symbols.txt2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp51
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h9
-rw-r--r--qpid/cpp/src/qpid/messaging/exceptions.cpp1
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py4
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py66
-rw-r--r--qpid/cpp/src/tests/qpid-txtest2.cpp12
7 files changed, 101 insertions, 44 deletions
diff --git a/qpid/cpp/src/libqpidmessaging-api-symbols.txt b/qpid/cpp/src/libqpidmessaging-api-symbols.txt
index 7af446dc4a..23634857c2 100644
--- a/qpid/cpp/src/libqpidmessaging-api-symbols.txt
+++ b/qpid/cpp/src/libqpidmessaging-api-symbols.txt
@@ -232,6 +232,8 @@ qpid::messaging::TransactionError::TransactionError(std::string const&)
qpid::messaging::TransactionError::~TransactionError()
qpid::messaging::TransactionAborted::TransactionAborted(std::string const&)
qpid::messaging::TransactionAborted::~TransactionAborted()
+qpid::messaging::TransactionUnknown::TransactionUnknown(std::string const&)
+qpid::messaging::TransactionUnknown::~TransactionUnknown()
qpid::messaging::UnauthorizedAccess::UnauthorizedAccess(std::string const&)
qpid::messaging::UnauthorizedAccess::~UnauthorizedAccess()
qpid::messaging::ConnectionError::ConnectionError(std::string const&)
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 576156db00..9299ed7cb1 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -43,7 +43,9 @@
using qpid::messaging::KeyError;
using qpid::messaging::NoMessageAvailable;
using qpid::messaging::MessagingException;
+using qpid::messaging::TransactionError;
using qpid::messaging::TransactionAborted;
+using qpid::messaging::TransactionUnknown;
using qpid::messaging::SessionError;
using qpid::messaging::MessageImplAccess;
using qpid::messaging::Sender;
@@ -56,37 +58,18 @@ namespace amqp0_10 {
typedef qpid::sys::Mutex::ScopedLock ScopedLock;
typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock;
-SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t), aborted(false) {}
+SessionImpl::SessionImpl(ConnectionImpl& c, bool t) :
+ connection(&c), transactional(t) {}
bool SessionImpl::isTransactional() const
{
return transactional;
}
-void SessionImpl::abortTransaction()
-{
- ScopedLock l(lock);
- aborted = true;
- checkAbortedLH(l);
-}
-
-void SessionImpl::checkAborted()
-{
- ScopedLock l(lock);
- checkAbortedLH(l);
-}
-
-void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&)
-{
- if (aborted) {
- throw TransactionAborted("Transaction aborted due to transport failure");
- }
-}
-
void SessionImpl::checkError()
{
ScopedLock l(lock);
- checkAbortedLH(l);
+ txError.raise();
qpid::client::SessionBase_0_10Access s(session);
try {
s.get()->assertOpen();
@@ -120,9 +103,19 @@ void SessionImpl::sync(bool block)
void SessionImpl::commit()
{
- if (!execute<Commit>()) {
- throw TransactionAborted("Transaction aborted due to transport failure");
+ try {
+ checkError();
+ committing = true;
+ execute<Commit>();
}
+ catch (const TransactionError&) {
+ assert(txError); // Must be set by thrower of TransactionError
+ }
+ catch (const std::exception& e) {
+ txError = new TransactionAborted(Msg() << "Transaction aborted: " << e.what());
+ }
+ committing = false;
+ checkError();
}
void SessionImpl::rollback()
@@ -385,7 +378,7 @@ bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message,
bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout)
{
while (true) {
- checkAborted();
+ txError.raise();
try {
std::string destination;
if (incoming.getNextDestination(destination, adjust(timeout))) {
@@ -568,7 +561,13 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- if (transactional) abortTransaction();
+ if (transactional) {
+ if (committing)
+ txError = new TransactionUnknown("Transaction outcome unknown: transport failure");
+ else
+ txError = new TransactionAborted("Transaction aborted: transport failure");
+ txError.raise();
+ }
connection->reopen();
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index b2e4cf3f78..3a160b2b91 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -30,6 +30,7 @@
#include "qpid/client/amqp0_10/IncomingMessages.h"
#include "qpid/sys/Mutex.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/ExceptionHolder.h"
#include <boost/intrusive_ptr.hpp>
namespace qpid {
@@ -97,7 +98,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
template <class T> bool execute(T& f)
{
try {
- checkAborted();
+ txError.raise();
f();
return true;
} catch (const qpid::TransportFailure&) {
@@ -131,16 +132,14 @@ class SessionImpl : public qpid::messaging::SessionImpl
Receivers receivers;
Senders senders;
const bool transactional;
- bool aborted;
+ bool committing;
+ sys::ExceptionHolder txError;
bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&);
bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout);
bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer);
void reconnect();
bool backoff();
- void abortTransaction();
- void checkAborted();
- void checkAbortedLH(const qpid::sys::Mutex::ScopedLock&);
void commitImpl();
void rollbackImpl();
diff --git a/qpid/cpp/src/qpid/messaging/exceptions.cpp b/qpid/cpp/src/qpid/messaging/exceptions.cpp
index d21477b494..419c508626 100644
--- a/qpid/cpp/src/qpid/messaging/exceptions.cpp
+++ b/qpid/cpp/src/qpid/messaging/exceptions.cpp
@@ -53,6 +53,7 @@ SessionClosed::SessionClosed() : SessionError("Session Closed") {}
TransactionError::TransactionError(const std::string& msg) : SessionError(msg) {}
TransactionAborted::TransactionAborted(const std::string& msg) : TransactionError(msg) {}
+TransactionUnknown::TransactionUnknown(const std::string& msg) : TransactionError(msg) {}
UnauthorizedAccess::UnauthorizedAccess(const std::string& msg) : SessionError(msg) {}
ConnectionError::ConnectionError(const std::string& msg) : MessagingException(msg) {}
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 9adad45ed4..1c131e7872 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -350,7 +350,9 @@ class HaCluster(object):
def connect(self, i, **kwargs):
"""Connect with reconnect_urls"""
- return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs)
+ c = self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs)
+ self.test.teardown_add(c) # Clean up
+ return c
def kill(self, i, promote_next=True, final=True):
"""Kill broker i, promote broker i+1"""
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index e1864725d2..1567bfd177 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1349,7 +1349,7 @@ class TransactionTests(HaBrokerTest):
if q.startswith("qpid.ha-tx")]
def test_tx_simple_commit(self):
- cluster = HaCluster(self, 2, test_store=True)
+ cluster = HaCluster(self, 2, test_store=True, wait=True)
tx = self.tx_simple_setup(cluster)
tx.sync()
tx_queues = cluster[0].agent.tx_queues()
@@ -1373,6 +1373,9 @@ class TransactionTests(HaBrokerTest):
self.assertEqual(['0', '1', '2', 'x', 'y', 'z'], [m.content for m in ri])
r.session.acknowledge()
for b in cluster: b.assert_browse_backup("b", [], msg=b)
+ c.close()
+ tx.connection.close()
+
def check_enq_deq(self, cluster, queue, expect):
for b in cluster:
@@ -1455,6 +1458,7 @@ class TransactionTests(HaBrokerTest):
tx.rollback()
tx.close() # For clean test.
for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ tx.connection.close()
def assert_simple_rollback_outcome(self, b, tx_queues):
b.assert_browse_backup("a", ["x","y","z"], msg=b)
@@ -1467,7 +1471,27 @@ class TransactionTests(HaBrokerTest):
self.assertEqual(open_read(b.store_log), expect, msg=b)
self.assert_tx_clean(b)
+ def test_tx_simple_failure(self):
+ """Verify we throw TransactionAborted if there is a store error during a transaction"""
+ cluster = HaCluster(self, 3, test_store=True)
+ tx = self.tx_simple_setup(cluster)
+ tx.sync()
+ tx_queues = cluster[0].agent.tx_queues()
+ tx.acknowledge()
+ l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+ try:
+ cluster.bounce(0) # Should cause roll-back
+ tx.connection.session() # Wait for reconnect
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ self.assertRaises(qm.TransactionAborted, tx.sync)
+ self.assertRaises(qm.TransactionAborted, tx.commit)
+ try: tx.connection.close()
+ except qm.TransactionAborted: pass # Occasionally get exception on close.
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ finally: l.restore()
+
def test_tx_simple_failover(self):
+ """Verify we throw TransactionAborted if there is a fail-over during a transaction"""
cluster = HaCluster(self, 3, test_store=True)
tx = self.tx_simple_setup(cluster)
tx.sync()
@@ -1485,6 +1509,35 @@ class TransactionTests(HaBrokerTest):
for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
finally: l.restore()
+ def test_tx_unknown_failover(self):
+ """Verify we throw TransactionUnknown if there is a failure during commit"""
+ cluster = HaCluster(self, 3, test_store=True)
+ tx = self.tx_simple_setup(cluster)
+ tx.sync()
+ tx_queues = cluster[0].agent.tx_queues()
+ tx.acknowledge()
+ l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+ try:
+ os.kill(cluster[2].pid, signal.SIGSTOP) # Delay prepare response
+ class CommitThread(Thread):
+ def run(self):
+ try: tx.commit()
+ except Exception, e:
+ self.error = e
+ t = CommitThread()
+ t.start() # Commit in progress
+ t.join(timeout=0.01)
+ self.assertTrue(t.is_alive())
+ cluster.bounce(0)
+ os.kill(cluster[2].pid, signal.SIGCONT)
+ t.join()
+ try: raise t.error
+ except qm.TransactionUnknown: pass
+ for b in cluster: self.assert_tx_clean(b)
+ try: tx.connection.close()
+ except TransactionUnknown: pass # Occasionally get exception on close.
+ finally: l.restore()
+
def test_tx_no_backups(self):
"""Test the special case of a TX where there are no backups"""
@@ -1509,17 +1562,14 @@ class TransactionTests(HaBrokerTest):
tx.close()
self.assert_simple_rollback_outcome(cluster[0], tx_queues)
- def assert_commit_raises(self, tx):
- def commit_sync(): tx.commit(); tx.sync()
- self.assertRaises(Exception, commit_sync)
-
def test_tx_backup_fail(self):
cluster = HaCluster(self, 2, test_store=True, s_args=[[],["--test-store-name=bang"]])
c = cluster[0].connect(protocol=self.tx_protocol)
tx = c.session(transactional=True)
s = tx.sender("q;{create:always,node:{durable:true}}")
for m in ["foo","TEST_STORE_DO bang: throw","bar"]: s.send(qm.Message(m, durable=True))
- self.assert_commit_raises(tx)
+ def commit_sync(): tx.commit(); tx.sync()
+ self.assertRaises(qm.TransactionAborted, commit_sync)
for b in cluster: b.assert_browse_backup("q", [])
self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<abort tx=1>\n")
@@ -1538,7 +1588,7 @@ class TransactionTests(HaBrokerTest):
cluster[1].kill(final=False)
s.send("b")
tx.commit()
- tx.close()
+ tx.connection.close()
for b in [cluster[0],cluster[2]]:
self.assert_tx_clean(b)
b.assert_browse_backup("q", ["a","b"], msg=b)
@@ -1548,7 +1598,7 @@ class TransactionTests(HaBrokerTest):
s.send("foo")
cluster.restart(1) # Not a part of the current transaction.
tx.commit()
- tx.close()
+ tx.connection.close()
for b in cluster: self.assert_tx_clean(b)
# The new member is not in the tx but receives the results normal replication.
for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b)
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp
index d64f13d9c5..6e2f81726e 100644
--- a/qpid/cpp/src/tests/qpid-txtest2.cpp
+++ b/qpid/cpp/src/tests/qpid-txtest2.cpp
@@ -66,7 +66,7 @@ struct Options : public qpid::Options {
Options() : help(false), init(true), transfer(true), check(true),
size(256), durable(true), queues(2),
- base("tx-test2"), msgsPerTx(1), txCount(5), totalMsgCount(10),
+ base("tx"), msgsPerTx(1), txCount(5), totalMsgCount(10),
capacity(1000), url("localhost"), port(0), quiet(false)
{
addOptions()
@@ -140,8 +140,10 @@ std::string generateData(uint size)
void generateSet(const std::string& base, uint count, StringSet& collection)
{
for (uint i = 0; i < count; i++) {
+ std::ostringstream digits;
+ digits << count;
std::ostringstream out;
- out << base << "-" << (i+1);
+ out << base << "-" << std::setw(digits.str().size()) << std::setfill('0') << (i+1);
collection.push_back(out.str());
}
}
@@ -193,6 +195,8 @@ struct Transfer : public TransactionalClient, public Runnable
Receiver receiver(session.createReceiver(source));
receiver.setCapacity(opts.capacity);
for (uint t = 0; t < opts.txCount;) {
+ std::ostringstream id;
+ id << source << ">" << target << ":" << t+1;
try {
for (uint m = 0; m < opts.msgsPerTx; m++) {
Message msg = receiver.fetch(Duration::SECOND*30);
@@ -205,9 +209,9 @@ struct Transfer : public TransactionalClient, public Runnable
}
session.commit();
t++;
- if (!opts.quiet) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl;
+ if (!opts.quiet) std::cout << "Transaction " << id.str() << " of " << opts.txCount << " committed successfully" << std::endl;
} catch (const TransactionAborted&) {
- std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl;
+ std::cout << "Transaction " << id.str() << " of " << opts.txCount << " was aborted and will be retried" << std::endl;
session = connection.createTransactionalSession();
sender = session.createSender(target);
receiver = session.createReceiver(source);