diff options
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 148 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.h | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 2 |
3 files changed, 113 insertions, 58 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 4242850192..9c1c4e0735 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -1,4 +1,4 @@ -/* + /* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,6 +20,7 @@ */ #include "ConnectionImpl.h" #include "SessionImpl.h" +#include "SimpleUrlParser.h" #include "qpid/messaging/Session.h" #include "qpid/client/PrivateImplRef.h" #include "qpid/framing/Uuid.h" @@ -33,13 +34,42 @@ namespace amqp0_10 { using qpid::messaging::Variant; using qpid::framing::Uuid; -using namespace qpid::sys; -template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value) +void convert(const Variant::List& from, std::vector<std::string>& to) +{ + for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) { + to.push_back(i->asString()); + } +} + +template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value) { Variant::Map::const_iterator i = map.find(key); if (i != map.end()) { value = (T) i->second; + QPID_LOG(debug, "option " << key << " specified as " << i->second); + return true; + } else { + QPID_LOG(debug, "option " << key << " not specified"); + return false; + } +} + +template <> +bool setIfFound< std::vector<std::string> >(const Variant::Map& map, + const std::string& key, + std::vector<std::string>& value) +{ + Variant::Map::const_iterator i = map.find(key); + if (i != map.end()) { + if (i->second.getType() == qpid::messaging::VAR_LIST) { + convert(i->second.asList(), value); + } else { + value.push_back(i->second.asString()); + } + return true; + } else { + return false; } } @@ -59,24 +89,47 @@ void convert(const Variant::Map& from, ConnectionSettings& to) setIfFound(from, "max-channels", to.maxChannels); setIfFound(from, "max-frame-size", to.maxFrameSize); setIfFound(from, "bounds", to.bounds); + + setIfFound(from, "protocol", to.protocol); } ConnectionImpl::ConnectionImpl(const Variant::Map& options) : - reconnectionEnabled(true), timeout(-1), - minRetryInterval(1), maxRetryInterval(30) + reconnect(true), timeout(-1), limit(-1), + minReconnectInterval(3), maxReconnectInterval(60), + retries(0) +{ + QPID_LOG(debug, "Created connection with " << options); + setOptions(options); +} + +void ConnectionImpl::setOptions(const Variant::Map& options) { - QPID_LOG(debug, "Opening connection to " << url << " with " << options); convert(options, settings); - setIfFound(options, "reconnection-enabled", reconnectionEnabled); - setIfFound(options, "reconnection-timeout", timeout); - setIfFound(options, "min-retry-interval", minRetryInterval); - setIfFound(options, "max-retry-interval", maxRetryInterval); + setIfFound(options, "reconnect", reconnect); + setIfFound(options, "reconnect-timeout", timeout); + setIfFound(options, "reconnect-limit", limit); + int64_t reconnectInterval; + if (setIfFound(options, "reconnect-interval", reconnectInterval)) { + minReconnectInterval = maxReconnectInterval = reconnectInterval; + } else { + setIfFound(options, "min-reconnect-interval", minReconnectInterval); + setIfFound(options, "max-reconnect-interval", maxReconnectInterval); + } + setIfFound(options, "urls", urls); +} + +void ConnectionImpl::setOption(const std::string& name, const Variant& value) +{ + Variant::Map options; + options[name] = value; + setOptions(options); + QPID_LOG(debug, "Set " << name << " to " << value); } void ConnectionImpl::open(const std::string& u) { - url = u; - connection.open(url, settings); + urls.push_back(u); + connect(); } void ConnectionImpl::close() @@ -134,64 +187,65 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st try { getImplPtr(impl)->setSession(connection.newSession(name)); } catch (const TransportFailure&) { - reconnect(); + connect(); } return impl; } -void ConnectionImpl::reconnect() +void ConnectionImpl::connect() { - AbsTime start = now(); - ScopedLock<Semaphore> l(semaphore); + qpid::sys::AbsTime start = qpid::sys::now(); + qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore); if (!connection.isOpen()) connect(start); } -bool expired(const AbsTime& start, int timeout) +bool expired(const qpid::sys::AbsTime& start, int64_t timeout) { if (timeout == 0) return true; if (timeout < 0) return false; - Duration used(start, now()); - Duration allowed = timeout * TIME_SEC; - return allowed > used; + qpid::sys::Duration used(start, qpid::sys::now()); + qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC; + return allowed < used; } -void ConnectionImpl::connect(const AbsTime& started) +void ConnectionImpl::connect(const qpid::sys::AbsTime& started) { - for (int i = minRetryInterval; !tryConnect(); i = std::min(i * 2, maxRetryInterval)) { - if (expired(started, timeout)) throw TransportFailure(); + for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) { + if (!reconnect) throw TransportFailure("Failed to connect (reconnect disabled)"); + if (limit >= 0 && retries++ >= limit) throw TransportFailure("Failed to connect within reconnect limit"); + if (expired(started, timeout)) throw TransportFailure("Failed to connect within reconnect timeout"); else qpid::sys::sleep(i); } + retries = 0; } bool ConnectionImpl::tryConnect() { - if (tryConnect(url) || - (failoverListener.get() && tryConnect(failoverListener->getKnownBrokers()))) - { - return resetSessions(); - } else { - return false; - } + if (tryConnect(urls)) return resetSessions(); + else return false; } -bool ConnectionImpl::tryConnect(const Url& u) +bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls) { - try { - QPID_LOG(info, "Trying to connect to " << url << "..."); - connection.open(u, settings); - failoverListener.reset(new FailoverListener(connection)); - return true; - } catch (const Exception& e) { - //TODO: need to fix timeout on open so that it throws TransportFailure - QPID_LOG(info, "Failed to connect to " << u << ": " << e.what()); - } - return false; -} - -bool ConnectionImpl::tryConnect(const std::vector<Url>& urls) -{ - for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) { - if (tryConnect(*i)) return true; + for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) { + try { + QPID_LOG(info, "Trying to connect to " << *i << "..."); + //TODO: when url support is more complete can avoid this test here + if (i->find("amqp:") == 0) { + Url url(*i); + connection.open(url, settings); + } else { + SimpleUrlParser::parse(*i, settings); + connection.open(settings); + } + QPID_LOG(info, "Connected to " << *i); + return true; + } catch (const Exception& e) { + //TODO: need to fix timeout on + //qpid::client::Connection::open() so that it throws + //TransportFailure rather than a ConnectionException + QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); + } } return false; } diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index d9d0d1e065..37a78b2373 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -25,7 +25,6 @@ #include "qpid/messaging/Variant.h" #include "qpid/Url.h" #include "qpid/client/Connection.h" -#include "qpid/client/FailoverListener.h" #include "qpid/client/ConnectionSettings.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Semaphore.h" @@ -46,7 +45,8 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl qpid::messaging::Session newSession(bool transactional, const std::string& name); qpid::messaging::Session getSession(const std::string& name) const; void closed(SessionImpl&); - void reconnect(); + void connect(); + void setOption(const std::string& name, const qpid::messaging::Variant& value); private: typedef std::map<std::string, qpid::messaging::Session> Sessions; @@ -54,18 +54,19 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl qpid::sys::Semaphore semaphore;//used to coordinate reconnection Sessions sessions; qpid::client::Connection connection; - std::auto_ptr<FailoverListener> failoverListener; - qpid::Url url; + std::vector<std::string> urls; qpid::client::ConnectionSettings settings; - bool reconnectionEnabled; - int timeout; - int minRetryInterval; - int maxRetryInterval; + bool reconnect; + int64_t timeout; + int32_t limit; + int64_t minReconnectInterval; + int64_t maxReconnectInterval; + int32_t retries; + void setOptions(const qpid::messaging::Variant::Map& options); void connect(const qpid::sys::AbsTime& started); bool tryConnect(); - bool tryConnect(const std::vector<Url>& urls); - bool tryConnect(const Url&); + bool tryConnect(const std::vector<std::string>& urls); bool resetSessions(); }; }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 9823dba6e1..d9fd3a5da1 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -431,7 +431,7 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection.reconnect(); + connection.connect(); } qpid::messaging::Connection SessionImpl::getConnection() const |
