summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-09-05 01:39:01 +0000
committerAlan Conway <aconway@apache.org>2014-09-05 01:39:01 +0000
commit8ec55346fe58a1f2f30e3699937bdc643fc9bbed (patch)
tree9b931172599d76a22fcebb7878dd9097d388fc4d /qpid/cpp/src
parentec7ef1d67fd460152e31d04c6b96b11c76af3b39 (diff)
downloadqpid-python-8ec55346fe58a1f2f30e3699937bdc643fc9bbed.tar.gz
NO-JIRA: HA Fix ha_tests.py failures with SWIG 0.10 client.
- Fix un-necessary re-sends in amqp0_10::SenderImpl::replay. - Throw NotFound and UnauthorizedAccess correctly from amqp0_10::SessionImpl and ConnectionImpl - Fix ha_test wait_address and valid_address re-using a session after it is closed by NotFound. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1622592 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp3
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp21
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h8
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py41
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py27
6 files changed, 57 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 3600e4d945..24145f0117 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -237,7 +237,7 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st
} catch (const qpid::TransportFailure&) {
reopen();
} catch (const qpid::SessionException& e) {
- throw qpid::messaging::SessionError(e.what());
+ SessionImpl::rethrow(e);
} catch (const std::exception& e) {
throw qpid::messaging::MessagingException(e.what());
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index 28a1353895..7575aaa306 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -153,8 +153,9 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m)
sink->send(session, name, msg);
}
-void SenderImpl::replay(const sys::Mutex::ScopedLock&)
+void SenderImpl::replay(const sys::Mutex::ScopedLock& l)
{
+ checkPendingSends(false, l);
for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
i->markRedelivered();
sink->send(session, name, *i);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 32f52adf43..1e2b68b24e 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -36,6 +36,7 @@
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Session.h"
+#include "qpid/framing/enum.h"
#include <boost/format.hpp>
#include <boost/function.hpp>
#include <boost/intrusive_ptr.hpp>
@@ -79,6 +80,10 @@ void SessionImpl::checkError()
throw qpid::messaging::TargetCapacityExceeded(e.what());
} catch (const qpid::framing::UnauthorizedAccessException& e) {
throw qpid::messaging::UnauthorizedAccess(e.what());
+ } catch (const qpid::framing::NotFoundException& e) {
+ throw qpid::messaging::NotFound(e.what());
+ } catch (const qpid::framing::ResourceDeletedException& e) {
+ throw qpid::messaging::NotFound(e.what());
} catch (const qpid::SessionException& e) {
throw qpid::messaging::SessionError(e.what());
} catch (const qpid::ConnectionException& e) {
@@ -405,10 +410,8 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag
} catch (const qpid::framing::ResourceLimitExceededException& e) {
if (backoff()) return false;
else throw qpid::messaging::TargetCapacityExceeded(e.what());
- } catch (const qpid::framing::UnauthorizedAccessException& e) {
- throw qpid::messaging::UnauthorizedAccess(e.what());
} catch (const qpid::SessionException& e) {
- throw qpid::messaging::SessionError(e.what());
+ rethrow(e);
} catch (const qpid::ClosedException&) {
throw qpid::messaging::SessionClosed();
} catch (const qpid::ConnectionException& e) {
@@ -588,4 +591,16 @@ qpid::messaging::Connection SessionImpl::getConnection() const
return qpid::messaging::Connection(connection.get());
}
+void SessionImpl::rethrow(const qpid::SessionException& e) {
+ switch (e.code) {
+ case framing::execution::ERROR_CODE_NOT_ALLOWED:
+ case framing::execution::ERROR_CODE_UNAUTHORIZED_ACCESS: throw messaging::UnauthorizedAccess(e.what());
+
+ case framing::execution::ERROR_CODE_NOT_FOUND:
+ case framing::execution::ERROR_CODE_RESOURCE_DELETED: throw messaging::NotFound(e.what());
+
+ default: throw SessionError(e.what());
+ }
+}
+
}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index 3a160b2b91..2bb72aa877 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -109,8 +109,13 @@ class SessionImpl : public qpid::messaging::SessionImpl
else throw qpid::messaging::TargetCapacityExceeded(e.what());
} catch (const qpid::framing::UnauthorizedAccessException& e) {
throw qpid::messaging::UnauthorizedAccess(e.what());
+ } catch (const qpid::framing::NotFoundException& e) {
+ throw qpid::messaging::NotFound(e.what());
+ } catch (const qpid::framing::ResourceDeletedException& e) {
+ throw qpid::messaging::NotFound(e.what());
} catch (const qpid::SessionException& e) {
- throw qpid::messaging::SessionError(e.what());
+ rethrow(e);
+ return false; // Keep the compiler happy
} catch (const qpid::ConnectionException& e) {
throw qpid::messaging::ConnectionError(e.what());
} catch (const qpid::ChannelException& e) {
@@ -119,6 +124,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
}
static SessionImpl& convert(qpid::messaging::Session&);
+ static void rethrow(const qpid::SessionException&);
private:
typedef std::map<std::string, qpid::messaging::Receiver> Receivers;
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 1c131e7872..40ea3854c9 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -241,11 +241,11 @@ acl allow all all
def wait_address(self, address):
"""Wait for address to become valid on the broker."""
- bs = self.connect_admin().session()
- try: wait_address(bs, address)
- finally: bs.connection.close()
+ c = self.connect_admin()
+ try: wait_address(c, address)
+ finally: c.close()
- def wait_backup(self, address): self.wait_address(address)
+ wait_backup = wait_address
def browse(self, queue, timeout=0, transform=lambda m: m.content):
c = self.connect_admin()
@@ -253,21 +253,15 @@ acl allow all all
return browse(c.session(), queue, timeout, transform)
finally: c.close()
- def assert_browse(self, queue, expected, **kwargs):
- """Verify queue contents by browsing."""
- bs = self.connect().session()
- try:
- wait_address(bs, queue)
- assert_browse_retry(bs, queue, expected, **kwargs)
- finally: bs.connection.close()
-
def assert_browse_backup(self, queue, expected, **kwargs):
"""Combines wait_backup and assert_browse_retry."""
- bs = self.connect_admin().session()
+ c = self.connect_admin()
try:
- wait_address(bs, queue)
- assert_browse_retry(bs, queue, expected, **kwargs)
- finally: bs.connection.close()
+ wait_address(c, queue)
+ assert_browse_retry(c.session(), queue, expected, **kwargs)
+ finally: c.close()
+
+ assert_browse = assert_browse_backup
def assert_connect_fail(self):
try:
@@ -384,18 +378,17 @@ class HaCluster(object):
def __iter__(self): return self._brokers.__iter__()
-def wait_address(session, address):
+def wait_address(connection, address):
"""Wait for an address to become valid."""
- def check():
- try: session.sender(address); return True
- except qm.NotFound: return False
- assert retry(check), "Timed out waiting for address %s"%(address)
+ assert retry(lambda: valid_address(connection, address)), "Timed out waiting for address %s"%(address)
-def valid_address(session, address):
+def valid_address(connection, address):
"""Test if an address is valid"""
try:
- session.receiver(address)
+ s = connection.session().receiver(address)
+ s.session.close()
return True
- except qm.NotFound: return False
+ except qm.NotFound:
+ return False
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 482296e47f..a43b939ee3 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -82,7 +82,7 @@ class ReplicationTests(HaBrokerTest):
def verify(b, prefix, p):
"""Verify setup was replicated to backup b"""
# Wait for configuration to replicate.
- wait_address(b, prefix+"x");
+ wait_address(b.connection, prefix+"x");
self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
@@ -90,7 +90,7 @@ class ReplicationTests(HaBrokerTest):
self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
self.assert_browse_retry(b, prefix+"q2", []) # configuration only
- assert not valid_address(b, prefix+"q3")
+ assert not valid_address(b.connection, prefix+"q3")
# Verify exchange with replicate=all
b.sender(prefix+"e1/key1").send(qm.Message(prefix+"e1"))
@@ -104,8 +104,8 @@ class ReplicationTests(HaBrokerTest):
self.assert_browse_retry(b, prefix+"q4", ["6","7"])
# Verify deletes
- assert not valid_address(b, prefix+"dq")
- assert not valid_address(b, prefix+"de")
+ assert not valid_address(b.connection, prefix+"dq")
+ assert not valid_address(b.connection, prefix+"de")
l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
try:
@@ -130,7 +130,7 @@ class ReplicationTests(HaBrokerTest):
# Test a series of messages, enqueue all then dequeue all.
primary.agent.addQueue("foo")
s = p.sender("foo")
- wait_address(b, "foo")
+ wait_address(b.connection, "foo")
msgs = [str(i) for i in range(10)]
for m in msgs: s.send(qm.Message(m))
self.assert_browse_retry(p, "foo", msgs)
@@ -168,11 +168,8 @@ class ReplicationTests(HaBrokerTest):
msgs = [str(i) for i in range(30)]
b1 = backup1.connect_admin().session()
- wait_address(b1, "q");
- self.assert_browse_retry(b1, "q", msgs)
- b2 = backup2.connect_admin().session()
- wait_address(b2, "q");
- self.assert_browse_retry(b2, "q", msgs)
+ backup1.assert_browse_backup("q", msgs)
+ backup2.assert_browse_backup("q", msgs)
def test_send_receive(self):
"""Verify sequence numbers of messages sent by qpid-send"""
@@ -556,16 +553,16 @@ class ReplicationTests(HaBrokerTest):
s1 = cluster[1].connect_admin().session()
cluster[1].wait_backup("q")
- assert not valid_address(s1, "exad")
- assert valid_address(s1, "ex")
- assert valid_address(s1, "ad")
- assert valid_address(s1, "time")
+ assert not valid_address(s1.connection, "exad")
+ assert valid_address(s1.connection, "ex")
+ assert valid_address(s1.connection, "ad")
+ assert valid_address(s1.connection, "time")
# Verify that auto-delete queues are not kept alive by
# replicating subscriptions
ad.close()
s0.sync()
- assert not valid_address(s0, "ad")
+ assert not valid_address(s0.connection, "ad")
def test_broker_info(self):
"""Check that broker information is correctly published via management"""