summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2011-10-21 01:19:00 +0000
committerStephen D. Huston <shuston@apache.org>2011-10-21 01:19:00 +0000
commitebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (patch)
treedcfb94e75656c6c239fc3dcb754cd2015126424d /cpp/src/qpid/client
parent5eb354b338bb8d8fcd35b6ac3fb33f8103e757c3 (diff)
downloadqpid-python-ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5.tar.gz
Undo bad merge from trunk - merged at wrong level.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187150 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp64
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp12
-rw-r--r--cpp/src/qpid/client/Connector.h2
-rw-r--r--cpp/src/qpid/client/RdmaConnector.cpp6
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp1
-rw-r--r--cpp/src/qpid/client/SslConnector.cpp6
-rw-r--r--cpp/src/qpid/client/TCPConnector.cpp6
-rw-r--r--cpp/src/qpid/client/TCPConnector.h2
-rw-r--r--cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp42
-rw-r--r--cpp/src/qpid/client/amqp0_10/AcceptTracker.h5
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.cpp60
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp179
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.h2
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp10
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.h2
-rw-r--r--cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp4
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.cpp3
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp56
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h11
-rw-r--r--cpp/src/qpid/client/amqp0_10/SimpleUrlParser.cpp79
-rw-r--r--cpp/src/qpid/client/amqp0_10/SimpleUrlParser.h42
-rw-r--r--cpp/src/qpid/client/windows/SaslFactory.cpp16
-rw-r--r--cpp/src/qpid/client/windows/SslConnector.cpp4
23 files changed, 321 insertions, 293 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index ab0d8e0700..8dc1e8338a 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -22,7 +22,6 @@
#include "qpid/client/ConnectionHandler.h"
#include "qpid/SaslFactory.h"
-#include "qpid/StringUtils.h"
#include "qpid/client/Bounds.h"
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/all_method_bodies.h"
@@ -143,9 +142,7 @@ void ConnectionHandler::outgoing(AMQFrame& frame)
void ConnectionHandler::waitForOpen()
{
waitFor(ESTABLISHED);
- if (getState() == FAILED) {
- throw TransportFailure(errorText);
- } else if (getState() == CLOSED) {
+ if (getState() == FAILED || getState() == CLOSED) {
throw ConnectionException(errorCode, errorText);
}
}
@@ -205,24 +202,6 @@ void ConnectionHandler::fail(const std::string& message)
namespace {
std::string SPACE(" ");
-
-std::string join(const std::vector<std::string>& in)
-{
- std::string result;
- for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) {
- if (result.size()) result += SPACE;
- result += *i;
- }
- return result;
-}
-
-void intersection(const std::vector<std::string>& a, const std::vector<std::string>& b, std::vector<std::string>& results)
-{
- for (std::vector<std::string>::const_iterator i = a.begin(); i != a.end(); ++i) {
- if (std::find(b.begin(), b.end(), *i) != b.end()) results.push_back(*i);
- }
-}
-
}
void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& mechanisms, const Array& /*locales*/)
@@ -237,35 +216,26 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me
maxSsf
);
- std::vector<std::string> mechlist;
- if (mechanism.empty()) {
- //mechlist is simply what the server offers
- mechanisms.collect(mechlist);
- } else {
- //mechlist is the intersection of those indicated by user and
- //those supported by server, in the order listed by user
- std::vector<std::string> allowed = split(mechanism, " ");
- std::vector<std::string> supported;
- mechanisms.collect(supported);
- intersection(allowed, supported, mechlist);
- if (mechlist.empty()) {
- throw Exception(QPID_MSG("Desired mechanism(s) not valid: " << mechanism << " (supported: " << join(supported) << ")"));
+ std::string mechlist;
+ bool chosenMechanismSupported = mechanism.empty();
+ for (Array::const_iterator i = mechanisms.begin(); i != mechanisms.end(); ++i) {
+ if (!mechanism.empty() && mechanism == (*i)->get<std::string>()) {
+ chosenMechanismSupported = true;
+ mechlist = (*i)->get<std::string>() + SPACE + mechlist;
+ } else {
+ if (i != mechanisms.begin()) mechlist += SPACE;
+ mechlist += (*i)->get<std::string>();
}
}
+ if (!chosenMechanismSupported) {
+ fail("Selected mechanism not supported: " + mechanism);
+ }
+
if (sasl.get()) {
- string response;
- if (sasl->start(join(mechlist), response, getSecuritySettings ? getSecuritySettings() : 0)) {
- proxy.startOk(properties, sasl->getMechanism(), response, locale);
- } else {
- //response was null
- ConnectionStartOkBody body;
- body.setClientProperties(properties);
- body.setMechanism(sasl->getMechanism());
- //Don't set response, as none was given
- body.setLocale(locale);
- proxy.send(body);
- }
+ string response = sasl->start(mechanism.empty() ? mechlist : mechanism,
+ getSecuritySettings ? getSecuritySettings() : 0);
+ proxy.startOk(properties, sasl->getMechanism(), response, locale);
} else {
//TODO: verify that desired mechanism and locale are supported
string response = ((char)0) + username + ((char)0) + password;
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index db97f1e0f4..40c004f166 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -36,7 +36,6 @@
#include <boost/bind.hpp>
#include <boost/format.hpp>
-#include <boost/lexical_cast.hpp>
#include <boost/shared_ptr.hpp>
#include <limits>
@@ -259,16 +258,16 @@ void ConnectionImpl::open()
connector->setInputHandler(&handler);
connector->setShutdownHandler(this);
try {
- std::string p = boost::lexical_cast<std::string>(port);
- connector->connect(host, p);
-
+ connector->connect(host, port);
+
} catch (const std::exception& e) {
QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what());
connector.reset();
- throw TransportFailure(e.what());
+ throw;
}
connector->init();
-
+ QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
+
// Enable heartbeat if requested
uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat;
if (heartbeat) {
@@ -282,7 +281,6 @@ void ConnectionImpl::open()
// - in that case in connector.reset() above;
// - or when we are deleted
handler.waitForOpen();
- QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
// If the SASL layer has provided an "operational" userId for the connection,
// put it in the negotiated settings.
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index bc611ffe0d..586012f9d6 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -61,7 +61,7 @@ class Connector : public framing::OutputHandler
static void registerFactory(const std::string& proto, Factory* connectorFactory);
virtual ~Connector() {};
- virtual void connect(const std::string& host, const std::string& port) = 0;
+ virtual void connect(const std::string& host, int port) = 0;
virtual void init() {};
virtual void close() = 0;
virtual void send(framing::AMQFrame& frame) = 0;
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp
index 664640f5e7..6af607198c 100644
--- a/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/cpp/src/qpid/client/RdmaConnector.cpp
@@ -95,7 +95,7 @@ class RdmaConnector : public Connector, public sys::Codec
std::string identifier;
- void connect(const std::string& host, const std::string& port);
+ void connect(const std::string& host, int port);
void close();
void send(framing::AMQFrame& frame);
void abort() {} // TODO: need to fix this for heartbeat timeouts to work
@@ -173,7 +173,7 @@ RdmaConnector::~RdmaConnector() {
}
}
-void RdmaConnector::connect(const std::string& host, const std::string& port){
+void RdmaConnector::connect(const std::string& host, int port){
Mutex::ScopedLock l(dataConnectedLock);
assert(!dataConnected);
@@ -184,7 +184,7 @@ void RdmaConnector::connect(const std::string& host, const std::string& port){
boost::bind(&RdmaConnector::disconnected, this),
boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
- SocketAddress sa(host, port);
+ SocketAddress sa(host, boost::lexical_cast<std::string>(port));
acon->start(poller, sa);
}
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 7cf4ef648e..b507625b11 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -170,7 +170,6 @@ Demux& SessionImpl::getDemux()
void SessionImpl::waitForCompletion(const SequenceNumber& id)
{
Lock l(state);
- sys::Waitable::ScopedWait w(state);
waitForCompletionImpl(id);
}
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp
index 26c2335eda..35c7e6bdf6 100644
--- a/cpp/src/qpid/client/SslConnector.cpp
+++ b/cpp/src/qpid/client/SslConnector.cpp
@@ -114,7 +114,7 @@ class SslConnector : public Connector
std::string identifier;
- void connect(const std::string& host, const std::string& port);
+ void connect(const std::string& host, int port);
void init();
void close();
void send(framing::AMQFrame& frame);
@@ -190,14 +190,14 @@ SslConnector::~SslConnector() {
close();
}
-void SslConnector::connect(const std::string& host, const std::string& port){
+void SslConnector::connect(const std::string& host, int port){
Mutex::ScopedLock l(closedLock);
assert(closed);
try {
socket.connect(host, port);
} catch (const std::exception& e) {
socket.close();
- throw TransportFailure(e.what());
+ throw ConnectionException(framing::connection::CLOSE_CODE_FRAMING_ERROR, e.what());
}
identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp
index 0070b24ec0..e284d57bec 100644
--- a/cpp/src/qpid/client/TCPConnector.cpp
+++ b/cpp/src/qpid/client/TCPConnector.cpp
@@ -88,7 +88,7 @@ TCPConnector::~TCPConnector() {
close();
}
-void TCPConnector::connect(const std::string& host, const std::string& port) {
+void TCPConnector::connect(const std::string& host, int port) {
Mutex::ScopedLock l(lock);
assert(closed);
connector = AsynchConnector::create(
@@ -117,11 +117,11 @@ void TCPConnector::connected(const Socket&) {
void TCPConnector::start(sys::AsynchIO* aio_) {
aio = aio_;
- for (int i = 0; i < 4; i++) {
+ for (int i = 0; i < 32; i++) {
aio->queueReadBuffer(new Buff(maxFrameSize));
}
- identifier = str(format("[%1%]") % socket.getFullAddress());
+ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
}
void TCPConnector::initAmqp() {
diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h
index eb3f696013..c756469182 100644
--- a/cpp/src/qpid/client/TCPConnector.h
+++ b/cpp/src/qpid/client/TCPConnector.h
@@ -98,7 +98,7 @@ class TCPConnector : public Connector, public sys::Codec
protected:
virtual ~TCPConnector();
- void connect(const std::string& host, const std::string& port);
+ void connect(const std::string& host, int port);
void start(sys::AsynchIO* aio_);
void initAmqp();
virtual void connectFailed(const std::string& msg);
diff --git a/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
index d2accddcd0..bfb20118b5 100644
--- a/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
@@ -30,23 +30,12 @@ void AcceptTracker::State::accept()
unaccepted.clear();
}
-SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative)
+void AcceptTracker::State::accept(qpid::framing::SequenceNumber id)
{
- SequenceSet accepting;
- if (cumulative) {
- for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) {
- accepting.add(*i);
- }
- unconfirmed.add(accepting);
- unaccepted.remove(accepting);
- } else {
- if (unaccepted.contains(id)) {
- unaccepted.remove(id);
- unconfirmed.add(id);
- accepting.add(id);
- }
+ if (unaccepted.contains(id)) {
+ unaccepted.remove(id);
+ unconfirmed.add(id);
}
- return accepting;
}
void AcceptTracker::State::release()
@@ -70,18 +59,6 @@ void AcceptTracker::delivered(const std::string& destination, const qpid::framin
destinationState[destination].unaccepted.add(id);
}
-namespace
-{
-const size_t FLUSH_FREQUENCY = 1024;
-}
-
-void AcceptTracker::addToPending(qpid::client::AsyncSession& session, const Record& record)
-{
- pending.push_back(record);
- if (pending.size() % FLUSH_FREQUENCY == 0) session.flush();
-}
-
-
void AcceptTracker::accept(qpid::client::AsyncSession& session)
{
for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
@@ -90,19 +67,20 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session)
Record record;
record.status = session.messageAccept(aggregateState.unaccepted);
record.accepted = aggregateState.unaccepted;
- addToPending(session, record);
+ pending.push_back(record);
aggregateState.accept();
}
-void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative)
+void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session)
{
for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
- i->second.accept(id, cumulative);
+ i->second.accept(id);
}
Record record;
- record.accepted = aggregateState.accept(id, cumulative);
+ record.accepted.add(id);
record.status = session.messageAccept(record.accepted);
- addToPending(session, record);
+ pending.push_back(record);
+ aggregateState.accept(id);
}
void AcceptTracker::release(qpid::client::AsyncSession& session)
diff --git a/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
index 85209c3b87..87890e41cc 100644
--- a/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
+++ b/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
@@ -42,7 +42,7 @@ class AcceptTracker
public:
void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id);
void accept(qpid::client::AsyncSession&);
- void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative);
+ void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&);
void release(qpid::client::AsyncSession&);
uint32_t acceptsPending();
uint32_t acceptsPending(const std::string& destination);
@@ -62,7 +62,7 @@ class AcceptTracker
qpid::framing::SequenceSet unconfirmed;
void accept();
- qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative);
+ void accept(qpid::framing::SequenceNumber);
void release();
uint32_t acceptsPending();
void completed(qpid::framing::SequenceSet&);
@@ -79,7 +79,6 @@ class AcceptTracker
StateMap destinationState;
Records pending;
- void addToPending(qpid::client::AsyncSession&, const Record&);
void checkPending();
void completed(qpid::framing::SequenceSet&);
};
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index 16e5fde075..f1295a3b66 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -129,10 +129,6 @@ const std::string HEADERS_EXCHANGE("headers");
const std::string XML_EXCHANGE("xml");
const std::string WILDCARD_ANY("#");
-//exchange prefixes:
-const std::string PREFIX_AMQ("amq.");
-const std::string PREFIX_QPID("qpid.");
-
const Verifier verifier;
}
@@ -203,7 +199,6 @@ class Exchange : protected Node
void checkCreate(qpid::client::AsyncSession&, CheckMode);
void checkAssert(qpid::client::AsyncSession&, CheckMode);
void checkDelete(qpid::client::AsyncSession&, CheckMode);
- bool isReservedName();
protected:
const std::string specifiedType;
@@ -238,8 +233,6 @@ class Subscription : public Exchange, public MessageSource
const bool reliable;
const bool durable;
const std::string actualType;
- const bool exclusiveQueue;
- const bool exclusiveSubscription;
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
@@ -314,7 +307,6 @@ struct Opt
Opt& operator/(const std::string& name);
operator bool() const;
std::string str() const;
- bool asBool(bool defaultValue) const;
const Variant::List& asList() const;
void collect(qpid::framing::FieldTable& args) const;
@@ -346,12 +338,6 @@ Opt::operator bool() const
return value && !value->isVoid() && value->asBool();
}
-bool Opt::asBool(bool defaultValue) const
-{
- if (value) return value->asBool();
- else return defaultValue;
-}
-
std::string Opt::str() const
{
if (value) return value->asString();
@@ -495,7 +481,7 @@ std::string Subscription::getSubscriptionName(const std::string& base, const std
if (name.empty()) {
return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
} else {
- return name;
+ return (boost::format("%1%_%2%") % base % name).str();
}
}
@@ -504,9 +490,7 @@ Subscription::Subscription(const Address& address, const std::string& type)
queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
reliable(AddressResolution::is_reliable(address)),
durable(Opt(address)/LINK/DURABLE),
- actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
- exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
- exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue))
+ actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
{
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
@@ -566,7 +550,7 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
checkAssert(session, FOR_RECEIVER);
//create subscription queue:
- session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
+ session.queueDeclare(arg::queue=queue, arg::exclusive=true,
arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
//'default' binding:
bindings.bind(session);
@@ -575,15 +559,15 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
linkBindings.bind(session);
//subscribe to subscription queue:
AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
- session.messageSubscribe(arg::queue=queue, arg::destination=destination,
- arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+ session.messageSubscribe(arg::queue=queue, arg::destination=destination,
+ arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
}
void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
linkBindings.unbind(session);
session.messageCancel(destination);
- if (exclusiveQueue) session.queueDelete(arg::queue=queue, arg::ifUnused=true);
+ session.queueDelete(arg::queue=queue);
checkDelete(session, FOR_RECEIVER);
}
@@ -777,32 +761,18 @@ Exchange::Exchange(const Address& a) : Node(a),
linkBindings.setDefaultExchange(name);
}
-bool Exchange::isReservedName()
-{
- return name.find(PREFIX_AMQ) != std::string::npos || name.find(PREFIX_QPID) != std::string::npos;
-}
-
void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
{
if (enabled(createPolicy, mode)) {
try {
- if (isReservedName()) {
- try {
- sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
- } catch (const qpid::framing::NotFoundException& /*e*/) {
- throw ResolutionError((boost::format("Cannot create exchange %1%; names beginning with \"amq.\" or \"qpid.\" are reserved.") % name).str());
- }
-
- } else {
- std::string type = specifiedType;
- if (type.empty()) type = TOPIC_EXCHANGE;
- session.exchangeDeclare(arg::exchange=name,
- arg::type=type,
- arg::durable=durable,
- arg::autoDelete=autoDelete,
- arg::alternateExchange=alternateExchange,
- arg::arguments=arguments);
- }
+ std::string type = specifiedType;
+ if (type.empty()) type = TOPIC_EXCHANGE;
+ session.exchangeDeclare(arg::exchange=name,
+ arg::type=type,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
nodeBindings.bind(session);
session.sync();
} catch (const qpid::framing::NotAllowedException& e) {
@@ -852,7 +822,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
FieldTable::ValuePtr v = result.getArguments().get(i->first);
if (!v) {
throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
- } else if (*i->second != *v) {
+ } else if (i->second != v) {
throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
% i->first % name % *(i->second) % *v).str());
}
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index cc6e9b9ab2..5a545c1f6a 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -20,6 +20,7 @@
*/
#include "ConnectionImpl.h"
#include "SessionImpl.h"
+#include "SimpleUrlParser.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/Session.h"
#include "qpid/messaging/PrivateImplRef.h"
@@ -38,18 +39,26 @@ using qpid::types::Variant;
using qpid::types::VAR_LIST;
using qpid::framing::Uuid;
-namespace {
-void merge(const std::string& value, std::vector<std::string>& list) {
- if (std::find(list.begin(), list.end(), value) == list.end())
- list.push_back(value);
+void convert(const Variant::List& from, std::vector<std::string>& to)
+{
+ for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
+ to.push_back(i->asString());
+ }
}
-void merge(const Variant::List& from, std::vector<std::string>& to)
+template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value)
{
- for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i)
- merge(i->asString(), to);
+ Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ value = (T) i->second;
+ QPID_LOG(debug, "option " << key << " specified as " << i->second);
+ return true;
+ } else {
+ return false;
+ }
}
+namespace {
std::string asString(const std::vector<std::string>& v) {
std::stringstream os;
os << "[";
@@ -62,8 +71,49 @@ std::string asString(const std::vector<std::string>& v) {
}
}
+template <> bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
+ const std::string& key,
+ std::vector<std::string>& value)
+{
+ Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ value.clear();
+ if (i->second.getType() == VAR_LIST) {
+ convert(i->second.asList(), value);
+ } else {
+ value.push_back(i->second.asString());
+ }
+ QPID_LOG(debug, "option " << key << " specified as " << asString(value));
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void convert(const Variant::Map& from, ConnectionSettings& to)
+{
+ setIfFound(from, "username", to.username);
+ setIfFound(from, "password", to.password);
+ setIfFound(from, "sasl-mechanism", to.mechanism);
+ setIfFound(from, "sasl-service", to.service);
+ setIfFound(from, "sasl-min-ssf", to.minSsf);
+ setIfFound(from, "sasl-max-ssf", to.maxSsf);
+
+ setIfFound(from, "heartbeat", to.heartbeat);
+ setIfFound(from, "tcp-nodelay", to.tcpNoDelay);
+
+ setIfFound(from, "locale", to.locale);
+ setIfFound(from, "max-channels", to.maxChannels);
+ setIfFound(from, "max-frame-size", to.maxFrameSize);
+ setIfFound(from, "bounds", to.bounds);
+
+ setIfFound(from, "transport", to.protocol);
+
+ setIfFound(from, "ssl-cert-name", to.sslCertName);
+}
+
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
- replaceUrls(false), reconnect(false), timeout(-1), limit(-1),
+ reconnect(false), timeout(-1), limit(-1),
minReconnectInterval(3), maxReconnectInterval(60),
retries(0), reconnectOnLimitExceeded(true)
{
@@ -74,69 +124,27 @@ ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& optio
void ConnectionImpl::setOptions(const Variant::Map& options)
{
- for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
- setOption(i->first, i->second);
+ sys::Mutex::ScopedLock l(lock);
+ convert(options, settings);
+ setIfFound(options, "reconnect", reconnect);
+ setIfFound(options, "reconnect-timeout", timeout);
+ setIfFound(options, "reconnect-limit", limit);
+ int64_t reconnectInterval;
+ if (setIfFound(options, "reconnect-interval", reconnectInterval)) {
+ minReconnectInterval = maxReconnectInterval = reconnectInterval;
+ } else {
+ setIfFound(options, "reconnect-interval-min", minReconnectInterval);
+ setIfFound(options, "reconnect-interval-max", maxReconnectInterval);
}
+ setIfFound(options, "reconnect-urls", urls);
+ setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded);
}
void ConnectionImpl::setOption(const std::string& name, const Variant& value)
{
- sys::Mutex::ScopedLock l(lock);
- if (name == "reconnect") {
- reconnect = value;
- } else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
- timeout = value;
- } else if (name == "reconnect-limit" || name == "reconnect_limit") {
- limit = value;
- } else if (name == "reconnect-interval" || name == "reconnect_interval") {
- maxReconnectInterval = minReconnectInterval = value;
- } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") {
- minReconnectInterval = value;
- } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") {
- maxReconnectInterval = value;
- } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") {
- replaceUrls = value.asBool();
- } else if (name == "reconnect-urls" || name == "reconnect_urls") {
- if (replaceUrls) urls.clear();
- if (value.getType() == VAR_LIST) {
- merge(value.asList(), urls);
- } else {
- merge(value.asString(), urls);
- }
- } else if (name == "username") {
- settings.username = value.asString();
- } else if (name == "password") {
- settings.password = value.asString();
- } else if (name == "sasl-mechanism" || name == "sasl_mechanism" ||
- name == "sasl-mechanisms" || name == "sasl_mechanisms") {
- settings.mechanism = value.asString();
- } else if (name == "sasl-service" || name == "sasl_service") {
- settings.service = value.asString();
- } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") {
- settings.minSsf = value;
- } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") {
- settings.maxSsf = value;
- } else if (name == "heartbeat") {
- settings.heartbeat = value;
- } else if (name == "tcp-nodelay" || name == "tcp_nodelay") {
- settings.tcpNoDelay = value;
- } else if (name == "locale") {
- settings.locale = value.asString();
- } else if (name == "max-channels" || name == "max_channels") {
- settings.maxChannels = value;
- } else if (name == "max-frame-size" || name == "max_frame_size") {
- settings.maxFrameSize = value;
- } else if (name == "bounds") {
- settings.bounds = value;
- } else if (name == "transport") {
- settings.protocol = value.asString();
- } else if (name == "ssl-cert-name" || name == "ssl_cert_name") {
- settings.sslCertName = value.asString();
- } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") {
- reconnectOnLimitExceeded = value;
- } else {
- throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
- }
+ Variant::Map options;
+ options[name] = value;
+ setOptions(options);
}
@@ -206,7 +214,7 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st
sessions[name] = impl;
break;
} catch (const qpid::TransportFailure&) {
- reopen();
+ open();
} catch (const qpid::SessionException& e) {
throw qpid::messaging::SessionError(e.what());
} catch (const std::exception& e) {
@@ -227,15 +235,6 @@ void ConnectionImpl::open()
catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); }
}
-void ConnectionImpl::reopen()
-{
- if (!reconnect) {
- throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
- }
- open();
-}
-
-
bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
{
if (timeout == 0) return true;
@@ -263,9 +262,14 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
}
void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) {
- for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i)
- merge(i->str(), urls);
- QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
+ if (more.size()) {
+ for (size_t i = 0; i < more.size(); ++i) {
+ if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) {
+ urls.push_back(more[i].str());
+ }
+ }
+ QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
+ }
}
bool ConnectionImpl::tryConnect()
@@ -274,14 +278,21 @@ bool ConnectionImpl::tryConnect()
for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
try {
QPID_LOG(info, "Trying to connect to " << *i << "...");
- Url url(*i);
- if (url.getUser().size()) settings.username = url.getUser();
- if (url.getPass().size()) settings.password = url.getPass();
- connection.open(url, settings);
+ //TODO: when url support is more complete can avoid this test here
+ if (i->find("amqp:") == 0) {
+ Url url(*i);
+ connection.open(url, settings);
+ } else {
+ SimpleUrlParser::parse(*i, settings);
+ connection.open(settings);
+ }
QPID_LOG(info, "Connected to " << *i);
mergeUrls(connection.getInitialBrokers(), l);
return resetSessions(l);
- } catch (const qpid::TransportFailure& e) {
+ } catch (const qpid::ConnectionException& e) {
+ //TODO: need to fix timeout on
+ //qpid::client::Connection::open() so that it throws
+ //TransportFailure rather than a ConnectionException
QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
}
}
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index 1b58cbbe3e..09f2038312 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -43,7 +43,6 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
public:
ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options);
void open();
- void reopen();
bool isOpen() const;
void close();
qpid::messaging::Session newSession(bool transactional, const std::string& name);
@@ -60,7 +59,6 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
qpid::sys::Semaphore semaphore;//used to coordinate reconnection
Sessions sessions;
qpid::client::Connection connection;
- bool replaceUrls; // Replace rather than merging with reconnect-urls
std::vector<std::string> urls;
qpid::client::ConnectionSettings settings;
bool reconnect;
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 3badaf40ba..71e89bdba1 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -144,10 +144,10 @@ void IncomingMessages::accept()
acceptTracker.accept(session);
}
-void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative)
+void IncomingMessages::accept(qpid::framing::SequenceNumber id)
{
sys::Mutex::ScopedLock l(lock);
- acceptTracker.accept(id, session, cumulative);
+ acceptTracker.accept(id, session);
}
@@ -301,7 +301,6 @@ const std::string SUBJECT("qpid.subject");
const std::string X_APP_ID("x-amqp-0-10.app-id");
const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
-const std::string X_TIMESTAMP("x-amqp-0-10.timestamp");
}
void populateHeaders(qpid::messaging::Message& message,
@@ -335,13 +334,10 @@ void populateHeaders(qpid::messaging::Message& message,
if (messageProperties->hasContentEncoding()) {
message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding();
}
- // routing-key, timestamp, others?
+ // routing-key, others?
if (deliveryProperties && deliveryProperties->hasRoutingKey()) {
message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey();
}
- if (deliveryProperties && deliveryProperties->hasTimestamp()) {
- message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp();
- }
}
}
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index 9053b70312..f6a291bc68 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -72,7 +72,7 @@ class IncomingMessages
bool get(Handler& handler, qpid::sys::Duration timeout);
bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
void accept();
- void accept(qpid::framing::SequenceNumber id, bool cumulative);
+ void accept(qpid::framing::SequenceNumber id);
void releaseAll();
void releasePending(const std::string& destination);
diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
index d93416da75..82358961c8 100644
--- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
+++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
@@ -59,9 +59,7 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from)
message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
}
translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
- if (from.getTtl().getMilliseconds()) {
- message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
- }
+ message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
if (from.getDurable()) {
message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT);
}
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index f2f0f1a9e5..e1b75ec0cf 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -135,7 +135,6 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m)
void SenderImpl::replay(const sys::Mutex::ScopedLock&)
{
for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
- i->message.setRedelivered(true);
sink->send(session, name, *i);
}
}
@@ -148,7 +147,7 @@ uint32_t SenderImpl::checkPendingSends(bool flush) {
uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&)
{
if (flush) {
- session.flush();
+ session.flush();
flushed = true;
} else {
flushed = false;
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index be5eab1f2b..75a71997fd 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -60,14 +60,12 @@ SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactio
void SessionImpl::checkError()
{
- ScopedLock l(lock);
qpid::client::SessionBase_0_10Access s(session);
s.get()->assertOpen();
}
bool SessionImpl::hasError()
{
- ScopedLock l(lock);
qpid::client::SessionBase_0_10Access s(session);
return s.get()->hasError();
}
@@ -114,14 +112,13 @@ void SessionImpl::release(qpid::messaging::Message& m)
execute1<Release>(m);
}
-void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative)
+void SessionImpl::acknowledge(qpid::messaging::Message& m)
{
//Should probably throw an exception on failure here, or indicate
//it through a return type at least. Failure means that the
//message may be redelivered; i.e. the application cannot delete
//any state necessary for preventing reprocessing of the message
- Acknowledge2 ack(*this, m, cumulative);
- execute(ack);
+ execute1<Acknowledge1>(m);
}
void SessionImpl::close()
@@ -131,29 +128,27 @@ void SessionImpl::close()
senders.clear();
receivers.clear();
} else {
- Senders sCopy;
- Receivers rCopy;
- {
- ScopedLock l(lock);
- senders.swap(sCopy);
- receivers.swap(rCopy);
- }
- for (Senders::iterator i = sCopy.begin(); i != sCopy.end(); ++i)
- {
- // outside the lock, will call senderCancelled
- i->second.close();
+ while (true) {
+ Sender s;
+ {
+ ScopedLock l(lock);
+ if (senders.empty()) break;
+ s = senders.begin()->second;
+ }
+ s.close(); // outside the lock, will call senderCancelled
}
- for (Receivers::iterator i = rCopy.begin(); i != rCopy.end(); ++i)
- {
- // outside the lock, will call receiverCancelled
- i->second.close();
+ while (true) {
+ Receiver r;
+ {
+ ScopedLock l(lock);
+ if (receivers.empty()) break;
+ r = receivers.begin()->second;
+ }
+ r.close(); // outside the lock, will call receiverCancelled
}
}
connection->closed(*this);
- if (!hasError()) {
- ScopedLock l(lock);
- session.close();
- }
+ if (!hasError()) session.close();
}
template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t)
@@ -436,11 +431,8 @@ uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination)
void SessionImpl::syncImpl(bool block)
{
- {
- ScopedLock l(lock);
- if (block) session.sync();
- else session.flush();
- }
+ if (block) session.sync();
+ else session.flush();
//cleanup unconfirmed accept records:
incoming.pendingAccept();
}
@@ -475,10 +467,10 @@ void SessionImpl::acknowledgeImpl()
if (!transactional) incoming.accept();
}
-void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative)
+void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m)
{
ScopedLock l(lock);
- if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative);
+ if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId());
}
void SessionImpl::rejectImpl(qpid::messaging::Message& m)
@@ -517,7 +509,7 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- connection->reopen();
+ connection->open();
}
bool SessionImpl::backoff()
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index c7dea77d18..2a2aa47df6 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -63,7 +63,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
void acknowledge(bool sync);
void reject(qpid::messaging::Message&);
void release(qpid::messaging::Message&);
- void acknowledge(qpid::messaging::Message& msg, bool cumulative);
+ void acknowledge(qpid::messaging::Message& msg);
void close();
void sync(bool block);
qpid::messaging::Sender createSender(const qpid::messaging::Address& address);
@@ -139,7 +139,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
void commitImpl();
void rollbackImpl();
void acknowledgeImpl();
- void acknowledgeImpl(qpid::messaging::Message&, bool cumulative);
+ void acknowledgeImpl(qpid::messaging::Message&);
void rejectImpl(qpid::messaging::Message&);
void releaseImpl(qpid::messaging::Message&);
void closeImpl();
@@ -204,13 +204,12 @@ class SessionImpl : public qpid::messaging::SessionImpl
void operator()() { impl.releaseImpl(message); }
};
- struct Acknowledge2 : Command
+ struct Acknowledge1 : Command
{
qpid::messaging::Message& message;
- bool cumulative;
- Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {}
- void operator()() { impl.acknowledgeImpl(message, cumulative); }
+ Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
+ void operator()() { impl.acknowledgeImpl(message); }
};
struct CreateSender;
diff --git a/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.cpp b/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.cpp
new file mode 100644
index 0000000000..327c2274a6
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.cpp
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SimpleUrlParser.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/Exception.h"
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+bool split(const std::string& in, char delim, std::pair<std::string, std::string>& result)
+{
+ std::string::size_type i = in.find(delim);
+ if (i != std::string::npos) {
+ result.first = in.substr(0, i);
+ if (i+1 < in.size()) {
+ result.second = in.substr(i+1);
+ }
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void parseUsernameAndPassword(const std::string& in, qpid::client::ConnectionSettings& result)
+{
+ std::pair<std::string, std::string> parts;
+ if (!split(in, '/', parts)) {
+ result.username = in;
+ } else {
+ result.username = parts.first;
+ result.password = parts.second;
+ }
+}
+
+void parseHostAndPort(const std::string& in, qpid::client::ConnectionSettings& result)
+{
+ std::pair<std::string, std::string> parts;
+ if (!split(in, ':', parts)) {
+ result.host = in;
+ } else {
+ result.host = parts.first;
+ if (parts.second.size()) {
+ result.port = boost::lexical_cast<uint16_t>(parts.second);
+ }
+ }
+}
+
+void SimpleUrlParser::parse(const std::string& url, qpid::client::ConnectionSettings& result)
+{
+ std::pair<std::string, std::string> parts;
+ if (!split(url, '@', parts)) {
+ parseHostAndPort(url, result);
+ } else {
+ parseUsernameAndPassword(parts.first, result);
+ parseHostAndPort(parts.second, result);
+ }
+}
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.h b/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.h
new file mode 100644
index 0000000000..24f90ca9d6
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.h
@@ -0,0 +1,42 @@
+#ifndef QPID_CLIENT_AMQP0_10_SIMPLEURLPARSER_H
+#define QPID_CLIENT_AMQP0_10_SIMPLEURLPARSER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+namespace qpid {
+namespace client {
+
+struct ConnectionSettings;
+
+namespace amqp0_10 {
+
+/**
+ * Parses a simple url of the form user/password@hostname:port
+ */
+struct SimpleUrlParser
+{
+ static void parse(const std::string& url, qpid::client::ConnectionSettings& result);
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_SIMPLEURLPARSER_H*/
diff --git a/cpp/src/qpid/client/windows/SaslFactory.cpp b/cpp/src/qpid/client/windows/SaslFactory.cpp
index 53d825771b..63c7fa3d1f 100644
--- a/cpp/src/qpid/client/windows/SaslFactory.cpp
+++ b/cpp/src/qpid/client/windows/SaslFactory.cpp
@@ -71,7 +71,7 @@ class WindowsSasl : public Sasl
public:
WindowsSasl( const std::string &, const std::string &, const std::string &, const std::string &, int, int );
~WindowsSasl();
- bool start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings);
+ std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings);
std::string step(const std::string& challenge);
std::string getMechanism();
std::string getUserId();
@@ -121,8 +121,8 @@ WindowsSasl::~WindowsSasl()
{
}
-bool WindowsSasl::start(const std::string& mechanisms, std::string& response,
- const SecuritySettings* /*externalSettings*/)
+std::string WindowsSasl::start(const std::string& mechanisms,
+ const SecuritySettings* /*externalSettings*/)
{
QPID_LOG(debug, "WindowsSasl::start(" << mechanisms << ")");
@@ -142,18 +142,18 @@ bool WindowsSasl::start(const std::string& mechanisms, std::string& response,
if (!haveAnon && !havePlain)
throw InternalErrorException(QPID_MSG("Sasl error: no common mechanism"));
+ std::string resp = "";
if (havePlain) {
mechanism = PLAIN;
- response = ((char)0) + settings.username + ((char)0) + settings.password;
+ resp = ((char)0) + settings.username + ((char)0) + settings.password;
}
else {
mechanism = ANONYMOUS;
- response = "";
}
- return true;
+ return resp;
}
-std::string WindowsSasl::step(const std::string& /*challenge*/)
+std::string WindowsSasl::step(const std::string& challenge)
{
// Shouldn't get this for PLAIN...
throw InternalErrorException(QPID_MSG("Sasl step error"));
@@ -169,7 +169,7 @@ std::string WindowsSasl::getUserId()
return std::string(); // TODO - when GSSAPI is supported, return userId for connection.
}
-std::auto_ptr<SecurityLayer> WindowsSasl::getSecurityLayer(uint16_t /*maxFrameSize*/)
+std::auto_ptr<SecurityLayer> WindowsSasl::getSecurityLayer(uint16_t maxFrameSize)
{
return std::auto_ptr<SecurityLayer>(0);
}
diff --git a/cpp/src/qpid/client/windows/SslConnector.cpp b/cpp/src/qpid/client/windows/SslConnector.cpp
index 785c817928..a33713e1a8 100644
--- a/cpp/src/qpid/client/windows/SslConnector.cpp
+++ b/cpp/src/qpid/client/windows/SslConnector.cpp
@@ -77,7 +77,7 @@ public:
framing::ProtocolVersion pVersion,
const ConnectionSettings&,
ConnectionImpl*);
- virtual void connect(const std::string& host, const std::string& port);
+ virtual void connect(const std::string& host, int port);
virtual void connected(const Socket&);
unsigned int getSSF();
};
@@ -153,7 +153,7 @@ SslConnector::~SslConnector()
// Will this get reach via virtual method via boost::bind????
-void SslConnector::connect(const std::string& host, const std::string& port) {
+void SslConnector::connect(const std::string& host, int port) {
brokerHost = host;
TCPConnector::connect(host, port);
}