summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2010-03-11 18:23:46 +0000
committerGordon Sim <gsim@apache.org>2010-03-11 18:23:46 +0000
commitb00e749a170b09db7e24161baa809c11fa65531f (patch)
tree1356c7ecbc3555892d9be10ec9d86455335259fb /cpp/src/qpid
parent88086e0099c0fb67ac3a01c5f8793c0634b946a0 (diff)
downloadqpid-python-b00e749a170b09db7e24161baa809c11fa65531f.tar.gz
QPID-2382: Created separate utility class for handling updates from failover exchange; cleaned up reconnection options
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@921971 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp148
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.h21
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp2
-rw-r--r--cpp/src/qpid/messaging/AddressParser.cpp1
-rw-r--r--cpp/src/qpid/messaging/Connection.cpp37
-rw-r--r--cpp/src/qpid/messaging/ConnectionImpl.h2
6 files changed, 120 insertions, 91 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 4242850192..9c1c4e0735 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -1,4 +1,4 @@
-/*
+ /*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -20,6 +20,7 @@
*/
#include "ConnectionImpl.h"
#include "SessionImpl.h"
+#include "SimpleUrlParser.h"
#include "qpid/messaging/Session.h"
#include "qpid/client/PrivateImplRef.h"
#include "qpid/framing/Uuid.h"
@@ -33,13 +34,42 @@ namespace amqp0_10 {
using qpid::messaging::Variant;
using qpid::framing::Uuid;
-using namespace qpid::sys;
-template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value)
+void convert(const Variant::List& from, std::vector<std::string>& to)
+{
+ for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
+ to.push_back(i->asString());
+ }
+}
+
+template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value)
{
Variant::Map::const_iterator i = map.find(key);
if (i != map.end()) {
value = (T) i->second;
+ QPID_LOG(debug, "option " << key << " specified as " << i->second);
+ return true;
+ } else {
+ QPID_LOG(debug, "option " << key << " not specified");
+ return false;
+ }
+}
+
+template <>
+bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
+ const std::string& key,
+ std::vector<std::string>& value)
+{
+ Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ if (i->second.getType() == qpid::messaging::VAR_LIST) {
+ convert(i->second.asList(), value);
+ } else {
+ value.push_back(i->second.asString());
+ }
+ return true;
+ } else {
+ return false;
}
}
@@ -59,24 +89,47 @@ void convert(const Variant::Map& from, ConnectionSettings& to)
setIfFound(from, "max-channels", to.maxChannels);
setIfFound(from, "max-frame-size", to.maxFrameSize);
setIfFound(from, "bounds", to.bounds);
+
+ setIfFound(from, "protocol", to.protocol);
}
ConnectionImpl::ConnectionImpl(const Variant::Map& options) :
- reconnectionEnabled(true), timeout(-1),
- minRetryInterval(1), maxRetryInterval(30)
+ reconnect(true), timeout(-1), limit(-1),
+ minReconnectInterval(3), maxReconnectInterval(60),
+ retries(0)
+{
+ QPID_LOG(debug, "Created connection with " << options);
+ setOptions(options);
+}
+
+void ConnectionImpl::setOptions(const Variant::Map& options)
{
- QPID_LOG(debug, "Opening connection to " << url << " with " << options);
convert(options, settings);
- setIfFound(options, "reconnection-enabled", reconnectionEnabled);
- setIfFound(options, "reconnection-timeout", timeout);
- setIfFound(options, "min-retry-interval", minRetryInterval);
- setIfFound(options, "max-retry-interval", maxRetryInterval);
+ setIfFound(options, "reconnect", reconnect);
+ setIfFound(options, "reconnect-timeout", timeout);
+ setIfFound(options, "reconnect-limit", limit);
+ int64_t reconnectInterval;
+ if (setIfFound(options, "reconnect-interval", reconnectInterval)) {
+ minReconnectInterval = maxReconnectInterval = reconnectInterval;
+ } else {
+ setIfFound(options, "min-reconnect-interval", minReconnectInterval);
+ setIfFound(options, "max-reconnect-interval", maxReconnectInterval);
+ }
+ setIfFound(options, "urls", urls);
+}
+
+void ConnectionImpl::setOption(const std::string& name, const Variant& value)
+{
+ Variant::Map options;
+ options[name] = value;
+ setOptions(options);
+ QPID_LOG(debug, "Set " << name << " to " << value);
}
void ConnectionImpl::open(const std::string& u)
{
- url = u;
- connection.open(url, settings);
+ urls.push_back(u);
+ connect();
}
void ConnectionImpl::close()
@@ -134,64 +187,65 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st
try {
getImplPtr(impl)->setSession(connection.newSession(name));
} catch (const TransportFailure&) {
- reconnect();
+ connect();
}
return impl;
}
-void ConnectionImpl::reconnect()
+void ConnectionImpl::connect()
{
- AbsTime start = now();
- ScopedLock<Semaphore> l(semaphore);
+ qpid::sys::AbsTime start = qpid::sys::now();
+ qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore);
if (!connection.isOpen()) connect(start);
}
-bool expired(const AbsTime& start, int timeout)
+bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
{
if (timeout == 0) return true;
if (timeout < 0) return false;
- Duration used(start, now());
- Duration allowed = timeout * TIME_SEC;
- return allowed > used;
+ qpid::sys::Duration used(start, qpid::sys::now());
+ qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC;
+ return allowed < used;
}
-void ConnectionImpl::connect(const AbsTime& started)
+void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
{
- for (int i = minRetryInterval; !tryConnect(); i = std::min(i * 2, maxRetryInterval)) {
- if (expired(started, timeout)) throw TransportFailure();
+ for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) {
+ if (!reconnect) throw TransportFailure("Failed to connect (reconnect disabled)");
+ if (limit >= 0 && retries++ >= limit) throw TransportFailure("Failed to connect within reconnect limit");
+ if (expired(started, timeout)) throw TransportFailure("Failed to connect within reconnect timeout");
else qpid::sys::sleep(i);
}
+ retries = 0;
}
bool ConnectionImpl::tryConnect()
{
- if (tryConnect(url) ||
- (failoverListener.get() && tryConnect(failoverListener->getKnownBrokers())))
- {
- return resetSessions();
- } else {
- return false;
- }
+ if (tryConnect(urls)) return resetSessions();
+ else return false;
}
-bool ConnectionImpl::tryConnect(const Url& u)
+bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls)
{
- try {
- QPID_LOG(info, "Trying to connect to " << url << "...");
- connection.open(u, settings);
- failoverListener.reset(new FailoverListener(connection));
- return true;
- } catch (const Exception& e) {
- //TODO: need to fix timeout on open so that it throws TransportFailure
- QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());
- }
- return false;
-}
-
-bool ConnectionImpl::tryConnect(const std::vector<Url>& urls)
-{
- for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
- if (tryConnect(*i)) return true;
+ for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
+ try {
+ QPID_LOG(info, "Trying to connect to " << *i << "...");
+ //TODO: when url support is more complete can avoid this test here
+ if (i->find("amqp:") == 0) {
+ Url url(*i);
+ connection.open(url, settings);
+ } else {
+ SimpleUrlParser::parse(*i, settings);
+ connection.open(settings);
+ }
+ QPID_LOG(info, "Connected to " << *i);
+ return true;
+ } catch (const Exception& e) {
+ //TODO: need to fix timeout on
+ //qpid::client::Connection::open() so that it throws
+ //TransportFailure rather than a ConnectionException
+ QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
+ }
}
return false;
}
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index d9d0d1e065..37a78b2373 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -25,7 +25,6 @@
#include "qpid/messaging/Variant.h"
#include "qpid/Url.h"
#include "qpid/client/Connection.h"
-#include "qpid/client/FailoverListener.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Semaphore.h"
@@ -46,7 +45,8 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
qpid::messaging::Session newSession(bool transactional, const std::string& name);
qpid::messaging::Session getSession(const std::string& name) const;
void closed(SessionImpl&);
- void reconnect();
+ void connect();
+ void setOption(const std::string& name, const qpid::messaging::Variant& value);
private:
typedef std::map<std::string, qpid::messaging::Session> Sessions;
@@ -54,18 +54,19 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
qpid::sys::Semaphore semaphore;//used to coordinate reconnection
Sessions sessions;
qpid::client::Connection connection;
- std::auto_ptr<FailoverListener> failoverListener;
- qpid::Url url;
+ std::vector<std::string> urls;
qpid::client::ConnectionSettings settings;
- bool reconnectionEnabled;
- int timeout;
- int minRetryInterval;
- int maxRetryInterval;
+ bool reconnect;
+ int64_t timeout;
+ int32_t limit;
+ int64_t minReconnectInterval;
+ int64_t maxReconnectInterval;
+ int32_t retries;
+ void setOptions(const qpid::messaging::Variant::Map& options);
void connect(const qpid::sys::AbsTime& started);
bool tryConnect();
- bool tryConnect(const std::vector<Url>& urls);
- bool tryConnect(const Url&);
+ bool tryConnect(const std::vector<std::string>& urls);
bool resetSessions();
};
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 9823dba6e1..d9fd3a5da1 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -431,7 +431,7 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- connection.reconnect();
+ connection.connect();
}
qpid::messaging::Connection SessionImpl::getConnection() const
diff --git a/cpp/src/qpid/messaging/AddressParser.cpp b/cpp/src/qpid/messaging/AddressParser.cpp
index 265b5fe195..4b29f126f2 100644
--- a/cpp/src/qpid/messaging/AddressParser.cpp
+++ b/cpp/src/qpid/messaging/AddressParser.cpp
@@ -198,6 +198,7 @@ bool AddressParser::readSimpleValue(Variant& value)
std::string s;
if (readWord(s)) {
value = s;
+ try { value = value.asInt32(); return true; } catch (const InvalidConversion&) {}
try { value = value.asInt64(); return true; } catch (const InvalidConversion&) {}
try { value = value.asDouble(); return true; } catch (const InvalidConversion&) {}
return true;
diff --git a/cpp/src/qpid/messaging/Connection.cpp b/cpp/src/qpid/messaging/Connection.cpp
index 230c9d5dbf..4873899787 100644
--- a/cpp/src/qpid/messaging/Connection.cpp
+++ b/cpp/src/qpid/messaging/Connection.cpp
@@ -67,40 +67,11 @@ Session Connection::newSession(bool transactional, const std::string& name)
return impl->newSession(transactional, name);
}
Session Connection::getSession(const std::string& name) const { return impl->getSession(name); }
-
-InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {}
-
-void parseKeyValuePair(const std::string& in, Variant::Map& out)
-{
- std::string::size_type i = in.find('=');
- if (i == std::string::npos || i == in.size() || in.find('=', i+1) != std::string::npos) {
- throw InvalidOptionString(QPID_MSG("Cannot parse name-value pair from " << in));
- } else {
- out[in.substr(0, i)] = in.substr(i+1);
- }
-}
-
-void parseOptionString(const std::string& in, Variant::Map& out)
-{
- std::string::size_type start = 0;
- std::string::size_type i = in.find('&');
- while (i != std::string::npos) {
- parseKeyValuePair(in.substr(start, i-start), out);
- if (i < in.size()) {
- start = i+1;
- i = in.find('&', start);
- } else {
- i = std::string::npos;
- }
- }
- parseKeyValuePair(in.substr(start), out);
+void Connection::setOption(const std::string& name, const Variant& value)
+{
+ impl->setOption(name, value);
}
-Variant::Map parseOptionString(const std::string& in)
-{
- Variant::Map map;
- parseOptionString(in, map);
- return map;
-}
+InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {}
}} // namespace qpid::messaging
diff --git a/cpp/src/qpid/messaging/ConnectionImpl.h b/cpp/src/qpid/messaging/ConnectionImpl.h
index 589c9fbe57..33ebcda950 100644
--- a/cpp/src/qpid/messaging/ConnectionImpl.h
+++ b/cpp/src/qpid/messaging/ConnectionImpl.h
@@ -31,6 +31,7 @@ namespace client {
namespace messaging {
class Session;
+class Variant;
class ConnectionImpl : public virtual qpid::RefCounted
{
@@ -40,6 +41,7 @@ class ConnectionImpl : public virtual qpid::RefCounted
virtual void close() = 0;
virtual Session newSession(bool transactional, const std::string& name) = 0;
virtual Session getSession(const std::string& name) const = 0;
+ virtual void setOption(const std::string& name, const Variant& value) = 0;
private:
};
}} // namespace qpid::messaging