summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp179
1 files changed, 84 insertions, 95 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 5a545c1f6a..cc6e9b9ab2 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -20,7 +20,6 @@
*/
#include "ConnectionImpl.h"
#include "SessionImpl.h"
-#include "SimpleUrlParser.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/Session.h"
#include "qpid/messaging/PrivateImplRef.h"
@@ -39,26 +38,18 @@ using qpid::types::Variant;
using qpid::types::VAR_LIST;
using qpid::framing::Uuid;
-void convert(const Variant::List& from, std::vector<std::string>& to)
-{
- for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
- to.push_back(i->asString());
- }
+namespace {
+void merge(const std::string& value, std::vector<std::string>& list) {
+ if (std::find(list.begin(), list.end(), value) == list.end())
+ list.push_back(value);
}
-template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value)
+void merge(const Variant::List& from, std::vector<std::string>& to)
{
- Variant::Map::const_iterator i = map.find(key);
- if (i != map.end()) {
- value = (T) i->second;
- QPID_LOG(debug, "option " << key << " specified as " << i->second);
- return true;
- } else {
- return false;
- }
+ for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i)
+ merge(i->asString(), to);
}
-namespace {
std::string asString(const std::vector<std::string>& v) {
std::stringstream os;
os << "[";
@@ -71,49 +62,8 @@ std::string asString(const std::vector<std::string>& v) {
}
}
-template <> bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
- const std::string& key,
- std::vector<std::string>& value)
-{
- Variant::Map::const_iterator i = map.find(key);
- if (i != map.end()) {
- value.clear();
- if (i->second.getType() == VAR_LIST) {
- convert(i->second.asList(), value);
- } else {
- value.push_back(i->second.asString());
- }
- QPID_LOG(debug, "option " << key << " specified as " << asString(value));
- return true;
- } else {
- return false;
- }
-}
-
-void convert(const Variant::Map& from, ConnectionSettings& to)
-{
- setIfFound(from, "username", to.username);
- setIfFound(from, "password", to.password);
- setIfFound(from, "sasl-mechanism", to.mechanism);
- setIfFound(from, "sasl-service", to.service);
- setIfFound(from, "sasl-min-ssf", to.minSsf);
- setIfFound(from, "sasl-max-ssf", to.maxSsf);
-
- setIfFound(from, "heartbeat", to.heartbeat);
- setIfFound(from, "tcp-nodelay", to.tcpNoDelay);
-
- setIfFound(from, "locale", to.locale);
- setIfFound(from, "max-channels", to.maxChannels);
- setIfFound(from, "max-frame-size", to.maxFrameSize);
- setIfFound(from, "bounds", to.bounds);
-
- setIfFound(from, "transport", to.protocol);
-
- setIfFound(from, "ssl-cert-name", to.sslCertName);
-}
-
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
- reconnect(false), timeout(-1), limit(-1),
+ replaceUrls(false), reconnect(false), timeout(-1), limit(-1),
minReconnectInterval(3), maxReconnectInterval(60),
retries(0), reconnectOnLimitExceeded(true)
{
@@ -124,27 +74,69 @@ ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& optio
void ConnectionImpl::setOptions(const Variant::Map& options)
{
- sys::Mutex::ScopedLock l(lock);
- convert(options, settings);
- setIfFound(options, "reconnect", reconnect);
- setIfFound(options, "reconnect-timeout", timeout);
- setIfFound(options, "reconnect-limit", limit);
- int64_t reconnectInterval;
- if (setIfFound(options, "reconnect-interval", reconnectInterval)) {
- minReconnectInterval = maxReconnectInterval = reconnectInterval;
- } else {
- setIfFound(options, "reconnect-interval-min", minReconnectInterval);
- setIfFound(options, "reconnect-interval-max", maxReconnectInterval);
+ for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
+ setOption(i->first, i->second);
}
- setIfFound(options, "reconnect-urls", urls);
- setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded);
}
void ConnectionImpl::setOption(const std::string& name, const Variant& value)
{
- Variant::Map options;
- options[name] = value;
- setOptions(options);
+ sys::Mutex::ScopedLock l(lock);
+ if (name == "reconnect") {
+ reconnect = value;
+ } else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
+ timeout = value;
+ } else if (name == "reconnect-limit" || name == "reconnect_limit") {
+ limit = value;
+ } else if (name == "reconnect-interval" || name == "reconnect_interval") {
+ maxReconnectInterval = minReconnectInterval = value;
+ } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") {
+ minReconnectInterval = value;
+ } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") {
+ maxReconnectInterval = value;
+ } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") {
+ replaceUrls = value.asBool();
+ } else if (name == "reconnect-urls" || name == "reconnect_urls") {
+ if (replaceUrls) urls.clear();
+ if (value.getType() == VAR_LIST) {
+ merge(value.asList(), urls);
+ } else {
+ merge(value.asString(), urls);
+ }
+ } else if (name == "username") {
+ settings.username = value.asString();
+ } else if (name == "password") {
+ settings.password = value.asString();
+ } else if (name == "sasl-mechanism" || name == "sasl_mechanism" ||
+ name == "sasl-mechanisms" || name == "sasl_mechanisms") {
+ settings.mechanism = value.asString();
+ } else if (name == "sasl-service" || name == "sasl_service") {
+ settings.service = value.asString();
+ } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") {
+ settings.minSsf = value;
+ } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") {
+ settings.maxSsf = value;
+ } else if (name == "heartbeat") {
+ settings.heartbeat = value;
+ } else if (name == "tcp-nodelay" || name == "tcp_nodelay") {
+ settings.tcpNoDelay = value;
+ } else if (name == "locale") {
+ settings.locale = value.asString();
+ } else if (name == "max-channels" || name == "max_channels") {
+ settings.maxChannels = value;
+ } else if (name == "max-frame-size" || name == "max_frame_size") {
+ settings.maxFrameSize = value;
+ } else if (name == "bounds") {
+ settings.bounds = value;
+ } else if (name == "transport") {
+ settings.protocol = value.asString();
+ } else if (name == "ssl-cert-name" || name == "ssl_cert_name") {
+ settings.sslCertName = value.asString();
+ } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") {
+ reconnectOnLimitExceeded = value;
+ } else {
+ throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
+ }
}
@@ -214,7 +206,7 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st
sessions[name] = impl;
break;
} catch (const qpid::TransportFailure&) {
- open();
+ reopen();
} catch (const qpid::SessionException& e) {
throw qpid::messaging::SessionError(e.what());
} catch (const std::exception& e) {
@@ -235,6 +227,15 @@ void ConnectionImpl::open()
catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); }
}
+void ConnectionImpl::reopen()
+{
+ if (!reconnect) {
+ throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
+ }
+ open();
+}
+
+
bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
{
if (timeout == 0) return true;
@@ -262,14 +263,9 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
}
void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) {
- if (more.size()) {
- for (size_t i = 0; i < more.size(); ++i) {
- if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) {
- urls.push_back(more[i].str());
- }
- }
- QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
- }
+ for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i)
+ merge(i->str(), urls);
+ QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
}
bool ConnectionImpl::tryConnect()
@@ -278,21 +274,14 @@ bool ConnectionImpl::tryConnect()
for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
try {
QPID_LOG(info, "Trying to connect to " << *i << "...");
- //TODO: when url support is more complete can avoid this test here
- if (i->find("amqp:") == 0) {
- Url url(*i);
- connection.open(url, settings);
- } else {
- SimpleUrlParser::parse(*i, settings);
- connection.open(settings);
- }
+ Url url(*i);
+ if (url.getUser().size()) settings.username = url.getUser();
+ if (url.getPass().size()) settings.password = url.getPass();
+ connection.open(url, settings);
QPID_LOG(info, "Connected to " << *i);
mergeUrls(connection.getInitialBrokers(), l);
return resetSessions(l);
- } catch (const qpid::ConnectionException& e) {
- //TODO: need to fix timeout on
- //qpid::client::Connection::open() so that it throws
- //TransportFailure rather than a ConnectionException
+ } catch (const qpid::TransportFailure& e) {
QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
}
}