diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 172 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h | 12 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/urlAdd.h | 59 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/qpid-ping.cpp | 85 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ssl_test | 2 |
7 files changed, 183 insertions, 150 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 9406c992fe..b229d23851 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -93,7 +93,6 @@ ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& optio { setOptions(options); urls.insert(urls.begin(), url); - QPID_LOG(debug, "Created connection " << url << " with " << options); } void ConnectionImpl::setOptions(const Variant::Map& options) diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 4230a0d644..0083a2e390 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -37,7 +37,9 @@ #include "qpid/sys/SecurityLayer.h" #include "qpid/sys/SystemInfo.h" #include "qpid/sys/Time.h" +#include "qpid/sys/urlAdd.h" #include "config.h" +#include <boost/lexical_cast.hpp> #include <vector> extern "C" { #include <proton/engine.h> @@ -48,17 +50,6 @@ namespace messaging { namespace amqp { namespace { -std::string asString(const std::vector<std::string>& v) { - std::stringstream os; - os << "["; - for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) { - if (i != v.begin()) os << ", "; - os << '"' << *i << '"'; - } - os << "]"; - return os.str(); -} - //remove conditional when 0.5 is no longer supported #ifdef HAVE_PROTON_TRACER void do_trace(pn_transport_t* transport, const char* message) @@ -86,6 +77,7 @@ void ConnectionContext::trace(const char* message) const ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::Variant::Map& o) : qpid::messaging::ConnectionOptions(o), + fullUrl(url), engine(pn_transport()), connection(pn_connection()), //note: disabled read/write of header as now handled by engine @@ -95,7 +87,8 @@ ConnectionContext::ConnectionContext(const std::string& url, const qpid::types:: state(DISCONNECTED), codecAdapter(*this) { - urls.insert(urls.begin(), url); + // Concatenate all known URLs into a single URL, get rid of duplicate addresses. + sys::urlAddStrings(fullUrl, urls.begin(), urls.end()); if (pn_transport_bind(engine, connection)) { //error } @@ -495,7 +488,9 @@ void ConnectionContext::reset() void ConnectionContext::check() { if (checkDisconnected()) { if (ConnectionOptions::reconnect) { + QPID_LOG(notice, "Auto-reconnecting to " << fullUrl); autoconnect(); + QPID_LOG(notice, "Auto-reconnected to " << currentUrl); } else { throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)"); } @@ -903,7 +898,7 @@ void ConnectionContext::open() qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); if (!driver) driver = DriverImpl::getDefault(); - + QPID_LOG(info, "Starting connection to " << fullUrl); autoconnect(); } @@ -921,64 +916,38 @@ bool expired(const sys::AbsTime& start, double timeout) const std::string COLON(":"); } +void throwConnectFail(const Url& url, const std::string& msg) { + throw qpid::messaging::TransportFailure( + Msg() << "Connect failed to " << url << ": " << msg); +} + void ConnectionContext::autoconnect() { qpid::sys::AbsTime started(qpid::sys::now()); - QPID_LOG(debug, "Starting connection, urls=" << asString(urls)); - for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) { - if (!ConnectionOptions::reconnect) { - throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); - } - if (limit >= 0 && retries++ >= limit) { - throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit"); - } - if (expired(started, timeout)) { - throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout"); - } - QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls=" - << asString(urls)); + for (double i = minReconnectInterval; !tryConnectUrl(fullUrl); i = std::min(i*2, maxReconnectInterval)) { + if (!ConnectionOptions::reconnect) throwConnectFail(fullUrl, "Reconnect disabled"); + if (limit >= 0 && retries++ >= limit) throwConnectFail(fullUrl, "Exceeded retries"); + if (expired(started, timeout)) throwConnectFail(fullUrl, "Exceeded timeout"); + QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds to" + << fullUrl); qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds. } retries = 0; } -bool ConnectionContext::tryConnect() -{ - for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) { - try { - QPID_LOG(info, "Trying to connect to " << *i << "..."); - if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) { - return true; - } - QPID_LOG(info, "Failed to connect to " << *i); - } catch (const qpid::messaging::TransportFailure& e) { - QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); - } - } - return false; -} - -void ConnectionContext::reconnect(const std::string& url) -{ +void ConnectionContext::reconnect(const Url& url) { + QPID_LOG(notice, "Reconnecting to " << url); qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); if (!driver) driver = DriverImpl::getDefault(); reset(); - if (!tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol))) { - throw qpid::messaging::TransportFailure("Failed to connect"); - } + if (!tryConnectUrl(url)) throwConnectFail(url, "Failed to reconnect"); + QPID_LOG(notice, "Reconnected to " << currentUrl); } -void ConnectionContext::reconnect() -{ - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); - if (!driver) driver = DriverImpl::getDefault(); - reset(); - if (!tryConnect()) { - throw qpid::messaging::TransportFailure("Failed to reconnect"); - } -} +void ConnectionContext::reconnect(const std::string& url) { reconnect(Url(url)); } + +void ConnectionContext::reconnect() { reconnect(fullUrl); } void ConnectionContext::waitNoReconnect() { if (!checkDisconnected()) { @@ -987,77 +956,75 @@ void ConnectionContext::waitNoReconnect() { } } -bool ConnectionContext::tryConnect(const Url& url) +// Try to connect to a URL, i.e. try to connect to each of its addresses in turn +// till one succeeds or they all fail. +// @return true if we connect successfully +bool ConnectionContext::tryConnectUrl(const Url& url) { if (url.getUser().size()) username = url.getUser(); if (url.getPass().size()) password = url.getPass(); for (Url::const_iterator i = url.begin(); i != url.end(); ++i) { - if (tryConnect(*i)) { + QPID_LOG(info, "Connecting to " << *i); + if (tryConnectAddr(*i) && tryOpenAddr(*i)) { QPID_LOG(info, "Connected to " << *i); - setCurrentUrl(*i); - if (sasl.get()) { - wakeupDriver(); - while (!sasl->authenticated() && state != DISCONNECTED) { - QPID_LOG(debug, id << " Waiting to be authenticated..."); - waitNoReconnect(); - } - if (state == DISCONNECTED) continue; - QPID_LOG(debug, id << " Authenticated"); - } - - QPID_LOG(debug, id << " Opening..."); - setProperties(); - pn_connection_open(connection); - wakeupDriver(); //want to write - while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) && - state != DISCONNECTED) - waitNoReconnect(); - if (state == DISCONNECTED) continue; - if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { - throw qpid::messaging::ConnectionError("Failed to open connection"); - } - QPID_LOG(debug, id << " Opened"); - - return restartSessions(); - } else { - QPID_LOG(notice, "Failed to connect to " << *i); + return true; } } return false; } -void ConnectionContext::setCurrentUrl(const qpid::Address& a) -{ - std::stringstream u; - u << a; - currentUrl = u.str(); +// Try to open an AMQP protocol connection on an address, after we have already +// established a transport connect (see tryConnectAddr below) +// @return true if the AMQP connection is succesfully opened. +bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) { + currentUrl = Url(addr); + if (sasl.get()) { + wakeupDriver(); + while (!sasl->authenticated() && state != DISCONNECTED) { + QPID_LOG(debug, id << " Waiting to be authenticated..."); + waitNoReconnect(); + } + if (state == DISCONNECTED) return false; + QPID_LOG(debug, id << " Authenticated"); + } + + QPID_LOG(debug, id << " Opening..."); + setProperties(); + pn_connection_open(connection); + wakeupDriver(); //want to write + while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) && + state != DISCONNECTED) + waitNoReconnect(); + if (state == DISCONNECTED) return false; + if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { + throw qpid::messaging::ConnectionError("Failed to open connection"); + } + QPID_LOG(debug, id << " Opened"); + + return restartSessions(); } std::string ConnectionContext::getUrl() const { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - if (state == CONNECTED) { - return currentUrl; - } else { - return std::string(); - } + return (state == CONNECTED) ? currentUrl.str() : std::string(); } - -bool ConnectionContext::tryConnect(const qpid::Address& address) +// Try to establish a transport connect to an individual address (typically a +// TCP host:port) +// @return true if we succeed in connecting. +bool ConnectionContext::tryConnectAddr(const qpid::Address& address) { transport = driver->getTransport(address.protocol, *this); - std::stringstream port; - port << address.port; - id = address.host + COLON + port.str(); + id = boost::lexical_cast<std::string>(address); if (useSasl()) { sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, address.host)); } state = CONNECTING; try { QPID_LOG(debug, id << " Connecting ..."); - transport->connect(address.host, port.str()); + transport->connect(address.host, boost::lexical_cast<std::string>(address.port)); bool waiting(true); while (waiting) { switch (state) { @@ -1069,7 +1036,6 @@ bool ConnectionContext::tryConnect(const qpid::Address& address) break; case DISCONNECTED: waiting = false; - QPID_LOG(debug, id << " Failed to connect"); break; } } diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 75afeba46a..59270f445d 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -132,6 +132,9 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag ConnectionContext& context; }; + Url fullUrl; // Combined URL of all known addresses. + Url currentUrl; // URL of currently connected address. + boost::shared_ptr<DriverImpl> driver; boost::shared_ptr<Transport> transport; @@ -143,7 +146,6 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag bool readHeader; bool haveOutput; std::string id; - std::string currentUrl; enum { DISCONNECTED, CONNECTING, @@ -170,13 +172,13 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void wakeupDriver(); void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0); void autoconnect(); - bool tryConnect(); - bool tryConnect(const qpid::Url& url); - bool tryConnect(const qpid::Address& address); + bool tryConnectUrl(const qpid::Url& url); + bool tryOpenAddr(const qpid::Address& address); + bool tryConnectAddr(const qpid::Address& address); + void reconnect(const Url& url); void reset(); bool restartSessions(); void restartSession(boost::shared_ptr<SessionContext>); - void setCurrentUrl(const qpid::Address&); std::size_t decodePlain(const char* buffer, std::size_t size); std::size_t encodePlain(char* buffer, std::size_t size); diff --git a/qpid/cpp/src/qpid/sys/urlAdd.h b/qpid/cpp/src/qpid/sys/urlAdd.h new file mode 100644 index 0000000000..aecb3115cf --- /dev/null +++ b/qpid/cpp/src/qpid/sys/urlAdd.h @@ -0,0 +1,59 @@ +#ifndef QPID_SYS_URLADD_H +#define QPID_SYS_URLADD_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/Url.h> +#include <boost/bind.hpp> +#include <algorithm> + +namespace qpid { +namespace sys { + +/** Add addr to url if it is not already present. */ +inline void urlAddAddress(Url& url, const Address& addr) { + if (std::find(url.begin(), url.end(), addr) == url.end()) url.push_back(addr); +} + +/** Add all addresses in more that are not already in url to url */ +inline void urlAddUrl(Url& url, const Url& more) { + for_each(more.begin(), more.end(), boost::bind(&urlAddAddress, boost::ref(url), _1)); +} + +/** Convert str to a Url and do urlAddUrl. */ +inline void urlAddString(Url& url, const std::string& str) { urlAddUrl(url, Url(str)); } + +/** For each URL in a range, do urlAddUrl */ +template <class UrlIterator> +void urlAddUrls(Url& url, UrlIterator i, UrlIterator j) { + for_each(i, j, boost::bind(&urlAddUrl, boost::ref(url), _1)); +} + +/** For each string in a range, do urlAddUrl(Url(string)) */ +template <class StringIterator> +void urlAddStrings(Url& url, StringIterator i, StringIterator j) { + for_each(i, j, boost::bind(&urlAddString, boost::ref(url), _1)); +} + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_URLADD_H*/ diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index 37cd6629dc..4c0dcba500 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -85,7 +85,7 @@ target_link_libraries (qpid-client-test qpidclient qpidcommon "${Boost_PROGRAM_O remember_location(qpid-client-test) add_executable (qpid-ping qpid-ping.cpp ${platform_test_additions}) -target_link_libraries (qpid-ping qpidclient qpidcommon qpidtypes "${Boost_PROGRAM_OPTIONS_LIBRARY}") +target_link_libraries (qpid-ping qpidmessaging qpidtypes qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") remember_location(qpid-ping) add_executable (qpid-topic-listener qpid-topic-listener.cpp ${platform_test_additions}) diff --git a/qpid/cpp/src/tests/qpid-ping.cpp b/qpid/cpp/src/tests/qpid-ping.cpp index 52331499e7..f9b1ec17f1 100644 --- a/qpid/cpp/src/tests/qpid-ping.cpp +++ b/qpid/cpp/src/tests/qpid-ping.cpp @@ -20,68 +20,75 @@ */ -#include "TestOptions.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/client/AsyncSession.h" -#include "qpid/sys/Time.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Monitor.h" -#include "qpid/framing/Uuid.h" +#include <qpid/messaging/Address.h> +#include <qpid/messaging/Connection.h> +#include "qpid/messaging/Duration.h" +#include <qpid/messaging/Message.h> +#include <qpid/messaging/Sender.h> +#include <qpid/messaging/Receiver.h> +#include <qpid/messaging/Session.h> +#include <qpid/Msg.h> +#include <qpid/Options.h> +#include <qpid/types/Uuid.h> #include <string> #include <iostream> -using std::cerr; -using std::cout; -using std::endl; -using std::exception; -using std::string; -using namespace qpid::client::arg; // For keyword args -using qpid::client::AsyncSession; -using qpid::client::Connection; -using qpid::client::Message; -using qpid::client::SubscriptionManager; -using qpid::framing::Uuid; +using namespace std; +using namespace qpid::messaging; +using qpid::types::Uuid; -namespace qpid { -namespace tests { +namespace { -struct PingOptions : public qpid::TestOptions { - int timeout; // Timeout in seconds. +struct PingOptions : public qpid::Options { + string url; + string address; + string message; + string connectionOptions; + double timeout; // Timeout in seconds. bool quiet; // No output - PingOptions() : timeout(1), quiet(false) { + + PingOptions() : + url("127.0.0.1"), + address(Uuid(true).str()+";{create:always}"), + message(Uuid(true).str()), + timeout(1), + quiet(false) + { + using qpid::optValue; addOptions() + ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to.") + ("address,a", qpid::optValue(address, "ADDRESS"), "address to use.") + ("message,m", optValue(message, "MESSAGE"), "message text to send.") + ("connection-options", optValue(connectionOptions, "OPTIONS"), "options for the connection.") ("timeout,t", optValue(timeout, "SECONDS"), "Max time to wait.") ("quiet,q", optValue(quiet), "Don't print anything to stderr/stdout."); } }; -}} // namespace qpid::tests +} // namespace int main(int argc, char** argv) { + Connection connection; + PingOptions opts; try { - qpid::tests::PingOptions opts; opts.parse(argc, argv); - opts.con.heartbeat = (opts.timeout+1)/2; - Connection connection; - opts.open(connection); + connection = Connection(opts.url, opts.connectionOptions); + connection.open(); if (!opts.quiet) cout << "Opened connection." << endl; - AsyncSession s = connection.newSession(); - string qname(Uuid(true).str()); - s.queueDeclare(queue=qname, autoDelete=true, exclusive=true); - s.messageTransfer(content=Message("hello", qname)); + Session s = connection.createSession(); + s.createSender(opts.address).send(Message(opts.message)); if (!opts.quiet) cout << "Sent message." << endl; - SubscriptionManager subs(s); - subs.get(qname); + Message m = s.createReceiver(opts.address). + fetch(Duration(opts.timeout*1000)); + if (m.getContent() != opts.message) + throw qpid::Exception(qpid::Msg() << "Expected " << opts.message + << " but received " << m.getContent()); if (!opts.quiet) cout << "Received message." << endl; - s.sync(); - s.close(); connection.close(); - if (!opts.quiet) cout << "Success." << endl; return 0; } catch (const exception& e) { cerr << "Error: " << e.what() << endl; + connection.close(); return 1; } } diff --git a/qpid/cpp/src/tests/ssl_test b/qpid/cpp/src/tests/ssl_test index 36a8d01b32..e7c763f9ce 100755 --- a/qpid/cpp/src/tests/ssl_test +++ b/qpid/cpp/src/tests/ssl_test @@ -153,7 +153,7 @@ ssl_cluster_broker() { # $1 = port start_brokers 1 "--ssl-port $1 --auth no --load-module $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1" # Wait for broker to be ready - qpid-ping -Pssl -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; } + qpid-ping -Pssl -b $TEST_HOSTNAME:$1 -q || { echo "Cannot connect to broker on $1"; exit 1; } } CERTUTIL=$(type -p certutil) |
