diff options
| author | Gordon Sim <gsim@apache.org> | 2010-04-09 15:08:47 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2010-04-09 15:08:47 +0000 |
| commit | ef958e7b221d38ec76c392f76a66978211d6d1f9 (patch) | |
| tree | 111ba60857b7613f18e64f5c817d6e271428013e /cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | |
| parent | 2daf8e11866364ff4955ee69625bf401e9baa93c (diff) | |
| download | qpid-python-ef958e7b221d38ec76c392f76a66978211d6d1f9.tar.gz | |
QPID-664: changed connect() back to open(),removed detach(),defined new exception hierarchy, added ability to re-use reconnect/replay logic for resource-limit-exceeded errors
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932451 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 52 |
1 files changed, 40 insertions, 12 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 30b75ff4ff..7e5018fc5f 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -21,10 +21,12 @@ #include "ConnectionImpl.h" #include "SessionImpl.h" #include "SimpleUrlParser.h" +#include "qpid/messaging/exceptions.h" #include "qpid/messaging/Session.h" #include "qpid/messaging/PrivateImplRef.h" #include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" +#include "qpid/Url.h" #include <boost/intrusive_ptr.hpp> #include <vector> @@ -97,7 +99,7 @@ void convert(const Variant::Map& from, ConnectionSettings& to) ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : reconnect(true), timeout(-1), limit(-1), minReconnectInterval(3), maxReconnectInterval(60), - retries(0) + retries(0), reconnectOnLimitExceeded(true) { QPID_LOG(debug, "Created connection with " << options); setOptions(options); @@ -117,7 +119,8 @@ void ConnectionImpl::setOptions(const Variant::Map& options) setIfFound(options, "reconnect-interval-min", minReconnectInterval); setIfFound(options, "reconnect-interval-max", maxReconnectInterval); } - setIfFound(options, "reconnect-urls", urls); + setIfFound(options, "reconnect-urls", urls); + setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded); } void ConnectionImpl::setOption(const std::string& name, const Variant& value) @@ -147,7 +150,7 @@ void ConnectionImpl::detach() connection.close(); } -bool ConnectionImpl::isConnected() +bool ConnectionImpl::isOpen() { qpid::sys::Mutex::ScopedLock l(lock); return connection.isOpen(); @@ -192,13 +195,13 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st } try { getImplPtr(impl)->setSession(connection.newSession(name)); - } catch (const TransportFailure&) { - connect(); + } catch (const qpid::TransportFailure&) { + open(); } return impl; } -void ConnectionImpl::connect() +void ConnectionImpl::open() { qpid::sys::AbsTime start = qpid::sys::now(); qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore); @@ -217,9 +220,15 @@ bool expired(const qpid::sys::AbsTime& start, int64_t timeout) void ConnectionImpl::connect(const qpid::sys::AbsTime& started) { for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) { - if (!reconnect) throw TransportFailure("Failed to connect (reconnect disabled)"); - if (limit >= 0 && retries++ >= limit) throw TransportFailure("Failed to connect within reconnect limit"); - if (expired(started, timeout)) throw TransportFailure("Failed to connect within reconnect timeout"); + if (!reconnect) { + throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); + } + if (limit >= 0 && retries++ >= limit) { + throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit"); + } + if (expired(started, timeout)) { + throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout"); + } else qpid::sys::sleep(i); } retries = 0; @@ -246,7 +255,7 @@ bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls) } QPID_LOG(info, "Connected to " << *i); return true; - } catch (const Exception& e) { + } catch (const qpid::Exception& e) { //TODO: need to fix timeout on //qpid::client::Connection::open() so that it throws //TransportFailure rather than a ConnectionException @@ -264,8 +273,27 @@ bool ConnectionImpl::resetSessions() getImplPtr(i->second)->setSession(connection.newSession(i->first)); } return true; - } catch (const TransportFailure&) { - QPID_LOG(debug, "Connection failed while re-inialising sessions"); + } catch (const qpid::TransportFailure&) { + QPID_LOG(debug, "Connection failed while re-initialising sessions"); + return false; + } catch (const qpid::framing::ResourceLimitExceededException& e) { + if (reconnectOnLimitExceeded) { + QPID_LOG(debug, "Detaching and reconnecting due to: " << e.what()); + detach(); + return false; + } else { + throw qpid::messaging::TargetCapacityExceeded(e.what()); + } + } +} + +bool ConnectionImpl::backoff() +{ + if (reconnectOnLimitExceeded) { + detach(); + open(); + return true; + } else { return false; } } |
