summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp172
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h12
-rw-r--r--qpid/cpp/src/qpid/sys/urlAdd.h59
-rw-r--r--qpid/cpp/src/tests/CMakeLists.txt2
-rw-r--r--qpid/cpp/src/tests/qpid-ping.cpp85
-rwxr-xr-xqpid/cpp/src/tests/ssl_test2
7 files changed, 183 insertions, 150 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 9406c992fe..b229d23851 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -93,7 +93,6 @@ ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& optio
{
setOptions(options);
urls.insert(urls.begin(), url);
- QPID_LOG(debug, "Created connection " << url << " with " << options);
}
void ConnectionImpl::setOptions(const Variant::Map& options)
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 4230a0d644..0083a2e390 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -37,7 +37,9 @@
#include "qpid/sys/SecurityLayer.h"
#include "qpid/sys/SystemInfo.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/urlAdd.h"
#include "config.h"
+#include <boost/lexical_cast.hpp>
#include <vector>
extern "C" {
#include <proton/engine.h>
@@ -48,17 +50,6 @@ namespace messaging {
namespace amqp {
namespace {
-std::string asString(const std::vector<std::string>& v) {
- std::stringstream os;
- os << "[";
- for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
- if (i != v.begin()) os << ", ";
- os << '"' << *i << '"';
- }
- os << "]";
- return os.str();
-}
-
//remove conditional when 0.5 is no longer supported
#ifdef HAVE_PROTON_TRACER
void do_trace(pn_transport_t* transport, const char* message)
@@ -86,6 +77,7 @@ void ConnectionContext::trace(const char* message) const
ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::Variant::Map& o)
: qpid::messaging::ConnectionOptions(o),
+ fullUrl(url),
engine(pn_transport()),
connection(pn_connection()),
//note: disabled read/write of header as now handled by engine
@@ -95,7 +87,8 @@ ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::
state(DISCONNECTED),
codecAdapter(*this)
{
- urls.insert(urls.begin(), url);
+ // Concatenate all known URLs into a single URL, get rid of duplicate addresses.
+ sys::urlAddStrings(fullUrl, urls.begin(), urls.end());
if (pn_transport_bind(engine, connection)) {
//error
}
@@ -495,7 +488,9 @@ void ConnectionContext::reset()
void ConnectionContext::check() {
if (checkDisconnected()) {
if (ConnectionOptions::reconnect) {
+ QPID_LOG(notice, "Auto-reconnecting to " << fullUrl);
autoconnect();
+ QPID_LOG(notice, "Auto-reconnected to " << currentUrl);
} else {
throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)");
}
@@ -903,7 +898,7 @@ void ConnectionContext::open()
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
if (!driver) driver = DriverImpl::getDefault();
-
+ QPID_LOG(info, "Starting connection to " << fullUrl);
autoconnect();
}
@@ -921,64 +916,38 @@ bool expired(const sys::AbsTime& start, double timeout)
const std::string COLON(":");
}
+void throwConnectFail(const Url& url, const std::string& msg) {
+ throw qpid::messaging::TransportFailure(
+ Msg() << "Connect failed to " << url << ": " << msg);
+}
+
void ConnectionContext::autoconnect()
{
qpid::sys::AbsTime started(qpid::sys::now());
- QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
- for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
- if (!ConnectionOptions::reconnect) {
- throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
- }
- if (limit >= 0 && retries++ >= limit) {
- throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit");
- }
- if (expired(started, timeout)) {
- throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
- }
- QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls="
- << asString(urls));
+ for (double i = minReconnectInterval; !tryConnectUrl(fullUrl); i = std::min(i*2, maxReconnectInterval)) {
+ if (!ConnectionOptions::reconnect) throwConnectFail(fullUrl, "Reconnect disabled");
+ if (limit >= 0 && retries++ >= limit) throwConnectFail(fullUrl, "Exceeded retries");
+ if (expired(started, timeout)) throwConnectFail(fullUrl, "Exceeded timeout");
+ QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds to"
+ << fullUrl);
qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds.
}
retries = 0;
}
-bool ConnectionContext::tryConnect()
-{
- for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
- try {
- QPID_LOG(info, "Trying to connect to " << *i << "...");
- if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) {
- return true;
- }
- QPID_LOG(info, "Failed to connect to " << *i);
- } catch (const qpid::messaging::TransportFailure& e) {
- QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
- }
- }
- return false;
-}
-
-void ConnectionContext::reconnect(const std::string& url)
-{
+void ConnectionContext::reconnect(const Url& url) {
+ QPID_LOG(notice, "Reconnecting to " << url);
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
if (!driver) driver = DriverImpl::getDefault();
reset();
- if (!tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol))) {
- throw qpid::messaging::TransportFailure("Failed to connect");
- }
+ if (!tryConnectUrl(url)) throwConnectFail(url, "Failed to reconnect");
+ QPID_LOG(notice, "Reconnected to " << currentUrl);
}
-void ConnectionContext::reconnect()
-{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
- if (!driver) driver = DriverImpl::getDefault();
- reset();
- if (!tryConnect()) {
- throw qpid::messaging::TransportFailure("Failed to reconnect");
- }
-}
+void ConnectionContext::reconnect(const std::string& url) { reconnect(Url(url)); }
+
+void ConnectionContext::reconnect() { reconnect(fullUrl); }
void ConnectionContext::waitNoReconnect() {
if (!checkDisconnected()) {
@@ -987,77 +956,75 @@ void ConnectionContext::waitNoReconnect() {
}
}
-bool ConnectionContext::tryConnect(const Url& url)
+// Try to connect to a URL, i.e. try to connect to each of its addresses in turn
+// till one succeeds or they all fail.
+// @return true if we connect successfully
+bool ConnectionContext::tryConnectUrl(const Url& url)
{
if (url.getUser().size()) username = url.getUser();
if (url.getPass().size()) password = url.getPass();
for (Url::const_iterator i = url.begin(); i != url.end(); ++i) {
- if (tryConnect(*i)) {
+ QPID_LOG(info, "Connecting to " << *i);
+ if (tryConnectAddr(*i) && tryOpenAddr(*i)) {
QPID_LOG(info, "Connected to " << *i);
- setCurrentUrl(*i);
- if (sasl.get()) {
- wakeupDriver();
- while (!sasl->authenticated() && state != DISCONNECTED) {
- QPID_LOG(debug, id << " Waiting to be authenticated...");
- waitNoReconnect();
- }
- if (state == DISCONNECTED) continue;
- QPID_LOG(debug, id << " Authenticated");
- }
-
- QPID_LOG(debug, id << " Opening...");
- setProperties();
- pn_connection_open(connection);
- wakeupDriver(); //want to write
- while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) &&
- state != DISCONNECTED)
- waitNoReconnect();
- if (state == DISCONNECTED) continue;
- if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
- throw qpid::messaging::ConnectionError("Failed to open connection");
- }
- QPID_LOG(debug, id << " Opened");
-
- return restartSessions();
- } else {
- QPID_LOG(notice, "Failed to connect to " << *i);
+ return true;
}
}
return false;
}
-void ConnectionContext::setCurrentUrl(const qpid::Address& a)
-{
- std::stringstream u;
- u << a;
- currentUrl = u.str();
+// Try to open an AMQP protocol connection on an address, after we have already
+// established a transport connect (see tryConnectAddr below)
+// @return true if the AMQP connection is succesfully opened.
+bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) {
+ currentUrl = Url(addr);
+ if (sasl.get()) {
+ wakeupDriver();
+ while (!sasl->authenticated() && state != DISCONNECTED) {
+ QPID_LOG(debug, id << " Waiting to be authenticated...");
+ waitNoReconnect();
+ }
+ if (state == DISCONNECTED) return false;
+ QPID_LOG(debug, id << " Authenticated");
+ }
+
+ QPID_LOG(debug, id << " Opening...");
+ setProperties();
+ pn_connection_open(connection);
+ wakeupDriver(); //want to write
+ while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) &&
+ state != DISCONNECTED)
+ waitNoReconnect();
+ if (state == DISCONNECTED) return false;
+ if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
+ throw qpid::messaging::ConnectionError("Failed to open connection");
+ }
+ QPID_LOG(debug, id << " Opened");
+
+ return restartSessions();
}
std::string ConnectionContext::getUrl() const
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- if (state == CONNECTED) {
- return currentUrl;
- } else {
- return std::string();
- }
+ return (state == CONNECTED) ? currentUrl.str() : std::string();
}
-
-bool ConnectionContext::tryConnect(const qpid::Address& address)
+// Try to establish a transport connect to an individual address (typically a
+// TCP host:port)
+// @return true if we succeed in connecting.
+bool ConnectionContext::tryConnectAddr(const qpid::Address& address)
{
transport = driver->getTransport(address.protocol, *this);
- std::stringstream port;
- port << address.port;
- id = address.host + COLON + port.str();
+ id = boost::lexical_cast<std::string>(address);
if (useSasl()) {
sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, address.host));
}
state = CONNECTING;
try {
QPID_LOG(debug, id << " Connecting ...");
- transport->connect(address.host, port.str());
+ transport->connect(address.host, boost::lexical_cast<std::string>(address.port));
bool waiting(true);
while (waiting) {
switch (state) {
@@ -1069,7 +1036,6 @@ bool ConnectionContext::tryConnect(const qpid::Address& address)
break;
case DISCONNECTED:
waiting = false;
- QPID_LOG(debug, id << " Failed to connect");
break;
}
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index 75afeba46a..59270f445d 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -132,6 +132,9 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
ConnectionContext& context;
};
+ Url fullUrl; // Combined URL of all known addresses.
+ Url currentUrl; // URL of currently connected address.
+
boost::shared_ptr<DriverImpl> driver;
boost::shared_ptr<Transport> transport;
@@ -143,7 +146,6 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
bool readHeader;
bool haveOutput;
std::string id;
- std::string currentUrl;
enum {
DISCONNECTED,
CONNECTING,
@@ -170,13 +172,13 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void wakeupDriver();
void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0);
void autoconnect();
- bool tryConnect();
- bool tryConnect(const qpid::Url& url);
- bool tryConnect(const qpid::Address& address);
+ bool tryConnectUrl(const qpid::Url& url);
+ bool tryOpenAddr(const qpid::Address& address);
+ bool tryConnectAddr(const qpid::Address& address);
+ void reconnect(const Url& url);
void reset();
bool restartSessions();
void restartSession(boost::shared_ptr<SessionContext>);
- void setCurrentUrl(const qpid::Address&);
std::size_t decodePlain(const char* buffer, std::size_t size);
std::size_t encodePlain(char* buffer, std::size_t size);
diff --git a/qpid/cpp/src/qpid/sys/urlAdd.h b/qpid/cpp/src/qpid/sys/urlAdd.h
new file mode 100644
index 0000000000..aecb3115cf
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/urlAdd.h
@@ -0,0 +1,59 @@
+#ifndef QPID_SYS_URLADD_H
+#define QPID_SYS_URLADD_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Url.h>
+#include <boost/bind.hpp>
+#include <algorithm>
+
+namespace qpid {
+namespace sys {
+
+/** Add addr to url if it is not already present. */
+inline void urlAddAddress(Url& url, const Address& addr) {
+ if (std::find(url.begin(), url.end(), addr) == url.end()) url.push_back(addr);
+}
+
+/** Add all addresses in more that are not already in url to url */
+inline void urlAddUrl(Url& url, const Url& more) {
+ for_each(more.begin(), more.end(), boost::bind(&urlAddAddress, boost::ref(url), _1));
+}
+
+/** Convert str to a Url and do urlAddUrl. */
+inline void urlAddString(Url& url, const std::string& str) { urlAddUrl(url, Url(str)); }
+
+/** For each URL in a range, do urlAddUrl */
+template <class UrlIterator>
+void urlAddUrls(Url& url, UrlIterator i, UrlIterator j) {
+ for_each(i, j, boost::bind(&urlAddUrl, boost::ref(url), _1));
+}
+
+/** For each string in a range, do urlAddUrl(Url(string)) */
+template <class StringIterator>
+void urlAddStrings(Url& url, StringIterator i, StringIterator j) {
+ for_each(i, j, boost::bind(&urlAddString, boost::ref(url), _1));
+}
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_URLADD_H*/
diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt
index 37cd6629dc..4c0dcba500 100644
--- a/qpid/cpp/src/tests/CMakeLists.txt
+++ b/qpid/cpp/src/tests/CMakeLists.txt
@@ -85,7 +85,7 @@ target_link_libraries (qpid-client-test qpidclient qpidcommon "${Boost_PROGRAM_O
remember_location(qpid-client-test)
add_executable (qpid-ping qpid-ping.cpp ${platform_test_additions})
-target_link_libraries (qpid-ping qpidclient qpidcommon qpidtypes "${Boost_PROGRAM_OPTIONS_LIBRARY}")
+target_link_libraries (qpid-ping qpidmessaging qpidtypes qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
remember_location(qpid-ping)
add_executable (qpid-topic-listener qpid-topic-listener.cpp ${platform_test_additions})
diff --git a/qpid/cpp/src/tests/qpid-ping.cpp b/qpid/cpp/src/tests/qpid-ping.cpp
index 52331499e7..f9b1ec17f1 100644
--- a/qpid/cpp/src/tests/qpid-ping.cpp
+++ b/qpid/cpp/src/tests/qpid-ping.cpp
@@ -20,68 +20,75 @@
*/
-#include "TestOptions.h"
-#include "qpid/client/SubscriptionManager.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/AsyncSession.h"
-#include "qpid/sys/Time.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/framing/Uuid.h"
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include "qpid/messaging/Duration.h"
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/Msg.h>
+#include <qpid/Options.h>
+#include <qpid/types/Uuid.h>
#include <string>
#include <iostream>
-using std::cerr;
-using std::cout;
-using std::endl;
-using std::exception;
-using std::string;
-using namespace qpid::client::arg; // For keyword args
-using qpid::client::AsyncSession;
-using qpid::client::Connection;
-using qpid::client::Message;
-using qpid::client::SubscriptionManager;
-using qpid::framing::Uuid;
+using namespace std;
+using namespace qpid::messaging;
+using qpid::types::Uuid;
-namespace qpid {
-namespace tests {
+namespace {
-struct PingOptions : public qpid::TestOptions {
- int timeout; // Timeout in seconds.
+struct PingOptions : public qpid::Options {
+ string url;
+ string address;
+ string message;
+ string connectionOptions;
+ double timeout; // Timeout in seconds.
bool quiet; // No output
- PingOptions() : timeout(1), quiet(false) {
+
+ PingOptions() :
+ url("127.0.0.1"),
+ address(Uuid(true).str()+";{create:always}"),
+ message(Uuid(true).str()),
+ timeout(1),
+ quiet(false)
+ {
+ using qpid::optValue;
addOptions()
+ ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to.")
+ ("address,a", qpid::optValue(address, "ADDRESS"), "address to use.")
+ ("message,m", optValue(message, "MESSAGE"), "message text to send.")
+ ("connection-options", optValue(connectionOptions, "OPTIONS"), "options for the connection.")
("timeout,t", optValue(timeout, "SECONDS"), "Max time to wait.")
("quiet,q", optValue(quiet), "Don't print anything to stderr/stdout.");
}
};
-}} // namespace qpid::tests
+} // namespace
int main(int argc, char** argv) {
+ Connection connection;
+ PingOptions opts;
try {
- qpid::tests::PingOptions opts;
opts.parse(argc, argv);
- opts.con.heartbeat = (opts.timeout+1)/2;
- Connection connection;
- opts.open(connection);
+ connection = Connection(opts.url, opts.connectionOptions);
+ connection.open();
if (!opts.quiet) cout << "Opened connection." << endl;
- AsyncSession s = connection.newSession();
- string qname(Uuid(true).str());
- s.queueDeclare(queue=qname, autoDelete=true, exclusive=true);
- s.messageTransfer(content=Message("hello", qname));
+ Session s = connection.createSession();
+ s.createSender(opts.address).send(Message(opts.message));
if (!opts.quiet) cout << "Sent message." << endl;
- SubscriptionManager subs(s);
- subs.get(qname);
+ Message m = s.createReceiver(opts.address).
+ fetch(Duration(opts.timeout*1000));
+ if (m.getContent() != opts.message)
+ throw qpid::Exception(qpid::Msg() << "Expected " << opts.message
+ << " but received " << m.getContent());
if (!opts.quiet) cout << "Received message." << endl;
- s.sync();
- s.close();
connection.close();
- if (!opts.quiet) cout << "Success." << endl;
return 0;
} catch (const exception& e) {
cerr << "Error: " << e.what() << endl;
+ connection.close();
return 1;
}
}
diff --git a/qpid/cpp/src/tests/ssl_test b/qpid/cpp/src/tests/ssl_test
index 36a8d01b32..e7c763f9ce 100755
--- a/qpid/cpp/src/tests/ssl_test
+++ b/qpid/cpp/src/tests/ssl_test
@@ -153,7 +153,7 @@ ssl_cluster_broker() { # $1 = port
start_brokers 1 "--ssl-port $1 --auth no --load-module $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1"
# Wait for broker to be ready
- qpid-ping -Pssl -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; }
+ qpid-ping -Pssl -b $TEST_HOSTNAME:$1 -q || { echo "Cannot connect to broker on $1"; exit 1; }
}
CERTUTIL=$(type -p certutil)