diff options
| author | Alan Conway <aconway@apache.org> | 2014-09-05 01:39:01 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2014-09-05 01:39:01 +0000 |
| commit | 8ec55346fe58a1f2f30e3699937bdc643fc9bbed (patch) | |
| tree | 9b931172599d76a22fcebb7878dd9097d388fc4d /qpid/cpp/src | |
| parent | ec7ef1d67fd460152e31d04c6b96b11c76af3b39 (diff) | |
| download | qpid-python-8ec55346fe58a1f2f30e3699937bdc643fc9bbed.tar.gz | |
NO-JIRA: HA Fix ha_tests.py failures with SWIG 0.10 client.
- Fix un-necessary re-sends in amqp0_10::SenderImpl::replay.
- Throw NotFound and UnauthorizedAccess correctly from amqp0_10::SessionImpl and ConnectionImpl
- Fix ha_test wait_address and valid_address re-using a session after it is closed by NotFound.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1622592 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 21 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h | 8 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 41 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 27 |
6 files changed, 57 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 3600e4d945..24145f0117 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -237,7 +237,7 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st } catch (const qpid::TransportFailure&) { reopen(); } catch (const qpid::SessionException& e) { - throw qpid::messaging::SessionError(e.what()); + SessionImpl::rethrow(e); } catch (const std::exception& e) { throw qpid::messaging::MessagingException(e.what()); } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index 28a1353895..7575aaa306 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -153,8 +153,9 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m) sink->send(session, name, msg); } -void SenderImpl::replay(const sys::Mutex::ScopedLock&) +void SenderImpl::replay(const sys::Mutex::ScopedLock& l) { + checkPendingSends(false, l); for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { i->markRedelivered(); sink->send(session, name, *i); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 32f52adf43..1e2b68b24e 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -36,6 +36,7 @@ #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Session.h" +#include "qpid/framing/enum.h" #include <boost/format.hpp> #include <boost/function.hpp> #include <boost/intrusive_ptr.hpp> @@ -79,6 +80,10 @@ void SessionImpl::checkError() throw qpid::messaging::TargetCapacityExceeded(e.what()); } catch (const qpid::framing::UnauthorizedAccessException& e) { throw qpid::messaging::UnauthorizedAccess(e.what()); + } catch (const qpid::framing::NotFoundException& e) { + throw qpid::messaging::NotFound(e.what()); + } catch (const qpid::framing::ResourceDeletedException& e) { + throw qpid::messaging::NotFound(e.what()); } catch (const qpid::SessionException& e) { throw qpid::messaging::SessionError(e.what()); } catch (const qpid::ConnectionException& e) { @@ -405,10 +410,8 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag } catch (const qpid::framing::ResourceLimitExceededException& e) { if (backoff()) return false; else throw qpid::messaging::TargetCapacityExceeded(e.what()); - } catch (const qpid::framing::UnauthorizedAccessException& e) { - throw qpid::messaging::UnauthorizedAccess(e.what()); } catch (const qpid::SessionException& e) { - throw qpid::messaging::SessionError(e.what()); + rethrow(e); } catch (const qpid::ClosedException&) { throw qpid::messaging::SessionClosed(); } catch (const qpid::ConnectionException& e) { @@ -588,4 +591,16 @@ qpid::messaging::Connection SessionImpl::getConnection() const return qpid::messaging::Connection(connection.get()); } +void SessionImpl::rethrow(const qpid::SessionException& e) { + switch (e.code) { + case framing::execution::ERROR_CODE_NOT_ALLOWED: + case framing::execution::ERROR_CODE_UNAUTHORIZED_ACCESS: throw messaging::UnauthorizedAccess(e.what()); + + case framing::execution::ERROR_CODE_NOT_FOUND: + case framing::execution::ERROR_CODE_RESOURCE_DELETED: throw messaging::NotFound(e.what()); + + default: throw SessionError(e.what()); + } +} + }}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 3a160b2b91..2bb72aa877 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -109,8 +109,13 @@ class SessionImpl : public qpid::messaging::SessionImpl else throw qpid::messaging::TargetCapacityExceeded(e.what()); } catch (const qpid::framing::UnauthorizedAccessException& e) { throw qpid::messaging::UnauthorizedAccess(e.what()); + } catch (const qpid::framing::NotFoundException& e) { + throw qpid::messaging::NotFound(e.what()); + } catch (const qpid::framing::ResourceDeletedException& e) { + throw qpid::messaging::NotFound(e.what()); } catch (const qpid::SessionException& e) { - throw qpid::messaging::SessionError(e.what()); + rethrow(e); + return false; // Keep the compiler happy } catch (const qpid::ConnectionException& e) { throw qpid::messaging::ConnectionError(e.what()); } catch (const qpid::ChannelException& e) { @@ -119,6 +124,7 @@ class SessionImpl : public qpid::messaging::SessionImpl } static SessionImpl& convert(qpid::messaging::Session&); + static void rethrow(const qpid::SessionException&); private: typedef std::map<std::string, qpid::messaging::Receiver> Receivers; diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 1c131e7872..40ea3854c9 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -241,11 +241,11 @@ acl allow all all def wait_address(self, address): """Wait for address to become valid on the broker.""" - bs = self.connect_admin().session() - try: wait_address(bs, address) - finally: bs.connection.close() + c = self.connect_admin() + try: wait_address(c, address) + finally: c.close() - def wait_backup(self, address): self.wait_address(address) + wait_backup = wait_address def browse(self, queue, timeout=0, transform=lambda m: m.content): c = self.connect_admin() @@ -253,21 +253,15 @@ acl allow all all return browse(c.session(), queue, timeout, transform) finally: c.close() - def assert_browse(self, queue, expected, **kwargs): - """Verify queue contents by browsing.""" - bs = self.connect().session() - try: - wait_address(bs, queue) - assert_browse_retry(bs, queue, expected, **kwargs) - finally: bs.connection.close() - def assert_browse_backup(self, queue, expected, **kwargs): """Combines wait_backup and assert_browse_retry.""" - bs = self.connect_admin().session() + c = self.connect_admin() try: - wait_address(bs, queue) - assert_browse_retry(bs, queue, expected, **kwargs) - finally: bs.connection.close() + wait_address(c, queue) + assert_browse_retry(c.session(), queue, expected, **kwargs) + finally: c.close() + + assert_browse = assert_browse_backup def assert_connect_fail(self): try: @@ -384,18 +378,17 @@ class HaCluster(object): def __iter__(self): return self._brokers.__iter__() -def wait_address(session, address): +def wait_address(connection, address): """Wait for an address to become valid.""" - def check(): - try: session.sender(address); return True - except qm.NotFound: return False - assert retry(check), "Timed out waiting for address %s"%(address) + assert retry(lambda: valid_address(connection, address)), "Timed out waiting for address %s"%(address) -def valid_address(session, address): +def valid_address(connection, address): """Test if an address is valid""" try: - session.receiver(address) + s = connection.session().receiver(address) + s.session.close() return True - except qm.NotFound: return False + except qm.NotFound: + return False diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 482296e47f..a43b939ee3 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -82,7 +82,7 @@ class ReplicationTests(HaBrokerTest): def verify(b, prefix, p): """Verify setup was replicated to backup b""" # Wait for configuration to replicate. - wait_address(b, prefix+"x"); + wait_address(b.connection, prefix+"x"); self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"]) self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b") @@ -90,7 +90,7 @@ class ReplicationTests(HaBrokerTest): self.assert_browse_retry(b, prefix+"q1", ["1", "4"]) self.assert_browse_retry(b, prefix+"q2", []) # configuration only - assert not valid_address(b, prefix+"q3") + assert not valid_address(b.connection, prefix+"q3") # Verify exchange with replicate=all b.sender(prefix+"e1/key1").send(qm.Message(prefix+"e1")) @@ -104,8 +104,8 @@ class ReplicationTests(HaBrokerTest): self.assert_browse_retry(b, prefix+"q4", ["6","7"]) # Verify deletes - assert not valid_address(b, prefix+"dq") - assert not valid_address(b, prefix+"de") + assert not valid_address(b.connection, prefix+"dq") + assert not valid_address(b.connection, prefix+"de") l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. try: @@ -130,7 +130,7 @@ class ReplicationTests(HaBrokerTest): # Test a series of messages, enqueue all then dequeue all. primary.agent.addQueue("foo") s = p.sender("foo") - wait_address(b, "foo") + wait_address(b.connection, "foo") msgs = [str(i) for i in range(10)] for m in msgs: s.send(qm.Message(m)) self.assert_browse_retry(p, "foo", msgs) @@ -168,11 +168,8 @@ class ReplicationTests(HaBrokerTest): msgs = [str(i) for i in range(30)] b1 = backup1.connect_admin().session() - wait_address(b1, "q"); - self.assert_browse_retry(b1, "q", msgs) - b2 = backup2.connect_admin().session() - wait_address(b2, "q"); - self.assert_browse_retry(b2, "q", msgs) + backup1.assert_browse_backup("q", msgs) + backup2.assert_browse_backup("q", msgs) def test_send_receive(self): """Verify sequence numbers of messages sent by qpid-send""" @@ -556,16 +553,16 @@ class ReplicationTests(HaBrokerTest): s1 = cluster[1].connect_admin().session() cluster[1].wait_backup("q") - assert not valid_address(s1, "exad") - assert valid_address(s1, "ex") - assert valid_address(s1, "ad") - assert valid_address(s1, "time") + assert not valid_address(s1.connection, "exad") + assert valid_address(s1.connection, "ex") + assert valid_address(s1.connection, "ad") + assert valid_address(s1.connection, "time") # Verify that auto-delete queues are not kept alive by # replicating subscriptions ad.close() s0.sync() - assert not valid_address(s0, "ad") + assert not valid_address(s0.connection, "ad") def test_broker_info(self): """Check that broker information is correctly published via management""" |
