diff options
| author | Gordon Sim <gsim@apache.org> | 2013-09-06 19:37:52 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-09-06 19:37:52 +0000 |
| commit | 9f89516b7a87b0a60abc665c4d83ae7daecbc8ad (patch) | |
| tree | f8fdb59196e37d70fe902244afa28e5a442abd6d /qpid/cpp/src | |
| parent | e1739e9a6f90a8029a099806e3778205f4f6e8d2 (diff) | |
| download | qpid-python-9f89516b7a87b0a60abc665c4d83ae7daecbc8ad.tar.gz | |
QPID-4932: expose reconnect&replay logic for application to control itself
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1520673 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/client/Connection.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/Connection.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 47 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/Connection.cpp | 14 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/ConnectionImpl.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 96 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h | 8 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp | 13 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h | 3 |
10 files changed, 155 insertions, 38 deletions
diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp index 8b4eafccaa..26e69233af 100644 --- a/qpid/cpp/src/qpid/client/Connection.cpp +++ b/qpid/cpp/src/qpid/client/Connection.cpp @@ -127,7 +127,7 @@ void Connection::open(const ConnectionSettings& settings) impl->registerFailureCallback ( failureCallback ); } -const ConnectionSettings& Connection::getNegotiatedSettings() +const ConnectionSettings& Connection::getNegotiatedSettings() const { if (!isOpen()) throw Exception(QPID_MSG("Connection is not open.")); diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h index c0db0f301d..fb502cb40a 100644 --- a/qpid/cpp/src/qpid/client/Connection.h +++ b/qpid/cpp/src/qpid/client/Connection.h @@ -216,7 +216,7 @@ class QPID_CLIENT_CLASS_EXTERN Connection /** * Return the set of client negotiated settings */ - QPID_CLIENT_EXTERN const ConnectionSettings& getNegotiatedSettings(); + QPID_CLIENT_EXTERN const ConnectionSettings& getNegotiatedSettings() const; friend struct ConnectionAccess; ///<@internal friend class SessionBase_0_10; ///<@internal diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 366218c002..9406c992fe 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -43,6 +43,7 @@ using qpid::framing::Uuid; namespace { const std::string TCP("tcp"); +const std::string COLON(":"); double FOREVER(std::numeric_limits<double>::max()); // Time values in seconds can be specified as integer or floating point values. @@ -86,7 +87,7 @@ bool expired(const sys::AbsTime& start, double timeout) } // namespace ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : - replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), + replaceUrls(false), autoReconnect(false), timeout(FOREVER), limit(-1), minReconnectInterval(0.001), maxReconnectInterval(2), retries(0), reconnectOnLimitExceeded(true), disableAutoDecode(false) { @@ -106,7 +107,7 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value) { sys::Mutex::ScopedLock l(lock); if (name == "reconnect") { - reconnect = value; + autoReconnect = value; } else if (name == "reconnect-timeout" || name == "reconnect_timeout") { timeout = timeValue(value); } else if (name == "reconnect-limit" || name == "reconnect_limit") { @@ -256,7 +257,7 @@ void ConnectionImpl::open() void ConnectionImpl::reopen() { - if (!reconnect) { + if (!autoReconnect) { throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); } open(); @@ -267,7 +268,7 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started) { QPID_LOG(debug, "Starting connection, urls=" << asString(urls)); for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) { - if (!reconnect) { + if (!autoReconnect) { throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); } if (limit >= 0 && retries++ >= limit) { @@ -343,6 +344,44 @@ bool ConnectionImpl::backoff() } } +void ConnectionImpl::reconnect(const std::string& u) +{ + sys::Mutex::ScopedLock l(lock); + try { + QPID_LOG(info, "Trying to connect to " << u << "..."); + Url url(u, settings.protocol.size() ? settings.protocol : TCP); + if (url.getUser().size()) settings.username = url.getUser(); + if (url.getPass().size()) settings.password = url.getPass(); + connection.open(url, settings); + QPID_LOG(info, "Connected to " << u); + mergeUrls(connection.getInitialBrokers(), l); + if (!resetSessions(l)) throw qpid::messaging::TransportFailure("Could not re-establish sessions"); + } catch (const qpid::TransportFailure& e) { + QPID_LOG(info, "Failed to connect to " << u << ": " << e.what()); + throw qpid::messaging::TransportFailure(e.what()); + } catch (const std::exception& e) { + QPID_LOG(info, "Error while connecting to " << u << ": " << e.what()); + throw qpid::messaging::MessagingException(e.what()); + } +} + +void ConnectionImpl::reconnect() +{ + if (!tryConnect()) { + throw qpid::messaging::TransportFailure("Could not reconnect"); + } +} +std::string ConnectionImpl::getUrl() const +{ + if (isOpen()) { + std::stringstream u; + u << connection.getNegotiatedSettings().protocol << COLON << connection.getNegotiatedSettings().host << COLON << connection.getNegotiatedSettings().port; + return u.str(); + } else { + return std::string(); + } +} + std::string ConnectionImpl::getAuthenticatedUsername() { return connection.getNegotiatedSettings().username; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index 00ac30a6df..ae839dc690 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -53,6 +53,9 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl void setOption(const std::string& name, const qpid::types::Variant& value); bool backoff(); std::string getAuthenticatedUsername(); + void reconnect(const std::string& url); + void reconnect(); + std::string getUrl() const; bool getAutoDecode() const; private: typedef std::map<std::string, qpid::messaging::Session> Sessions; @@ -64,7 +67,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl bool replaceUrls; // Replace rather than merging with reconnect-urls std::vector<std::string> urls; qpid::client::ConnectionSettings settings; - bool reconnect; + bool autoReconnect; double timeout; int32_t limit; double minReconnectInterval; diff --git a/qpid/cpp/src/qpid/messaging/Connection.cpp b/qpid/cpp/src/qpid/messaging/Connection.cpp index fde931038b..0993ef5dd9 100644 --- a/qpid/cpp/src/qpid/messaging/Connection.cpp +++ b/qpid/cpp/src/qpid/messaging/Connection.cpp @@ -90,4 +90,18 @@ std::string Connection::getAuthenticatedUsername() { return impl->getAuthenticatedUsername(); } + +void Connection::reconnect(const std::string& url) +{ + impl->reconnect(url); +} +void Connection::reconnect() +{ + impl->reconnect(); +} +std::string Connection::getUrl() const +{ + return impl->getUrl(); +} + }} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h index 1e11d9a6d5..92c6d91b10 100644 --- a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h @@ -45,6 +45,9 @@ class ConnectionImpl : public virtual qpid::RefCounted virtual Session getSession(const std::string& name) const = 0; virtual void setOption(const std::string& name, const qpid::types::Variant& value) = 0; virtual std::string getAuthenticatedUsername() = 0; + virtual void reconnect(const std::string& url) = 0; + virtual void reconnect() = 0; + virtual std::string getUrl() const = 0; private: }; }} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 0d4885c4c3..afa2db6791 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -79,7 +79,7 @@ ConnectionContext::~ConnectionContext() bool ConnectionContext::isOpen() const { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - return pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); + return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); } void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) @@ -399,7 +399,7 @@ void ConnectionContext::check() if (state == DISCONNECTED) { if (ConnectionOptions::reconnect) { reset(); - reconnect(); + autoconnect(); } else { throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)"); } @@ -794,7 +794,7 @@ bool expired(const sys::AbsTime& start, double timeout) const std::string COLON(":"); } -void ConnectionContext::reconnect() +void ConnectionContext::autoconnect() { qpid::sys::AbsTime started(qpid::sys::now()); QPID_LOG(debug, "Starting connection, urls=" << asString(urls)); @@ -821,29 +821,7 @@ bool ConnectionContext::tryConnect() try { QPID_LOG(info, "Trying to connect to " << *i << "..."); if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) { - QPID_LOG(info, "Connected to " << *i); - if (sasl.get()) { - wakeupDriver(); - while (!sasl->authenticated()) { - QPID_LOG(debug, id << " Waiting to be authenticated..."); - wait(); - } - QPID_LOG(debug, id << " Authenticated"); - } - - QPID_LOG(debug, id << " Opening..."); - setProperties(); - pn_connection_open(connection); - wakeupDriver(); //want to write - while (pn_connection_state(connection) & PN_REMOTE_UNINIT) { - wait(); - } - if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { - throw qpid::messaging::ConnectionError("Failed to open connection"); - } - QPID_LOG(debug, id << " Opened"); - - return restartSessions(); + return true; } } catch (const qpid::messaging::TransportFailure& e) { QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); @@ -852,9 +830,26 @@ bool ConnectionContext::tryConnect() return false; } -bool ConnectionContext::tryConnect(const std::string& url) +void ConnectionContext::reconnect(const std::string& url) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); + if (!driver) driver = DriverImpl::getDefault(); + reset(); + if (!tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol))) { + throw qpid::messaging::TransportFailure("Failed to connect"); + } +} + +void ConnectionContext::reconnect() { - return tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol)); + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); + if (!driver) driver = DriverImpl::getDefault(); + reset(); + if (!tryConnect()) { + throw qpid::messaging::TransportFailure("Failed to reconnect"); + } } bool ConnectionContext::tryConnect(const Url& url) @@ -863,11 +858,54 @@ bool ConnectionContext::tryConnect(const Url& url) if (url.getPass().size()) password = url.getPass(); for (Url::const_iterator i = url.begin(); i != url.end(); ++i) { - if (tryConnect(*i)) return true; + if (tryConnect(*i)) { + QPID_LOG(info, "Connected to " << *i); + setCurrentUrl(*i); + if (sasl.get()) { + wakeupDriver(); + while (!sasl->authenticated()) { + QPID_LOG(debug, id << " Waiting to be authenticated..."); + wait(); + } + QPID_LOG(debug, id << " Authenticated"); + } + + QPID_LOG(debug, id << " Opening..."); + setProperties(); + pn_connection_open(connection); + wakeupDriver(); //want to write + while (pn_connection_state(connection) & PN_REMOTE_UNINIT) { + wait(); + } + if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { + throw qpid::messaging::ConnectionError("Failed to open connection"); + } + QPID_LOG(debug, id << " Opened"); + + return restartSessions(); + } } return false; } +void ConnectionContext::setCurrentUrl(const qpid::Address& a) +{ + std::stringstream u; + u << a; + currentUrl = u.str(); +} + +std::string ConnectionContext::getUrl() const +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (state == CONNECTED) { + return currentUrl; + } else { + return std::string(); + } +} + + bool ConnectionContext::tryConnect(const qpid::Address& address) { transport = driver->getTransport(address.protocol, *this); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 3b80f71f1d..7387dba13e 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -106,6 +106,9 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag framing::ProtocolVersion getVersion() const; //additionally, Transport needs: void opened();//signal successful connection + void reconnect(const std::string& url); + void reconnect(); + std::string getUrl() const; const qpid::sys::SecuritySettings* getTransportSecuritySettings(); private: @@ -122,6 +125,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag bool readHeader; bool haveOutput; std::string id; + std::string currentUrl; enum { DISCONNECTED, CONNECTING, @@ -144,14 +148,14 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*); void wakeupDriver(); void attach(pn_link_t*, int credit=0); - void reconnect(); + void autoconnect(); bool tryConnect(); - bool tryConnect(const std::string& url); bool tryConnect(const qpid::Url& url); bool tryConnect(const qpid::Address& address); void reset(); bool restartSessions(); void restartSession(boost::shared_ptr<SessionContext>); + void setCurrentUrl(const qpid::Address&); std::size_t decodePlain(const char* buffer, std::size_t size); std::size_t encodePlain(char* buffer, std::size_t size); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp index 0c4ec2bfcb..c1ab108a61 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp @@ -81,4 +81,17 @@ std::string ConnectionHandle::getAuthenticatedUsername() return connection->getAuthenticatedUsername(); } +void ConnectionHandle::reconnect(const std::string& url) +{ + connection->reconnect(url); +} +void ConnectionHandle::reconnect() +{ + connection->reconnect(); +} +std::string ConnectionHandle::getUrl() const +{ + return connection->getUrl(); +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h index d1eb27f6de..0238313f93 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h @@ -49,6 +49,9 @@ class ConnectionHandle : public qpid::messaging::ConnectionImpl Session getSession(const std::string& name) const; void setOption(const std::string& name, const qpid::types::Variant& value); std::string getAuthenticatedUsername(); + void reconnect(const std::string& url); + void reconnect(); + std::string getUrl() const; private: boost::shared_ptr<ConnectionContext> connection; }; |
