diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/libqpidmessaging-api-symbols.txt | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 51 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h | 9 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/exceptions.cpp | 1 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 4 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 66 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/qpid-txtest2.cpp | 12 |
7 files changed, 101 insertions, 44 deletions
diff --git a/qpid/cpp/src/libqpidmessaging-api-symbols.txt b/qpid/cpp/src/libqpidmessaging-api-symbols.txt index 7af446dc4a..23634857c2 100644 --- a/qpid/cpp/src/libqpidmessaging-api-symbols.txt +++ b/qpid/cpp/src/libqpidmessaging-api-symbols.txt @@ -232,6 +232,8 @@ qpid::messaging::TransactionError::TransactionError(std::string const&) qpid::messaging::TransactionError::~TransactionError() qpid::messaging::TransactionAborted::TransactionAborted(std::string const&) qpid::messaging::TransactionAborted::~TransactionAborted() +qpid::messaging::TransactionUnknown::TransactionUnknown(std::string const&) +qpid::messaging::TransactionUnknown::~TransactionUnknown() qpid::messaging::UnauthorizedAccess::UnauthorizedAccess(std::string const&) qpid::messaging::UnauthorizedAccess::~UnauthorizedAccess() qpid::messaging::ConnectionError::ConnectionError(std::string const&) diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 576156db00..9299ed7cb1 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -43,7 +43,9 @@ using qpid::messaging::KeyError; using qpid::messaging::NoMessageAvailable; using qpid::messaging::MessagingException; +using qpid::messaging::TransactionError; using qpid::messaging::TransactionAborted; +using qpid::messaging::TransactionUnknown; using qpid::messaging::SessionError; using qpid::messaging::MessageImplAccess; using qpid::messaging::Sender; @@ -56,37 +58,18 @@ namespace amqp0_10 { typedef qpid::sys::Mutex::ScopedLock ScopedLock; typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock; -SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t), aborted(false) {} +SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : + connection(&c), transactional(t) {} bool SessionImpl::isTransactional() const { return transactional; } -void SessionImpl::abortTransaction() -{ - ScopedLock l(lock); - aborted = true; - checkAbortedLH(l); -} - -void SessionImpl::checkAborted() -{ - ScopedLock l(lock); - checkAbortedLH(l); -} - -void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&) -{ - if (aborted) { - throw TransactionAborted("Transaction aborted due to transport failure"); - } -} - void SessionImpl::checkError() { ScopedLock l(lock); - checkAbortedLH(l); + txError.raise(); qpid::client::SessionBase_0_10Access s(session); try { s.get()->assertOpen(); @@ -120,9 +103,19 @@ void SessionImpl::sync(bool block) void SessionImpl::commit() { - if (!execute<Commit>()) { - throw TransactionAborted("Transaction aborted due to transport failure"); + try { + checkError(); + committing = true; + execute<Commit>(); } + catch (const TransactionError&) { + assert(txError); // Must be set by thrower of TransactionError + } + catch (const std::exception& e) { + txError = new TransactionAborted(Msg() << "Transaction aborted: " << e.what()); + } + committing = false; + checkError(); } void SessionImpl::rollback() @@ -385,7 +378,7 @@ bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout) { while (true) { - checkAborted(); + txError.raise(); try { std::string destination; if (incoming.getNextDestination(destination, adjust(timeout))) { @@ -568,7 +561,13 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - if (transactional) abortTransaction(); + if (transactional) { + if (committing) + txError = new TransactionUnknown("Transaction outcome unknown: transport failure"); + else + txError = new TransactionAborted("Transaction aborted: transport failure"); + txError.raise(); + } connection->reopen(); } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h index b2e4cf3f78..3a160b2b91 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -30,6 +30,7 @@ #include "qpid/client/amqp0_10/IncomingMessages.h" #include "qpid/sys/Mutex.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/sys/ExceptionHolder.h" #include <boost/intrusive_ptr.hpp> namespace qpid { @@ -97,7 +98,7 @@ class SessionImpl : public qpid::messaging::SessionImpl template <class T> bool execute(T& f) { try { - checkAborted(); + txError.raise(); f(); return true; } catch (const qpid::TransportFailure&) { @@ -131,16 +132,14 @@ class SessionImpl : public qpid::messaging::SessionImpl Receivers receivers; Senders senders; const bool transactional; - bool aborted; + bool committing; + sys::ExceptionHolder txError; bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&); bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout); bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer); void reconnect(); bool backoff(); - void abortTransaction(); - void checkAborted(); - void checkAbortedLH(const qpid::sys::Mutex::ScopedLock&); void commitImpl(); void rollbackImpl(); diff --git a/qpid/cpp/src/qpid/messaging/exceptions.cpp b/qpid/cpp/src/qpid/messaging/exceptions.cpp index d21477b494..419c508626 100644 --- a/qpid/cpp/src/qpid/messaging/exceptions.cpp +++ b/qpid/cpp/src/qpid/messaging/exceptions.cpp @@ -53,6 +53,7 @@ SessionClosed::SessionClosed() : SessionError("Session Closed") {} TransactionError::TransactionError(const std::string& msg) : SessionError(msg) {} TransactionAborted::TransactionAborted(const std::string& msg) : TransactionError(msg) {} +TransactionUnknown::TransactionUnknown(const std::string& msg) : TransactionError(msg) {} UnauthorizedAccess::UnauthorizedAccess(const std::string& msg) : SessionError(msg) {} ConnectionError::ConnectionError(const std::string& msg) : MessagingException(msg) {} diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 9adad45ed4..1c131e7872 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -350,7 +350,9 @@ class HaCluster(object): def connect(self, i, **kwargs): """Connect with reconnect_urls""" - return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs) + c = self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs) + self.test.teardown_add(c) # Clean up + return c def kill(self, i, promote_next=True, final=True): """Kill broker i, promote broker i+1""" diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index e1864725d2..1567bfd177 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1349,7 +1349,7 @@ class TransactionTests(HaBrokerTest): if q.startswith("qpid.ha-tx")] def test_tx_simple_commit(self): - cluster = HaCluster(self, 2, test_store=True) + cluster = HaCluster(self, 2, test_store=True, wait=True) tx = self.tx_simple_setup(cluster) tx.sync() tx_queues = cluster[0].agent.tx_queues() @@ -1373,6 +1373,9 @@ class TransactionTests(HaBrokerTest): self.assertEqual(['0', '1', '2', 'x', 'y', 'z'], [m.content for m in ri]) r.session.acknowledge() for b in cluster: b.assert_browse_backup("b", [], msg=b) + c.close() + tx.connection.close() + def check_enq_deq(self, cluster, queue, expect): for b in cluster: @@ -1455,6 +1458,7 @@ class TransactionTests(HaBrokerTest): tx.rollback() tx.close() # For clean test. for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + tx.connection.close() def assert_simple_rollback_outcome(self, b, tx_queues): b.assert_browse_backup("a", ["x","y","z"], msg=b) @@ -1467,7 +1471,27 @@ class TransactionTests(HaBrokerTest): self.assertEqual(open_read(b.store_log), expect, msg=b) self.assert_tx_clean(b) + def test_tx_simple_failure(self): + """Verify we throw TransactionAborted if there is a store error during a transaction""" + cluster = HaCluster(self, 3, test_store=True) + tx = self.tx_simple_setup(cluster) + tx.sync() + tx_queues = cluster[0].agent.tx_queues() + tx.acknowledge() + l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. + try: + cluster.bounce(0) # Should cause roll-back + tx.connection.session() # Wait for reconnect + for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + self.assertRaises(qm.TransactionAborted, tx.sync) + self.assertRaises(qm.TransactionAborted, tx.commit) + try: tx.connection.close() + except qm.TransactionAborted: pass # Occasionally get exception on close. + for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + finally: l.restore() + def test_tx_simple_failover(self): + """Verify we throw TransactionAborted if there is a fail-over during a transaction""" cluster = HaCluster(self, 3, test_store=True) tx = self.tx_simple_setup(cluster) tx.sync() @@ -1485,6 +1509,35 @@ class TransactionTests(HaBrokerTest): for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) finally: l.restore() + def test_tx_unknown_failover(self): + """Verify we throw TransactionUnknown if there is a failure during commit""" + cluster = HaCluster(self, 3, test_store=True) + tx = self.tx_simple_setup(cluster) + tx.sync() + tx_queues = cluster[0].agent.tx_queues() + tx.acknowledge() + l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. + try: + os.kill(cluster[2].pid, signal.SIGSTOP) # Delay prepare response + class CommitThread(Thread): + def run(self): + try: tx.commit() + except Exception, e: + self.error = e + t = CommitThread() + t.start() # Commit in progress + t.join(timeout=0.01) + self.assertTrue(t.is_alive()) + cluster.bounce(0) + os.kill(cluster[2].pid, signal.SIGCONT) + t.join() + try: raise t.error + except qm.TransactionUnknown: pass + for b in cluster: self.assert_tx_clean(b) + try: tx.connection.close() + except TransactionUnknown: pass # Occasionally get exception on close. + finally: l.restore() + def test_tx_no_backups(self): """Test the special case of a TX where there are no backups""" @@ -1509,17 +1562,14 @@ class TransactionTests(HaBrokerTest): tx.close() self.assert_simple_rollback_outcome(cluster[0], tx_queues) - def assert_commit_raises(self, tx): - def commit_sync(): tx.commit(); tx.sync() - self.assertRaises(Exception, commit_sync) - def test_tx_backup_fail(self): cluster = HaCluster(self, 2, test_store=True, s_args=[[],["--test-store-name=bang"]]) c = cluster[0].connect(protocol=self.tx_protocol) tx = c.session(transactional=True) s = tx.sender("q;{create:always,node:{durable:true}}") for m in ["foo","TEST_STORE_DO bang: throw","bar"]: s.send(qm.Message(m, durable=True)) - self.assert_commit_raises(tx) + def commit_sync(): tx.commit(); tx.sync() + self.assertRaises(qm.TransactionAborted, commit_sync) for b in cluster: b.assert_browse_backup("q", []) self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n") self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<abort tx=1>\n") @@ -1538,7 +1588,7 @@ class TransactionTests(HaBrokerTest): cluster[1].kill(final=False) s.send("b") tx.commit() - tx.close() + tx.connection.close() for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b) b.assert_browse_backup("q", ["a","b"], msg=b) @@ -1548,7 +1598,7 @@ class TransactionTests(HaBrokerTest): s.send("foo") cluster.restart(1) # Not a part of the current transaction. tx.commit() - tx.close() + tx.connection.close() for b in cluster: self.assert_tx_clean(b) # The new member is not in the tx but receives the results normal replication. for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b) diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp index d64f13d9c5..6e2f81726e 100644 --- a/qpid/cpp/src/tests/qpid-txtest2.cpp +++ b/qpid/cpp/src/tests/qpid-txtest2.cpp @@ -66,7 +66,7 @@ struct Options : public qpid::Options { Options() : help(false), init(true), transfer(true), check(true), size(256), durable(true), queues(2), - base("tx-test2"), msgsPerTx(1), txCount(5), totalMsgCount(10), + base("tx"), msgsPerTx(1), txCount(5), totalMsgCount(10), capacity(1000), url("localhost"), port(0), quiet(false) { addOptions() @@ -140,8 +140,10 @@ std::string generateData(uint size) void generateSet(const std::string& base, uint count, StringSet& collection) { for (uint i = 0; i < count; i++) { + std::ostringstream digits; + digits << count; std::ostringstream out; - out << base << "-" << (i+1); + out << base << "-" << std::setw(digits.str().size()) << std::setfill('0') << (i+1); collection.push_back(out.str()); } } @@ -193,6 +195,8 @@ struct Transfer : public TransactionalClient, public Runnable Receiver receiver(session.createReceiver(source)); receiver.setCapacity(opts.capacity); for (uint t = 0; t < opts.txCount;) { + std::ostringstream id; + id << source << ">" << target << ":" << t+1; try { for (uint m = 0; m < opts.msgsPerTx; m++) { Message msg = receiver.fetch(Duration::SECOND*30); @@ -205,9 +209,9 @@ struct Transfer : public TransactionalClient, public Runnable } session.commit(); t++; - if (!opts.quiet) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl; + if (!opts.quiet) std::cout << "Transaction " << id.str() << " of " << opts.txCount << " committed successfully" << std::endl; } catch (const TransactionAborted&) { - std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl; + std::cout << "Transaction " << id.str() << " of " << opts.txCount << " was aborted and will be retried" << std::endl; session = connection.createTransactionalSession(); sender = session.createSender(target); receiver = session.createReceiver(source); |
