diff options
| author | Gordon Sim <gsim@apache.org> | 2010-04-09 15:08:47 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2010-04-09 15:08:47 +0000 |
| commit | ef958e7b221d38ec76c392f76a66978211d6d1f9 (patch) | |
| tree | 111ba60857b7613f18e64f5c817d6e271428013e /cpp/src/qpid/client | |
| parent | 2daf8e11866364ff4955ee69625bf401e9baa93c (diff) | |
| download | qpid-python-ef958e7b221d38ec76c392f76a66978211d6d1f9.tar.gz | |
QPID-664: changed connect() back to open(),removed detach(),defined new exception hierarchy, added ability to re-use reconnect/replay logic for resource-limit-exceeded errors
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932451 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/SessionBase_0_10.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 102 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 52 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.h | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 43 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 17 |
7 files changed, 151 insertions, 83 deletions
diff --git a/cpp/src/qpid/client/SessionBase_0_10.cpp b/cpp/src/qpid/client/SessionBase_0_10.cpp index 6aa13bb579..e114b7aacc 100644 --- a/cpp/src/qpid/client/SessionBase_0_10.cpp +++ b/cpp/src/qpid/client/SessionBase_0_10.cpp @@ -65,13 +65,6 @@ void SessionBase_0_10::sendCompletion() impl->sendCompletion(); } -void SessionBase_0_10::sendSyncRequest() -{ - ExecutionSyncBody b; - b.setSync(true); - impl->send(b); -} - uint16_t SessionBase_0_10::getChannel() const { return impl->getChannel(); } void SessionBase_0_10::suspend() { impl->suspend(); } diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index f64a46ba01..43b581861f 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -26,7 +26,7 @@ #include "qpid/messaging/Address.h" #include "qpid/messaging/Message.h" #include "qpid/types/Variant.h" -#include "qpid/Exception.h" +#include "qpid/messaging/exceptions.h" #include "qpid/log/Statement.h" #include "qpid/framing/enum.h" #include "qpid/framing/ExchangeBoundResult.h" @@ -45,7 +45,10 @@ namespace amqp0_10 { using qpid::Exception; using qpid::messaging::Address; -using qpid::messaging::InvalidAddress; +using qpid::messaging::MalformedAddress; +using qpid::messaging::ResolutionError; +using qpid::messaging::NotFound; +using qpid::messaging::AssertionFailed; using qpid::framing::ExchangeBoundResult; using qpid::framing::ExchangeQueryResult; using qpid::framing::FieldTable; @@ -360,7 +363,7 @@ bool AddressResolution::is_reliable(const Address& address) std::string checkAddressType(qpid::client::Session session, const Address& address) { if (address.getName().empty()) { - throw InvalidAddress("Name cannot be null"); + throw MalformedAddress("Name cannot be null"); } std::string type = (Opt(address)/NODE/TYPE).str(); if (type.empty()) { @@ -376,7 +379,7 @@ std::string checkAddressType(qpid::client::Session session, const Address& addre type = TOPIC_ADDRESS; } else { //both a queue and exchange exist for that name - throw InvalidAddress("Ambiguous address, please specify queue or topic as node type"); + throw ResolutionError("Ambiguous address, please specify queue or topic as node type"); } } return type; @@ -396,7 +399,7 @@ std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Sess QPID_LOG(debug, "treating source address as queue: " << address); return source; } else { - throw InvalidAddress("Unrecognised type: " + type); + throw ResolutionError("Unrecognised type: " + type); } } @@ -414,7 +417,7 @@ std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session QPID_LOG(debug, "treating target address as queue: " << address); return sink; } else { - throw InvalidAddress("Unrecognised type: " + type); + throw ResolutionError("Unrecognised type: " + type); } } @@ -424,7 +427,7 @@ bool isBrowse(const Address& address) if (!mode.isVoid()) { std::string value = mode.asString(); if (value == BROWSE) return true; - else if (value != CONSUME) throw InvalidAddress("Invalid mode"); + else if (value != CONSUME) throw ResolutionError("Invalid mode"); } return false; } @@ -516,7 +519,7 @@ void Subscription::bindAll() b.arguments.setString("x-match", "all"); bindings.push_back(b); } else { //E.g. direct and xml - throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType)); + throw ResolutionError(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType)); } } @@ -662,23 +665,26 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) if (enabled(createPolicy, mode)) { QPID_LOG(debug, "Auto-creating queue '" << name << "'"); try { - sync(session).queueDeclare(arg::queue=name, - arg::durable=durable, - arg::autoDelete=autoDelete, - arg::exclusive=exclusive, - arg::alternateExchange=alternateExchange, - arg::arguments=arguments); - } catch (const qpid::Exception& e) { - throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str()); + session.queueDeclare(arg::queue=name, + arg::durable=durable, + arg::autoDelete=autoDelete, + arg::exclusive=exclusive, + arg::alternateExchange=alternateExchange, + arg::arguments=arguments); + nodeBindings.bind(session); + session.sync(); + } catch (const qpid::framing::ResourceLockedException& e) { + throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str()); + } catch (const qpid::framing::NotAllowedException& e) { + throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str()); + } catch (const qpid::framing::NotFoundException& e) {//may be thrown when creating bindings + throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str()); } - nodeBindings.bind(session); } else { try { sync(session).queueDeclare(arg::queue=name, arg::passive=true); } catch (const qpid::framing::NotFoundException& /*e*/) { - throw InvalidAddress((boost::format("Queue %1% does not exist") % name).str()); - } catch (const std::exception& e) { - throw InvalidAddress(e.what()); + throw NotFound((boost::format("Queue %1% does not exist") % name).str()); } } } @@ -700,27 +706,27 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) if (enabled(assertPolicy, mode)) { QueueQueryResult result = sync(session).queueQuery(name); if (result.getQueue() != name) { - throw InvalidAddress((boost::format("Queue not found: %1%") % name).str()); + throw NotFound((boost::format("Queue not found: %1%") % name).str()); } else { if (durable && !result.getDurable()) { - throw InvalidAddress((boost::format("Queue not durable: %1%") % name).str()); + throw AssertionFailed((boost::format("Queue not durable: %1%") % name).str()); } if (autoDelete && !result.getAutoDelete()) { - throw InvalidAddress((boost::format("Queue not set to auto-delete: %1%") % name).str()); + throw AssertionFailed((boost::format("Queue not set to auto-delete: %1%") % name).str()); } if (exclusive && !result.getExclusive()) { - throw InvalidAddress((boost::format("Queue not exclusive: %1%") % name).str()); + throw AssertionFailed((boost::format("Queue not exclusive: %1%") % name).str()); } if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) { - throw InvalidAddress((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%") + throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%") % name % alternateExchange % result.getAlternateExchange()).str()); } for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { - throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str()); + throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); } else if (*i->second != *v) { - throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") + throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } } @@ -746,23 +752,24 @@ void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) try { std::string type = specifiedType; if (type.empty()) type = TOPIC_EXCHANGE; - sync(session).exchangeDeclare(arg::exchange=name, + session.exchangeDeclare(arg::exchange=name, arg::type=type, arg::durable=durable, arg::autoDelete=autoDelete, arg::alternateExchange=alternateExchange, arg::arguments=arguments); - } catch (const qpid::Exception& e) { - throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str()); + nodeBindings.bind(session); + session.sync(); + } catch (const qpid::framing::NotAllowedException& e) { + throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str()); + } catch (const qpid::framing::NotFoundException& e) {//can be caused when creating bindings + throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str()); } - nodeBindings.bind(session); } else { try { sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); } catch (const qpid::framing::NotFoundException& /*e*/) { - throw InvalidAddress((boost::format("Exchange %1% does not exist") % name).str()); - } catch (const std::exception& e) { - throw InvalidAddress(e.what()); + throw NotFound((boost::format("Exchange %1% does not exist") % name).str()); } } } @@ -784,14 +791,14 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) if (enabled(assertPolicy, mode)) { ExchangeQueryResult result = sync(session).exchangeQuery(name); if (result.getNotFound()) { - throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str()); + throw NotFound((boost::format("Exchange not found: %1%") % name).str()); } else { if (specifiedType.size() && result.getType() != specifiedType) { - throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") + throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") % name % specifiedType % result.getType()).str()); } if (durable && !result.getDurable()) { - throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str()); + throw AssertionFailed((boost::format("Exchange not durable: %1%") % name).str()); } //Note: Can't check auto-delete or alternate-exchange via //exchange-query-result as these are not returned @@ -799,9 +806,9 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { - throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str()); + throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); } else if (i->second != v) { - throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") + throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } } @@ -844,16 +851,11 @@ void Bindings::setDefaultQueue(const std::string& queue) void Bindings::bind(qpid::client::AsyncSession& session) { - try { - for (Bindings::const_iterator i = begin(); i != end(); ++i) { - session.exchangeBind(arg::queue=i->queue, - arg::exchange=i->exchange, - arg::bindingKey=i->key, - arg::arguments=i->arguments); - } - session.sync(); - } catch (const qpid::Exception& e) { - throw InvalidAddress((boost::format("Could not create node bindings: %1%") % e.what()).str()); + for (Bindings::const_iterator i = begin(); i != end(); ++i) { + session.exchangeBind(arg::queue=i->queue, + arg::exchange=i->exchange, + arg::bindingKey=i->key, + arg::arguments=i->arguments); } } @@ -873,7 +875,7 @@ void Bindings::check(qpid::client::AsyncSession& session) arg::exchange=i->exchange, arg::bindingKey=i->key); if (result.getQueueNotMatched() || result.getKeyNotMatched()) { - throw InvalidAddress((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]") + throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]") % i->exchange % i->queue % i->key).str()); } } diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 30b75ff4ff..7e5018fc5f 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -21,10 +21,12 @@ #include "ConnectionImpl.h" #include "SessionImpl.h" #include "SimpleUrlParser.h" +#include "qpid/messaging/exceptions.h" #include "qpid/messaging/Session.h" #include "qpid/messaging/PrivateImplRef.h" #include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" +#include "qpid/Url.h" #include <boost/intrusive_ptr.hpp> #include <vector> @@ -97,7 +99,7 @@ void convert(const Variant::Map& from, ConnectionSettings& to) ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : reconnect(true), timeout(-1), limit(-1), minReconnectInterval(3), maxReconnectInterval(60), - retries(0) + retries(0), reconnectOnLimitExceeded(true) { QPID_LOG(debug, "Created connection with " << options); setOptions(options); @@ -117,7 +119,8 @@ void ConnectionImpl::setOptions(const Variant::Map& options) setIfFound(options, "reconnect-interval-min", minReconnectInterval); setIfFound(options, "reconnect-interval-max", maxReconnectInterval); } - setIfFound(options, "reconnect-urls", urls); + setIfFound(options, "reconnect-urls", urls); + setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded); } void ConnectionImpl::setOption(const std::string& name, const Variant& value) @@ -147,7 +150,7 @@ void ConnectionImpl::detach() connection.close(); } -bool ConnectionImpl::isConnected() +bool ConnectionImpl::isOpen() { qpid::sys::Mutex::ScopedLock l(lock); return connection.isOpen(); @@ -192,13 +195,13 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st } try { getImplPtr(impl)->setSession(connection.newSession(name)); - } catch (const TransportFailure&) { - connect(); + } catch (const qpid::TransportFailure&) { + open(); } return impl; } -void ConnectionImpl::connect() +void ConnectionImpl::open() { qpid::sys::AbsTime start = qpid::sys::now(); qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore); @@ -217,9 +220,15 @@ bool expired(const qpid::sys::AbsTime& start, int64_t timeout) void ConnectionImpl::connect(const qpid::sys::AbsTime& started) { for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) { - if (!reconnect) throw TransportFailure("Failed to connect (reconnect disabled)"); - if (limit >= 0 && retries++ >= limit) throw TransportFailure("Failed to connect within reconnect limit"); - if (expired(started, timeout)) throw TransportFailure("Failed to connect within reconnect timeout"); + if (!reconnect) { + throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); + } + if (limit >= 0 && retries++ >= limit) { + throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit"); + } + if (expired(started, timeout)) { + throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout"); + } else qpid::sys::sleep(i); } retries = 0; @@ -246,7 +255,7 @@ bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls) } QPID_LOG(info, "Connected to " << *i); return true; - } catch (const Exception& e) { + } catch (const qpid::Exception& e) { //TODO: need to fix timeout on //qpid::client::Connection::open() so that it throws //TransportFailure rather than a ConnectionException @@ -264,8 +273,27 @@ bool ConnectionImpl::resetSessions() getImplPtr(i->second)->setSession(connection.newSession(i->first)); } return true; - } catch (const TransportFailure&) { - QPID_LOG(debug, "Connection failed while re-inialising sessions"); + } catch (const qpid::TransportFailure&) { + QPID_LOG(debug, "Connection failed while re-initialising sessions"); + return false; + } catch (const qpid::framing::ResourceLimitExceededException& e) { + if (reconnectOnLimitExceeded) { + QPID_LOG(debug, "Detaching and reconnecting due to: " << e.what()); + detach(); + return false; + } else { + throw qpid::messaging::TargetCapacityExceeded(e.what()); + } + } +} + +bool ConnectionImpl::backoff() +{ + if (reconnectOnLimitExceeded) { + detach(); + open(); + return true; + } else { return false; } } diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index 9d992c1375..b6fd33cc49 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -23,7 +23,6 @@ */ #include "qpid/messaging/ConnectionImpl.h" #include "qpid/types/Variant.h" -#include "qpid/Url.h" #include "qpid/client/Connection.h" #include "qpid/client/ConnectionSettings.h" #include "qpid/sys/Mutex.h" @@ -40,14 +39,15 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl { public: ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options); - void connect(); - bool isConnected(); + void open(); + bool isOpen(); void close(); qpid::messaging::Session newSession(bool transactional, const std::string& name); qpid::messaging::Session getSession(const std::string& name) const; void closed(SessionImpl&); void detach(); void setOption(const std::string& name, const qpid::types::Variant& value); + bool backoff(); private: typedef std::map<std::string, qpid::messaging::Session> Sessions; @@ -63,6 +63,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl int64_t minReconnectInterval; int64_t maxReconnectInterval; int32_t retries; + bool reconnectOnLimitExceeded; void setOptions(const qpid::types::Variant::Map& options); void connect(const qpid::sys::AbsTime& started); diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index c3367f8ab4..343b5cad37 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -22,6 +22,7 @@ #include "AddressResolution.h" #include "MessageSource.h" #include "SessionImpl.h" +#include "qpid/messaging/exceptions.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Session.h" @@ -29,6 +30,7 @@ namespace qpid { namespace client { namespace amqp0_10 { +using qpid::messaging::NoMessageAvailable; using qpid::messaging::Receiver; using qpid::messaging::Duration; @@ -44,14 +46,14 @@ void ReceiverImpl::received(qpid::messaging::Message&) qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout) { qpid::messaging::Message result; - if (!get(result, timeout)) throw Receiver::NoMessageAvailable(); + if (!get(result, timeout)) throw NoMessageAvailable(); return result; } qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout) { qpid::messaging::Message result; - if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable(); + if (!fetch(result, timeout)) throw NoMessageAvailable(); return result; } diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 969ad93da9..33a3e226ff 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -24,6 +24,8 @@ #include "qpid/client/amqp0_10/SenderImpl.h" #include "qpid/client/amqp0_10/MessageSource.h" #include "qpid/client/amqp0_10/MessageSink.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/SessionImpl.h" #include "qpid/messaging/PrivateImplRef.h" #include "qpid/Exception.h" #include "qpid/log/Statement.h" @@ -34,12 +36,15 @@ #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Session.h" -#include "qpid/framing/reply_exceptions.h" #include <boost/format.hpp> #include <boost/function.hpp> #include <boost/intrusive_ptr.hpp> using qpid::messaging::KeyError; +using qpid::messaging::NoMessageAvailable; +using qpid::messaging::MessagingException; +using qpid::messaging::TransactionAborted; +using qpid::messaging::SessionError; using qpid::messaging::MessageImplAccess; using qpid::messaging::Sender; using qpid::messaging::Receiver; @@ -50,6 +55,11 @@ namespace amqp0_10 { SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {} +void SessionImpl::checkError() +{ + qpid::client::SessionBase_0_10Access s(session); + s.get()->assertOpen(); +} void SessionImpl::sync(bool block) { @@ -60,7 +70,7 @@ void SessionImpl::sync(bool block) void SessionImpl::commit() { if (!execute<Commit>()) { - throw Exception();//TODO: what type? + throw TransactionAborted("Transaction aborted due to transport failure"); } } @@ -141,6 +151,7 @@ void SessionImpl::setSession(qpid::client::Session s) for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver); } + session.sync(); } struct SessionImpl::CreateReceiver : Command @@ -219,7 +230,7 @@ SessionImpl& SessionImpl::convert(qpid::messaging::Session& s) { boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s); if (!impl) { - throw qpid::Exception(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl")); + throw SessionError(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl")); } return *impl; } @@ -297,7 +308,7 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag if (incoming.getNextDestination(destination, adjust(timeout))) { Receivers::const_iterator i = receivers.find(destination); if (i == receivers.end()) { - throw qpid::Exception(QPID_MSG("Received message for unknown destination " << destination)); + throw qpid::messaging::ReceiverError(QPID_MSG("Received message for unknown destination " << destination)); } else { receiver = i->second; } @@ -307,6 +318,17 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag } } catch (TransportFailure&) { reconnect(); + } catch (const qpid::framing::ResourceLimitExceededException& e) { + if (backoff()) return false; + else throw qpid::messaging::TargetCapacityExceeded(e.what()); + } catch (const qpid::framing::UnauthorizedAccessException& e) { + throw qpid::messaging::UnauthorizedAccess(e.what()); + } catch (const qpid::SessionException& e) { + throw qpid::messaging::SessionError(e.what()); + } catch (const qpid::ConnectionException& e) { + throw qpid::messaging::ConnectionError(e.what()); + } catch (const qpid::ChannelException& e) { + throw qpid::messaging::MessagingException(e.what()); } } } @@ -314,8 +336,8 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::messaging::Duration timeout) { qpid::messaging::Receiver receiver; - if (!nextReceiver(receiver, timeout)) throw Receiver::NoMessageAvailable(); - if (!receiver) throw qpid::Exception("Bad receiver returned!"); + if (!nextReceiver(receiver, timeout)) throw NoMessageAvailable(); + if (!receiver) throw SessionError("Bad receiver returned!"); return receiver; } @@ -377,7 +399,7 @@ uint32_t SessionImpl::pendingAckImpl(const std::string* destination) void SessionImpl::syncImpl(bool block) { if (block) session.sync(); - else session.sendSyncRequest(); + else session.flush(); } void SessionImpl::commitImpl() @@ -435,7 +457,12 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection->connect(); + connection->open(); +} + +bool SessionImpl::backoff() +{ + return connection->backoff(); } qpid::messaging::Connection SessionImpl::getConnection() const diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 8b098e65d6..e1229055f7 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -23,11 +23,13 @@ */ #include "qpid/messaging/SessionImpl.h" #include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" #include "qpid/client/Session.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/client/amqp0_10/AddressResolution.h" #include "qpid/client/amqp0_10/IncomingMessages.h" #include "qpid/sys/Mutex.h" +#include "qpid/framing/reply_exceptions.h" #include <boost/intrusive_ptr.hpp> namespace qpid { @@ -73,6 +75,7 @@ class SessionImpl : public qpid::messaging::SessionImpl qpid::messaging::Receiver nextReceiver(qpid::messaging::Duration timeout); qpid::messaging::Connection getConnection() const; + void checkError(); bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); @@ -93,9 +96,20 @@ class SessionImpl : public qpid::messaging::SessionImpl qpid::sys::Mutex::ScopedLock l(lock); f(); return true; - } catch (TransportFailure&) { + } catch (const qpid::TransportFailure&) { reconnect(); return false; + } catch (const qpid::framing::ResourceLimitExceededException& e) { + if (backoff()) return false; + else throw qpid::messaging::TargetCapacityExceeded(e.what()); + } catch (const qpid::framing::UnauthorizedAccessException& e) { + throw qpid::messaging::UnauthorizedAccess(e.what()); + } catch (const qpid::SessionException& e) { + throw qpid::messaging::SessionError(e.what()); + } catch (const qpid::ConnectionException& e) { + throw qpid::messaging::ConnectionError(e.what()); + } catch (const qpid::ChannelException& e) { + throw qpid::messaging::MessagingException(e.what()); } } @@ -118,6 +132,7 @@ class SessionImpl : public qpid::messaging::SessionImpl bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout); bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer); void reconnect(); + bool backoff(); void commitImpl(); void rollbackImpl(); |
