| author | Alan Conway <aconway@apache.org> | 2014-08-28 21:47:44 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2014-08-28 21:47:44 +0000 |
| commit | 9ae659723b21d9d1c547cf85bf9aba0019b081d5 (patch) | |
| tree | 85b5f3312fae5e481c921d92012536fce9fa5fff /qpid/cpp/src | |
| parent | 74fabf0e6bbbb07db6d9ac54af5738a60124b68d (diff) | |
QPID-5975: HA extra/missing messages when running qpid-txtest2 in a loop with failover.
This is partly not-a-bug, but there is also a client error handling issue,
which has been corrected.
qpid-txtest2 initializes a queue with messages at the start and drains the
queues at the end. These operations are *not transactional*. Therefore
duplicates are expected if there is a failover during initialization or
draining. When duplicates were observed, there was indeed a failover at one of
these times.
Making these operations transactional is not enough for the test to pass: it
then fails with "no messages to fetch". This is explained as follows:
If there is a failover during a transaction, TransactionAborted is raised. The
client assumes the transaction was rolled back and re-plays it. However, if the
failover occurs at a critical point *after* the client has sent commit
but *before* it has received a response, then the client *does not know*
whether the transaction was committed or rolled-back on the new primary.
Re-playing in this case can duplicate the transaction. Each transaction moves
messages from one queue to another so as long as transactions are atomic the
total number of messages will not change. However, if transactions are
duplicated, a transactional session may try to move more messages than exist on
the queue, hence "no messages to fetch". For example if thread 1 moves N
messages from q1 to q2, and thread 2 tries to move N+M messages back, then
thread 2 will fail.
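The failure mode above can be sketched with a small in-memory model. This is
only an illustration under simplifying assumptions: the queues are plain
Python deques, and `move` and `NoMessagesToFetch` are hypothetical names that
do not exist in the qpid client or in qpid-txtest2.

```python
from collections import deque


class NoMessagesToFetch(Exception):
    """Hypothetical stand-in for the test's "no messages to fetch" failure."""


def move(src, dst, count):
    """Move `count` messages from src to dst, or fail without changing anything."""
    if len(src) < count:
        raise NoMessagesToFetch("wanted %d, only %d available" % (count, len(src)))
    for _ in range(count):
        dst.append(src.popleft())


q1 = deque(range(10))
q2 = deque()
N = 4

# The commit reaches the broker, but the response is lost in the failover.
move(q1, q2, N)
# The client wrongly assumes a rollback and replays the same transaction.
move(q1, q2, N)

# A later transaction of the planned size now finds too few messages on q1.
try:
    move(q1, q2, N)
    outcome = "ok"
except NoMessagesToFetch as e:
    outcome = str(e)

# Each move is atomic, so the total is conserved even though the test fails.
total = len(q1) + len(q2)
```

Note that the total message count never changes in this model, which matches
the observation that atomicity alone does not prevent the failure.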
This problem has been corrected as follows: the C++ and Python clients now
raise the following exceptions:
- TransactionAborted: The transaction has definitely been rolled back due to a
connection failure before commit or a broker error (e.g. a store error) during commit.
It can safely be replayed.
- TransactionUnknown: The transaction outcome is unknown because the connection
failed at the critical time. There's no simple automatic way to know what
happened without examining the state of the broker queues.
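A client might distinguish the two exceptions roughly as sketched below. This
is not code from the commit: the exception classes are local stand-ins for the
ones the clients raise, and `run_transaction`, `work`, `commit` and
`outcome_committed` are hypothetical names. In particular `outcome_committed`
stands for whatever application-specific inspection of the broker queues is
available. A real client would also need a fresh transactional session before
replaying.

```python
class TransactionError(Exception):
    """Local stand-in for the client's TransactionError."""


class TransactionAborted(TransactionError):
    """Definitely rolled back; safe to replay."""


class TransactionUnknown(TransactionError):
    """Outcome unknown; replaying blindly may duplicate the transaction."""


def run_transaction(work, commit, outcome_committed, max_attempts=3):
    """Run work() and commit(), replaying only when that is known to be safe."""
    for _ in range(max_attempts):
        try:
            work()
            commit()
            return "committed"
        except TransactionAborted:
            continue  # Rolled back for certain, so replay.
        except TransactionUnknown:
            # Hypothetical application check of the broker state.
            if outcome_committed():
                return "committed"
            continue  # Verified as not applied, so replay.
    return "gave up"


# Usage: one failover before commit, then one failover during commit.
calls = {"work": 0}
failures = [TransactionAborted("failover before commit"),
            TransactionUnknown("failover during commit")]


def work():
    calls["work"] += 1


def commit():
    if failures:
        raise failures.pop(0)


result = run_transaction(work, commit, outcome_committed=lambda: True)
```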
Unfortunately, with this fix qpid-txtest2 is no longer a useful test for TX
failover because it regularly raises TransactionUnknown and there's not much we
can do with that.
A better test of TX atomicity with failover is to run a pair of
qpid-send/qpid-receive with fail-over and verify that the number of
enqueues/dequeues and message depth are a multiple of the transaction size. See
the JIRA for such a test. (Note that these tests also sometimes raise
TransactionUnknown, but it doesn't matter since all we are checking is that
messages go on and off the queues in multiples of the TX size.)
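The check described above amounts to a divisibility test on the queue
statistics. A minimal sketch follows, assuming the enqueue count, dequeue
count and depth have already been read from the broker by some other means.
`check_tx_atomic` is a hypothetical helper, not the test attached to the JIRA.

```python
def check_tx_atomic(tx_size, enqueues, dequeues, depth):
    """Return a list of problems; an empty list means the counts look atomic."""
    problems = []
    for name, value in (("enqueues", enqueues),
                        ("dequeues", dequeues),
                        ("depth", depth)):
        if value % tx_size != 0:
            problems.append("%s=%d is not a multiple of %d" % (name, value, tx_size))
    return problems


# Whole transactions only: no problems reported.
clean = check_tx_atomic(10, enqueues=120, dequeues=70, depth=50)
# Half a transaction leaked onto the queue: two counters are off.
broken = check_tx_atomic(10, enqueues=125, dequeues=70, depth=55)
```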
Note: the original bug also reported seeing missing messages from
qpid-txtest2. I don't have a good explanation for that but since the
qpid-send/receive test shows that transactions are atomic I am going to let that
go for now.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1621211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | qpid/cpp/src/libqpidmessaging-api-symbols.txt | 2 |
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 51 |
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h | 9 |
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/exceptions.cpp | 1 |
| -rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 4 |
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 66 |
| -rw-r--r-- | qpid/cpp/src/tests/qpid-txtest2.cpp | 12 |
7 files changed, 101 insertions, 44 deletions
diff --git a/qpid/cpp/src/libqpidmessaging-api-symbols.txt b/qpid/cpp/src/libqpidmessaging-api-symbols.txt
index 7af446dc4a..23634857c2 100644
--- a/qpid/cpp/src/libqpidmessaging-api-symbols.txt
+++ b/qpid/cpp/src/libqpidmessaging-api-symbols.txt
@@ -232,6 +232,8 @@ qpid::messaging::TransactionError::TransactionError(std::string const&)
 qpid::messaging::TransactionError::~TransactionError()
 qpid::messaging::TransactionAborted::TransactionAborted(std::string const&)
 qpid::messaging::TransactionAborted::~TransactionAborted()
+qpid::messaging::TransactionUnknown::TransactionUnknown(std::string const&)
+qpid::messaging::TransactionUnknown::~TransactionUnknown()
 qpid::messaging::UnauthorizedAccess::UnauthorizedAccess(std::string const&)
 qpid::messaging::UnauthorizedAccess::~UnauthorizedAccess()
 qpid::messaging::ConnectionError::ConnectionError(std::string const&)
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 576156db00..9299ed7cb1 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -43,7 +43,9 @@
 using qpid::messaging::KeyError;
 using qpid::messaging::NoMessageAvailable;
 using qpid::messaging::MessagingException;
+using qpid::messaging::TransactionError;
 using qpid::messaging::TransactionAborted;
+using qpid::messaging::TransactionUnknown;
 using qpid::messaging::SessionError;
 using qpid::messaging::MessageImplAccess;
 using qpid::messaging::Sender;
@@ -56,37 +58,18 @@ namespace amqp0_10 {
 typedef qpid::sys::Mutex::ScopedLock ScopedLock;
 typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock;
 
-SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t), aborted(false) {}
+SessionImpl::SessionImpl(ConnectionImpl& c, bool t) :
+    connection(&c), transactional(t) {}
 
 bool SessionImpl::isTransactional() const
 {
     return transactional;
 }
 
-void SessionImpl::abortTransaction()
-{
-    ScopedLock l(lock);
-    aborted = true;
-    checkAbortedLH(l);
-}
-
-void SessionImpl::checkAborted()
-{
-    ScopedLock l(lock);
-    checkAbortedLH(l);
-}
-
-void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&)
-{
-    if (aborted) {
-        throw TransactionAborted("Transaction aborted due to transport failure");
-    }
-}
-
 void SessionImpl::checkError()
 {
     ScopedLock l(lock);
-    checkAbortedLH(l);
+    txError.raise();
     qpid::client::SessionBase_0_10Access s(session);
     try {
         s.get()->assertOpen();
@@ -120,9 +103,19 @@ void SessionImpl::sync(bool block)
 
 void SessionImpl::commit()
 {
-    if (!execute<Commit>()) {
-        throw TransactionAborted("Transaction aborted due to transport failure");
+    try {
+        checkError();
+        committing = true;
+        execute<Commit>();
     }
+    catch (const TransactionError&) {
+        assert(txError);        // Must be set by thrower of TransactionError
+    }
+    catch (const std::exception& e) {
+        txError = new TransactionAborted(Msg() << "Transaction aborted: " << e.what());
+    }
+    committing = false;
+    checkError();
 }
 
 void SessionImpl::rollback()
@@ -385,7 +378,7 @@ bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message,
 bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout)
 {
     while (true) {
-        checkAborted();
+        txError.raise();
         try {
             std::string destination;
             if (incoming.getNextDestination(destination, adjust(timeout))) {
@@ -568,7 +561,13 @@ void SessionImpl::senderCancelled(const std::string& name)
 
 void SessionImpl::reconnect()
 {
-    if (transactional) abortTransaction();
+    if (transactional) {
+        if (committing)
+            txError = new TransactionUnknown("Transaction outcome unknown: transport failure");
+        else
+            txError = new TransactionAborted("Transaction aborted: transport failure");
+        txError.raise();
+    }
     connection->reopen();
 }
 
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index b2e4cf3f78..3a160b2b91 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -30,6 +30,7 @@
 #include "qpid/client/amqp0_10/IncomingMessages.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/ExceptionHolder.h"
 #include <boost/intrusive_ptr.hpp>
 
 namespace qpid {
@@ -97,7 +98,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
     template <class T> bool execute(T& f)
     {
         try {
-            checkAborted();
+            txError.raise();
             f();
             return true;
         } catch (const qpid::TransportFailure&) {
@@ -131,16 +132,14 @@ class SessionImpl : public qpid::messaging::SessionImpl
     Receivers receivers;
     Senders senders;
     const bool transactional;
-    bool aborted;
+    bool committing;
+    sys::ExceptionHolder txError;
 
     bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&);
     bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout);
     bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer);
     void reconnect();
     bool backoff();
-    void abortTransaction();
-    void checkAborted();
-    void checkAbortedLH(const qpid::sys::Mutex::ScopedLock&);
 
     void commitImpl();
     void rollbackImpl();
diff --git a/qpid/cpp/src/qpid/messaging/exceptions.cpp b/qpid/cpp/src/qpid/messaging/exceptions.cpp
index d21477b494..419c508626 100644
--- a/qpid/cpp/src/qpid/messaging/exceptions.cpp
+++ b/qpid/cpp/src/qpid/messaging/exceptions.cpp
@@ -53,6 +53,7 @@ SessionClosed::SessionClosed() : SessionError("Session Closed") {}
 
 TransactionError::TransactionError(const std::string& msg) : SessionError(msg) {}
 TransactionAborted::TransactionAborted(const std::string& msg) : TransactionError(msg) {}
+TransactionUnknown::TransactionUnknown(const std::string& msg) : TransactionError(msg) {}
 UnauthorizedAccess::UnauthorizedAccess(const std::string& msg) : SessionError(msg) {}
 
 ConnectionError::ConnectionError(const std::string& msg) : MessagingException(msg) {}
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 9adad45ed4..1c131e7872 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -350,7 +350,9 @@ class HaCluster(object):
 
     def connect(self, i, **kwargs):
         """Connect with reconnect_urls"""
-        return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs)
+        c = self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs)
+        self.test.teardown_add(c) # Clean up
+        return c
 
     def kill(self, i, promote_next=True, final=True):
         """Kill broker i, promote broker i+1"""
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index e1864725d2..1567bfd177 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1349,7 +1349,7 @@ class TransactionTests(HaBrokerTest):
                     if q.startswith("qpid.ha-tx")]
 
     def test_tx_simple_commit(self):
-        cluster = HaCluster(self, 2, test_store=True)
+        cluster = HaCluster(self, 2, test_store=True, wait=True)
         tx = self.tx_simple_setup(cluster)
         tx.sync()
         tx_queues = cluster[0].agent.tx_queues()
@@ -1373,6 +1373,9 @@ class TransactionTests(HaBrokerTest):
         self.assertEqual(['0', '1', '2', 'x', 'y', 'z'], [m.content for m in ri])
         r.session.acknowledge()
         for b in cluster: b.assert_browse_backup("b", [], msg=b)
+        c.close()
+        tx.connection.close()
+
 
     def check_enq_deq(self, cluster, queue, expect):
         for b in cluster:
@@ -1455,6 +1458,7 @@ class TransactionTests(HaBrokerTest):
         tx.rollback()
         tx.close()              # For clean test.
         for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+        tx.connection.close()
 
     def assert_simple_rollback_outcome(self, b, tx_queues):
         b.assert_browse_backup("a", ["x","y","z"], msg=b)
@@ -1467,7 +1471,27 @@ class TransactionTests(HaBrokerTest):
         self.assertEqual(open_read(b.store_log), expect, msg=b)
         self.assert_tx_clean(b)
 
+    def test_tx_simple_failure(self):
+        """Verify we throw TransactionAborted if there is a store error during a transaction"""
+        cluster = HaCluster(self, 3, test_store=True)
+        tx = self.tx_simple_setup(cluster)
+        tx.sync()
+        tx_queues = cluster[0].agent.tx_queues()
+        tx.acknowledge()
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            cluster.bounce(0)       # Should cause roll-back
+            tx.connection.session() # Wait for reconnect
+            for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+            self.assertRaises(qm.TransactionAborted, tx.sync)
+            self.assertRaises(qm.TransactionAborted, tx.commit)
+            try: tx.connection.close()
+            except qm.TransactionAborted: pass # Occasionally get exception on close.
+            for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+        finally: l.restore()
+
     def test_tx_simple_failover(self):
+        """Verify we throw TransactionAborted if there is a fail-over during a transaction"""
         cluster = HaCluster(self, 3, test_store=True)
         tx = self.tx_simple_setup(cluster)
         tx.sync()
@@ -1485,6 +1509,35 @@ class TransactionTests(HaBrokerTest):
             for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
         finally: l.restore()
 
+    def test_tx_unknown_failover(self):
+        """Verify we throw TransactionUnknown if there is a failure during commit"""
+        cluster = HaCluster(self, 3, test_store=True)
+        tx = self.tx_simple_setup(cluster)
+        tx.sync()
+        tx_queues = cluster[0].agent.tx_queues()
+        tx.acknowledge()
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            os.kill(cluster[2].pid, signal.SIGSTOP) # Delay prepare response
+            class CommitThread(Thread):
+                def run(self):
+                    try: tx.commit()
+                    except Exception, e:
+                        self.error = e
+            t = CommitThread()
+            t.start() # Commit in progress
+            t.join(timeout=0.01)
+            self.assertTrue(t.is_alive())
+            cluster.bounce(0)
+            os.kill(cluster[2].pid, signal.SIGCONT)
+            t.join()
+            try: raise t.error
+            except qm.TransactionUnknown: pass
+            for b in cluster: self.assert_tx_clean(b)
+            try: tx.connection.close()
+            except TransactionUnknown: pass # Occasionally get exception on close.
+        finally: l.restore()
+
     def test_tx_no_backups(self):
         """Test the special case of a TX where there are no backups"""
 
@@ -1509,17 +1562,14 @@ class TransactionTests(HaBrokerTest):
         tx.close()
         self.assert_simple_rollback_outcome(cluster[0], tx_queues)
 
-    def assert_commit_raises(self, tx):
-        def commit_sync(): tx.commit(); tx.sync()
-        self.assertRaises(Exception, commit_sync)
-
     def test_tx_backup_fail(self):
         cluster = HaCluster(self, 2, test_store=True, s_args=[[],["--test-store-name=bang"]])
         c = cluster[0].connect(protocol=self.tx_protocol)
         tx = c.session(transactional=True)
         s = tx.sender("q;{create:always,node:{durable:true}}")
         for m in ["foo","TEST_STORE_DO bang: throw","bar"]: s.send(qm.Message(m, durable=True))
-        self.assert_commit_raises(tx)
+        def commit_sync(): tx.commit(); tx.sync()
+        self.assertRaises(qm.TransactionAborted, commit_sync)
         for b in cluster: b.assert_browse_backup("q", [])
         self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
         self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<abort tx=1>\n")
@@ -1538,7 +1588,7 @@ class TransactionTests(HaBrokerTest):
         cluster[1].kill(final=False)
         s.send("b")
         tx.commit()
-        tx.close()
+        tx.connection.close()
         for b in [cluster[0],cluster[2]]:
             self.assert_tx_clean(b)
             b.assert_browse_backup("q", ["a","b"], msg=b)
@@ -1548,7 +1598,7 @@ class TransactionTests(HaBrokerTest):
         s.send("foo")
         cluster.restart(1)      # Not a part of the current transaction.
         tx.commit()
-        tx.close()
+        tx.connection.close()
         for b in cluster: self.assert_tx_clean(b)
         # The new member is not in the tx but receives the results normal replication.
         for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b)
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp
index d64f13d9c5..6e2f81726e 100644
--- a/qpid/cpp/src/tests/qpid-txtest2.cpp
+++ b/qpid/cpp/src/tests/qpid-txtest2.cpp
@@ -66,7 +66,7 @@ struct Options : public qpid::Options {
 
     Options() : help(false), init(true), transfer(true), check(true),
                 size(256), durable(true), queues(2),
-                base("tx-test2"), msgsPerTx(1), txCount(5), totalMsgCount(10),
+                base("tx"), msgsPerTx(1), txCount(5), totalMsgCount(10),
                 capacity(1000), url("localhost"), port(0), quiet(false)
     {
         addOptions()
@@ -140,8 +140,10 @@ std::string generateData(uint size)
 void generateSet(const std::string& base, uint count, StringSet& collection)
 {
     for (uint i = 0; i < count; i++) {
+        std::ostringstream digits;
+        digits << count;
         std::ostringstream out;
-        out << base << "-" << (i+1);
+        out << base << "-" << std::setw(digits.str().size()) << std::setfill('0') << (i+1);
         collection.push_back(out.str());
     }
 }
@@ -193,6 +195,8 @@ struct Transfer : public TransactionalClient, public Runnable
         Receiver receiver(session.createReceiver(source));
         receiver.setCapacity(opts.capacity);
         for (uint t = 0; t < opts.txCount;) {
+            std::ostringstream id;
+            id << source << ">" << target << ":" << t+1;
             try {
                 for (uint m = 0; m < opts.msgsPerTx; m++) {
                     Message msg = receiver.fetch(Duration::SECOND*30);
@@ -205,9 +209,9 @@ struct Transfer : public TransactionalClient, public Runnable
                 }
                 session.commit();
                 t++;
-                if (!opts.quiet) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl;
+                if (!opts.quiet) std::cout << "Transaction " << id.str() << " of " << opts.txCount << " committed successfully" << std::endl;
             } catch (const TransactionAborted&) {
-                std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl;
+                std::cout << "Transaction " << id.str() << " of " << opts.txCount << " was aborted and will be retried" << std::endl;
                 session = connection.createTransactionalSession();
                 sender = session.createSender(target);
                 receiver = session.createReceiver(source);
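
The way the SessionImpl change classifies a failure can be modelled in a few
lines of Python. This is a rough sketch of the logic only, not the C++ code:
`SessionSketch` and `outcome` are hypothetical names, the `tx_error`
attribute plays the role of the `txError` holder, and the `send_commit`
callable stands in for `execute<Commit>()`, with the assumption that a
transport failure inside it ends up calling `reconnect()`.

```python
class TransactionError(Exception):
    pass


class TransactionAborted(TransactionError):
    pass


class TransactionUnknown(TransactionError):
    pass


class SessionSketch:
    """Toy model of the deferred tx error plus the committing flag."""

    def __init__(self):
        self.committing = False
        self.tx_error = None  # Held error, raised again on later calls.

    def check_error(self):
        if self.tx_error is not None:
            raise self.tx_error

    def reconnect(self):
        # A failure while a commit is in flight leaves the outcome unknown.
        if self.committing:
            self.tx_error = TransactionUnknown("Transaction outcome unknown: transport failure")
        else:
            self.tx_error = TransactionAborted("Transaction aborted: transport failure")
        self.check_error()

    def commit(self, send_commit):
        try:
            self.check_error()
            self.committing = True
            send_commit()
        except TransactionError:
            assert self.tx_error is not None  # Set by whoever raised it.
        except Exception as e:
            # Any other error during commit (e.g. a store error) is an abort.
            self.tx_error = TransactionAborted("Transaction aborted: %s" % e)
        self.committing = False
        self.check_error()


def outcome(action):
    """Run action() and report the name of the exception it raised, if any."""
    try:
        action()
        return "ok"
    except TransactionError as e:
        return type(e).__name__


def store_error():
    raise RuntimeError("store error")


before = SessionSketch()
failed_before_commit = outcome(before.reconnect)
commit_after_failure = outcome(lambda: before.commit(lambda: None))

during = SessionSketch()
failed_during_commit = outcome(lambda: during.commit(during.reconnect))

broken_store = outcome(lambda: SessionSketch().commit(store_error))
clean_commit = outcome(lambda: SessionSketch().commit(lambda: None))
```

In this model the only path that yields TransactionUnknown is a reconnect
that happens while `committing` is set. Every other failure is reported as
TransactionAborted.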
