summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2010-04-09 15:08:47 +0000
committerGordon Sim <gsim@apache.org>2010-04-09 15:08:47 +0000
commitef958e7b221d38ec76c392f76a66978211d6d1f9 (patch)
tree111ba60857b7613f18e64f5c817d6e271428013e /cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
parent2daf8e11866364ff4955ee69625bf401e9baa93c (diff)
downloadqpid-python-ef958e7b221d38ec76c392f76a66978211d6d1f9.tar.gz
QPID-664: changed connect() back to open(),removed detach(),defined new exception hierarchy, added ability to re-use reconnect/replay logic for resource-limit-exceeded errors
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932451 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp52
1 files changed, 40 insertions, 12 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 30b75ff4ff..7e5018fc5f 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -21,10 +21,12 @@
#include "ConnectionImpl.h"
#include "SessionImpl.h"
#include "SimpleUrlParser.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/Session.h"
#include "qpid/messaging/PrivateImplRef.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
+#include "qpid/Url.h"
#include <boost/intrusive_ptr.hpp>
#include <vector>
@@ -97,7 +99,7 @@ void convert(const Variant::Map& from, ConnectionSettings& to)
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
reconnect(true), timeout(-1), limit(-1),
minReconnectInterval(3), maxReconnectInterval(60),
- retries(0)
+ retries(0), reconnectOnLimitExceeded(true)
{
QPID_LOG(debug, "Created connection with " << options);
setOptions(options);
@@ -117,7 +119,8 @@ void ConnectionImpl::setOptions(const Variant::Map& options)
setIfFound(options, "reconnect-interval-min", minReconnectInterval);
setIfFound(options, "reconnect-interval-max", maxReconnectInterval);
}
- setIfFound(options, "reconnect-urls", urls);
+ setIfFound(options, "reconnect-urls", urls);
+ setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded);
}
void ConnectionImpl::setOption(const std::string& name, const Variant& value)
@@ -147,7 +150,7 @@ void ConnectionImpl::detach()
connection.close();
}
-bool ConnectionImpl::isConnected()
+bool ConnectionImpl::isOpen()
{
qpid::sys::Mutex::ScopedLock l(lock);
return connection.isOpen();
@@ -192,13 +195,13 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st
}
try {
getImplPtr(impl)->setSession(connection.newSession(name));
- } catch (const TransportFailure&) {
- connect();
+ } catch (const qpid::TransportFailure&) {
+ open();
}
return impl;
}
-void ConnectionImpl::connect()
+void ConnectionImpl::open()
{
qpid::sys::AbsTime start = qpid::sys::now();
qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore);
@@ -217,9 +220,15 @@ bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
{
for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) {
- if (!reconnect) throw TransportFailure("Failed to connect (reconnect disabled)");
- if (limit >= 0 && retries++ >= limit) throw TransportFailure("Failed to connect within reconnect limit");
- if (expired(started, timeout)) throw TransportFailure("Failed to connect within reconnect timeout");
+ if (!reconnect) {
+ throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
+ }
+ if (limit >= 0 && retries++ >= limit) {
+ throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit");
+ }
+ if (expired(started, timeout)) {
+ throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
+ }
else qpid::sys::sleep(i);
}
retries = 0;
@@ -246,7 +255,7 @@ bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls)
}
QPID_LOG(info, "Connected to " << *i);
return true;
- } catch (const Exception& e) {
+ } catch (const qpid::Exception& e) {
//TODO: need to fix timeout on
//qpid::client::Connection::open() so that it throws
//TransportFailure rather than a ConnectionException
@@ -264,8 +273,27 @@ bool ConnectionImpl::resetSessions()
getImplPtr(i->second)->setSession(connection.newSession(i->first));
}
return true;
- } catch (const TransportFailure&) {
- QPID_LOG(debug, "Connection failed while re-inialising sessions");
+ } catch (const qpid::TransportFailure&) {
+ QPID_LOG(debug, "Connection failed while re-initialising sessions");
+ return false;
+ } catch (const qpid::framing::ResourceLimitExceededException& e) {
+ if (reconnectOnLimitExceeded) {
+ QPID_LOG(debug, "Detaching and reconnecting due to: " << e.what());
+ detach();
+ return false;
+ } else {
+ throw qpid::messaging::TargetCapacityExceeded(e.what());
+ }
+ }
+}
+
+bool ConnectionImpl::backoff()
+{
+ if (reconnectOnLimitExceeded) {
+ detach();
+ open();
+ return true;
+ } else {
return false;
}
}