diff options
| author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 01:19:00 +0000 |
|---|---|---|
| committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 01:19:00 +0000 |
| commit | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (patch) | |
| tree | dcfb94e75656c6c239fc3dcb754cd2015126424d /cpp/src/qpid/client | |
| parent | 5eb354b338bb8d8fcd35b6ac3fb33f8103e757c3 (diff) | |
| download | qpid-python-ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5.tar.gz | |
Undo bad merge from trunk - merged at wrong level.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187150 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
23 files changed, 321 insertions, 293 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index ab0d8e0700..8dc1e8338a 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -22,7 +22,6 @@ #include "qpid/client/ConnectionHandler.h" #include "qpid/SaslFactory.h" -#include "qpid/StringUtils.h" #include "qpid/client/Bounds.h" #include "qpid/framing/amqp_framing.h" #include "qpid/framing/all_method_bodies.h" @@ -143,9 +142,7 @@ void ConnectionHandler::outgoing(AMQFrame& frame) void ConnectionHandler::waitForOpen() { waitFor(ESTABLISHED); - if (getState() == FAILED) { - throw TransportFailure(errorText); - } else if (getState() == CLOSED) { + if (getState() == FAILED || getState() == CLOSED) { throw ConnectionException(errorCode, errorText); } } @@ -205,24 +202,6 @@ void ConnectionHandler::fail(const std::string& message) namespace { std::string SPACE(" "); - -std::string join(const std::vector<std::string>& in) -{ - std::string result; - for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) { - if (result.size()) result += SPACE; - result += *i; - } - return result; -} - -void intersection(const std::vector<std::string>& a, const std::vector<std::string>& b, std::vector<std::string>& results) -{ - for (std::vector<std::string>::const_iterator i = a.begin(); i != a.end(); ++i) { - if (std::find(b.begin(), b.end(), *i) != b.end()) results.push_back(*i); - } -} - } void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& mechanisms, const Array& /*locales*/) @@ -237,35 +216,26 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me maxSsf ); - std::vector<std::string> mechlist; - if (mechanism.empty()) { - //mechlist is simply what the server offers - mechanisms.collect(mechlist); - } else { - //mechlist is the intersection of those indicated by user and - //those supported by server, in the order listed by user - std::vector<std::string> allowed = split(mechanism, " "); - std::vector<std::string> supported; - mechanisms.collect(supported); - intersection(allowed, supported, mechlist); - if (mechlist.empty()) { - throw Exception(QPID_MSG("Desired mechanism(s) not valid: " << mechanism << " (supported: " << join(supported) << ")")); + std::string mechlist; + bool chosenMechanismSupported = mechanism.empty(); + for (Array::const_iterator i = mechanisms.begin(); i != mechanisms.end(); ++i) { + if (!mechanism.empty() && mechanism == (*i)->get<std::string>()) { + chosenMechanismSupported = true; + mechlist = (*i)->get<std::string>() + SPACE + mechlist; + } else { + if (i != mechanisms.begin()) mechlist += SPACE; + mechlist += (*i)->get<std::string>(); } } + if (!chosenMechanismSupported) { + fail("Selected mechanism not supported: " + mechanism); + } + if (sasl.get()) { - string response; - if (sasl->start(join(mechlist), response, getSecuritySettings ? getSecuritySettings() : 0)) { - proxy.startOk(properties, sasl->getMechanism(), response, locale); - } else { - //response was null - ConnectionStartOkBody body; - body.setClientProperties(properties); - body.setMechanism(sasl->getMechanism()); - //Don't set response, as none was given - body.setLocale(locale); - proxy.send(body); - } + string response = sasl->start(mechanism.empty() ? mechlist : mechanism, + getSecuritySettings ? getSecuritySettings() : 0); + proxy.startOk(properties, sasl->getMechanism(), response, locale); } else { //TODO: verify that desired mechanism and locale are supported string response = ((char)0) + username + ((char)0) + password; diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index db97f1e0f4..40c004f166 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -36,7 +36,6 @@ #include <boost/bind.hpp> #include <boost/format.hpp> -#include <boost/lexical_cast.hpp> #include <boost/shared_ptr.hpp> #include <limits> @@ -259,16 +258,16 @@ void ConnectionImpl::open() connector->setInputHandler(&handler); connector->setShutdownHandler(this); try { - std::string p = boost::lexical_cast<std::string>(port); - connector->connect(host, p); - + connector->connect(host, port); + } catch (const std::exception& e) { QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what()); connector.reset(); - throw TransportFailure(e.what()); + throw; } connector->init(); - + QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port); + // Enable heartbeat if requested uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat; if (heartbeat) { @@ -282,7 +281,6 @@ void ConnectionImpl::open() // - in that case in connector.reset() above; // - or when we are deleted handler.waitForOpen(); - QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port); // If the SASL layer has provided an "operational" userId for the connection, // put it in the negotiated settings. diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index bc611ffe0d..586012f9d6 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -61,7 +61,7 @@ class Connector : public framing::OutputHandler static void registerFactory(const std::string& proto, Factory* connectorFactory); virtual ~Connector() {}; - virtual void connect(const std::string& host, const std::string& port) = 0; + virtual void connect(const std::string& host, int port) = 0; virtual void init() {}; virtual void close() = 0; virtual void send(framing::AMQFrame& frame) = 0; diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp index 664640f5e7..6af607198c 100644 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -95,7 +95,7 @@ class RdmaConnector : public Connector, public sys::Codec std::string identifier; - void connect(const std::string& host, const std::string& port); + void connect(const std::string& host, int port); void close(); void send(framing::AMQFrame& frame); void abort() {} // TODO: need to fix this for heartbeat timeouts to work @@ -173,7 +173,7 @@ RdmaConnector::~RdmaConnector() { } } -void RdmaConnector::connect(const std::string& host, const std::string& port){ +void RdmaConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(dataConnectedLock); assert(!dataConnected); @@ -184,7 +184,7 @@ void RdmaConnector::connect(const std::string& host, const std::string& port){ boost::bind(&RdmaConnector::disconnected, this), boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)); - SocketAddress sa(host, port); + SocketAddress sa(host, boost::lexical_cast<std::string>(port)); acon->start(poller, sa); } diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 7cf4ef648e..b507625b11 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -170,7 +170,6 @@ Demux& SessionImpl::getDemux() void SessionImpl::waitForCompletion(const SequenceNumber& id) { Lock l(state); - sys::Waitable::ScopedWait w(state); waitForCompletionImpl(id); } diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index 26c2335eda..35c7e6bdf6 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -114,7 +114,7 @@ class SslConnector : public Connector std::string identifier; - void connect(const std::string& host, const std::string& port); + void connect(const std::string& host, int port); void init(); void close(); void send(framing::AMQFrame& frame); @@ -190,14 +190,14 @@ SslConnector::~SslConnector() { close(); } -void SslConnector::connect(const std::string& host, const std::string& port){ +void SslConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(closedLock); assert(closed); try { socket.connect(host, port); } catch (const std::exception& e) { socket.close(); - throw TransportFailure(e.what()); + throw ConnectionException(framing::connection::CLOSE_CODE_FRAMING_ERROR, e.what()); } identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp index 0070b24ec0..e284d57bec 100644 --- a/cpp/src/qpid/client/TCPConnector.cpp +++ b/cpp/src/qpid/client/TCPConnector.cpp @@ -88,7 +88,7 @@ TCPConnector::~TCPConnector() { close(); } -void TCPConnector::connect(const std::string& host, const std::string& port) { +void TCPConnector::connect(const std::string& host, int port) { Mutex::ScopedLock l(lock); assert(closed); connector = AsynchConnector::create( @@ -117,11 +117,11 @@ void TCPConnector::connected(const Socket&) { void TCPConnector::start(sys::AsynchIO* aio_) { aio = aio_; - for (int i = 0; i < 4; i++) { + for (int i = 0; i < 32; i++) { aio->queueReadBuffer(new Buff(maxFrameSize)); } - identifier = str(format("[%1%]") % socket.getFullAddress()); + identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); } void TCPConnector::initAmqp() { diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h index eb3f696013..c756469182 100644 --- a/cpp/src/qpid/client/TCPConnector.h +++ b/cpp/src/qpid/client/TCPConnector.h @@ -98,7 +98,7 @@ class TCPConnector : public Connector, public sys::Codec protected: virtual ~TCPConnector(); - void connect(const std::string& host, const std::string& port); + void connect(const std::string& host, int port); void start(sys::AsynchIO* aio_); void initAmqp(); virtual void connectFailed(const std::string& msg); diff --git a/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp index d2accddcd0..bfb20118b5 100644 --- a/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp +++ b/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp @@ -30,23 +30,12 @@ void AcceptTracker::State::accept() unaccepted.clear(); } -SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative) +void AcceptTracker::State::accept(qpid::framing::SequenceNumber id) { - SequenceSet accepting; - if (cumulative) { - for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) { - accepting.add(*i); - } - unconfirmed.add(accepting); - unaccepted.remove(accepting); - } else { - if (unaccepted.contains(id)) { - unaccepted.remove(id); - unconfirmed.add(id); - accepting.add(id); - } + if (unaccepted.contains(id)) { + unaccepted.remove(id); + unconfirmed.add(id); } - return accepting; } void AcceptTracker::State::release() @@ -70,18 +59,6 @@ void AcceptTracker::delivered(const std::string& destination, const qpid::framin destinationState[destination].unaccepted.add(id); } -namespace -{ -const size_t FLUSH_FREQUENCY = 1024; -} - -void AcceptTracker::addToPending(qpid::client::AsyncSession& session, const Record& record) -{ - pending.push_back(record); - if (pending.size() % FLUSH_FREQUENCY == 0) session.flush(); -} - - void AcceptTracker::accept(qpid::client::AsyncSession& session) { for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { @@ -90,19 +67,20 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session) Record record; record.status = session.messageAccept(aggregateState.unaccepted); record.accepted = aggregateState.unaccepted; - addToPending(session, record); + pending.push_back(record); aggregateState.accept(); } -void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative) +void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session) { for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { - i->second.accept(id, cumulative); + i->second.accept(id); } Record record; - record.accepted = aggregateState.accept(id, cumulative); + record.accepted.add(id); record.status = session.messageAccept(record.accepted); - addToPending(session, record); + pending.push_back(record); + aggregateState.accept(id); } void AcceptTracker::release(qpid::client::AsyncSession& session) diff --git a/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/cpp/src/qpid/client/amqp0_10/AcceptTracker.h index 85209c3b87..87890e41cc 100644 --- a/cpp/src/qpid/client/amqp0_10/AcceptTracker.h +++ b/cpp/src/qpid/client/amqp0_10/AcceptTracker.h @@ -42,7 +42,7 @@ class AcceptTracker public: void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id); void accept(qpid::client::AsyncSession&); - void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative); + void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&); void release(qpid::client::AsyncSession&); uint32_t acceptsPending(); uint32_t acceptsPending(const std::string& destination); @@ -62,7 +62,7 @@ class AcceptTracker qpid::framing::SequenceSet unconfirmed; void accept(); - qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative); + void accept(qpid::framing::SequenceNumber); void release(); uint32_t acceptsPending(); void completed(qpid::framing::SequenceSet&); @@ -79,7 +79,6 @@ class AcceptTracker StateMap destinationState; Records pending; - void addToPending(qpid::client::AsyncSession&, const Record&); void checkPending(); void completed(qpid::framing::SequenceSet&); }; diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 16e5fde075..f1295a3b66 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -129,10 +129,6 @@ const std::string HEADERS_EXCHANGE("headers"); const std::string XML_EXCHANGE("xml"); const std::string WILDCARD_ANY("#"); -//exchange prefixes: -const std::string PREFIX_AMQ("amq."); -const std::string PREFIX_QPID("qpid."); - const Verifier verifier; } @@ -203,7 +199,6 @@ class Exchange : protected Node void checkCreate(qpid::client::AsyncSession&, CheckMode); void checkAssert(qpid::client::AsyncSession&, CheckMode); void checkDelete(qpid::client::AsyncSession&, CheckMode); - bool isReservedName(); protected: const std::string specifiedType; @@ -238,8 +233,6 @@ class Subscription : public Exchange, public MessageSource const bool reliable; const bool durable; const std::string actualType; - const bool exclusiveQueue; - const bool exclusiveSubscription; FieldTable queueOptions; FieldTable subscriptionOptions; Bindings bindings; @@ -314,7 +307,6 @@ struct Opt Opt& operator/(const std::string& name); operator bool() const; std::string str() const; - bool asBool(bool defaultValue) const; const Variant::List& asList() const; void collect(qpid::framing::FieldTable& args) const; @@ -346,12 +338,6 @@ Opt::operator bool() const return value && !value->isVoid() && value->asBool(); } -bool Opt::asBool(bool defaultValue) const -{ - if (value) return value->asBool(); - else return defaultValue; -} - std::string Opt::str() const { if (value) return value->asString(); @@ -495,7 +481,7 @@ std::string Subscription::getSubscriptionName(const std::string& base, const std if (name.empty()) { return (boost::format("%1%_%2%") % base % Uuid(true).str()).str(); } else { - return name; + return (boost::format("%1%_%2%") % base % name).str(); } } @@ -504,9 +490,7 @@ Subscription::Subscription(const Address& address, const std::string& type) queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())), reliable(AddressResolution::is_reliable(address)), durable(Opt(address)/LINK/DURABLE), - actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type), - exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)), - exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)) + actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type) { (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions); @@ -566,7 +550,7 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str checkAssert(session, FOR_RECEIVER); //create subscription queue: - session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue, + session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions); //'default' binding: bindings.bind(session); @@ -575,15 +559,15 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str linkBindings.bind(session); //subscribe to subscription queue: AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE; - session.messageSubscribe(arg::queue=queue, arg::destination=destination, - arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions); + session.messageSubscribe(arg::queue=queue, arg::destination=destination, + arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions); } void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination) { linkBindings.unbind(session); session.messageCancel(destination); - if (exclusiveQueue) session.queueDelete(arg::queue=queue, arg::ifUnused=true); + session.queueDelete(arg::queue=queue); checkDelete(session, FOR_RECEIVER); } @@ -777,32 +761,18 @@ Exchange::Exchange(const Address& a) : Node(a), linkBindings.setDefaultExchange(name); } -bool Exchange::isReservedName() -{ - return name.find(PREFIX_AMQ) != std::string::npos || name.find(PREFIX_QPID) != std::string::npos; -} - void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) { if (enabled(createPolicy, mode)) { try { - if (isReservedName()) { - try { - sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); - } catch (const qpid::framing::NotFoundException& /*e*/) { - throw ResolutionError((boost::format("Cannot create exchange %1%; names beginning with \"amq.\" or \"qpid.\" are reserved.") % name).str()); - } - - } else { - std::string type = specifiedType; - if (type.empty()) type = TOPIC_EXCHANGE; - session.exchangeDeclare(arg::exchange=name, - arg::type=type, - arg::durable=durable, - arg::autoDelete=autoDelete, - arg::alternateExchange=alternateExchange, - arg::arguments=arguments); - } + std::string type = specifiedType; + if (type.empty()) type = TOPIC_EXCHANGE; + session.exchangeDeclare(arg::exchange=name, + arg::type=type, + arg::durable=durable, + arg::autoDelete=autoDelete, + arg::alternateExchange=alternateExchange, + arg::arguments=arguments); nodeBindings.bind(session); session.sync(); } catch (const qpid::framing::NotAllowedException& e) { @@ -852,7 +822,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); - } else if (*i->second != *v) { + } else if (i->second != v) { throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index cc6e9b9ab2..5a545c1f6a 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -20,6 +20,7 @@ */ #include "ConnectionImpl.h" #include "SessionImpl.h" +#include "SimpleUrlParser.h" #include "qpid/messaging/exceptions.h" #include "qpid/messaging/Session.h" #include "qpid/messaging/PrivateImplRef.h" @@ -38,18 +39,26 @@ using qpid::types::Variant; using qpid::types::VAR_LIST; using qpid::framing::Uuid; -namespace { -void merge(const std::string& value, std::vector<std::string>& list) { - if (std::find(list.begin(), list.end(), value) == list.end()) - list.push_back(value); +void convert(const Variant::List& from, std::vector<std::string>& to) +{ + for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) { + to.push_back(i->asString()); + } } -void merge(const Variant::List& from, std::vector<std::string>& to) +template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value) { - for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) - merge(i->asString(), to); + Variant::Map::const_iterator i = map.find(key); + if (i != map.end()) { + value = (T) i->second; + QPID_LOG(debug, "option " << key << " specified as " << i->second); + return true; + } else { + return false; + } } +namespace { std::string asString(const std::vector<std::string>& v) { std::stringstream os; os << "["; @@ -62,8 +71,49 @@ std::string asString(const std::vector<std::string>& v) { } } +template <> bool setIfFound< std::vector<std::string> >(const Variant::Map& map, + const std::string& key, + std::vector<std::string>& value) +{ + Variant::Map::const_iterator i = map.find(key); + if (i != map.end()) { + value.clear(); + if (i->second.getType() == VAR_LIST) { + convert(i->second.asList(), value); + } else { + value.push_back(i->second.asString()); + } + QPID_LOG(debug, "option " << key << " specified as " << asString(value)); + return true; + } else { + return false; + } +} + +void convert(const Variant::Map& from, ConnectionSettings& to) +{ + setIfFound(from, "username", to.username); + setIfFound(from, "password", to.password); + setIfFound(from, "sasl-mechanism", to.mechanism); + setIfFound(from, "sasl-service", to.service); + setIfFound(from, "sasl-min-ssf", to.minSsf); + setIfFound(from, "sasl-max-ssf", to.maxSsf); + + setIfFound(from, "heartbeat", to.heartbeat); + setIfFound(from, "tcp-nodelay", to.tcpNoDelay); + + setIfFound(from, "locale", to.locale); + setIfFound(from, "max-channels", to.maxChannels); + setIfFound(from, "max-frame-size", to.maxFrameSize); + setIfFound(from, "bounds", to.bounds); + + setIfFound(from, "transport", to.protocol); + + setIfFound(from, "ssl-cert-name", to.sslCertName); +} + ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : - replaceUrls(false), reconnect(false), timeout(-1), limit(-1), + reconnect(false), timeout(-1), limit(-1), minReconnectInterval(3), maxReconnectInterval(60), retries(0), reconnectOnLimitExceeded(true) { @@ -74,69 +124,27 @@ ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& optio void ConnectionImpl::setOptions(const Variant::Map& options) { - for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) { - setOption(i->first, i->second); + sys::Mutex::ScopedLock l(lock); + convert(options, settings); + setIfFound(options, "reconnect", reconnect); + setIfFound(options, "reconnect-timeout", timeout); + setIfFound(options, "reconnect-limit", limit); + int64_t reconnectInterval; + if (setIfFound(options, "reconnect-interval", reconnectInterval)) { + minReconnectInterval = maxReconnectInterval = reconnectInterval; + } else { + setIfFound(options, "reconnect-interval-min", minReconnectInterval); + setIfFound(options, "reconnect-interval-max", maxReconnectInterval); } + setIfFound(options, "reconnect-urls", urls); + setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded); } void ConnectionImpl::setOption(const std::string& name, const Variant& value) { - sys::Mutex::ScopedLock l(lock); - if (name == "reconnect") { - reconnect = value; - } else if (name == "reconnect-timeout" || name == "reconnect_timeout") { - timeout = value; - } else if (name == "reconnect-limit" || name == "reconnect_limit") { - limit = value; - } else if (name == "reconnect-interval" || name == "reconnect_interval") { - maxReconnectInterval = minReconnectInterval = value; - } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") { - minReconnectInterval = value; - } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") { - maxReconnectInterval = value; - } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") { - replaceUrls = value.asBool(); - } else if (name == "reconnect-urls" || name == "reconnect_urls") { - if (replaceUrls) urls.clear(); - if (value.getType() == VAR_LIST) { - merge(value.asList(), urls); - } else { - merge(value.asString(), urls); - } - } else if (name == "username") { - settings.username = value.asString(); - } else if (name == "password") { - settings.password = value.asString(); - } else if (name == "sasl-mechanism" || name == "sasl_mechanism" || - name == "sasl-mechanisms" || name == "sasl_mechanisms") { - settings.mechanism = value.asString(); - } else if (name == "sasl-service" || name == "sasl_service") { - settings.service = value.asString(); - } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") { - settings.minSsf = value; - } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") { - settings.maxSsf = value; - } else if (name == "heartbeat") { - settings.heartbeat = value; - } else if (name == "tcp-nodelay" || name == "tcp_nodelay") { - settings.tcpNoDelay = value; - } else if (name == "locale") { - settings.locale = value.asString(); - } else if (name == "max-channels" || name == "max_channels") { - settings.maxChannels = value; - } else if (name == "max-frame-size" || name == "max_frame_size") { - settings.maxFrameSize = value; - } else if (name == "bounds") { - settings.bounds = value; - } else if (name == "transport") { - settings.protocol = value.asString(); - } else if (name == "ssl-cert-name" || name == "ssl_cert_name") { - settings.sslCertName = value.asString(); - } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") { - reconnectOnLimitExceeded = value; - } else { - throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised")); - } + Variant::Map options; + options[name] = value; + setOptions(options); } @@ -206,7 +214,7 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st sessions[name] = impl; break; } catch (const qpid::TransportFailure&) { - reopen(); + open(); } catch (const qpid::SessionException& e) { throw qpid::messaging::SessionError(e.what()); } catch (const std::exception& e) { @@ -227,15 +235,6 @@ void ConnectionImpl::open() catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); } } -void ConnectionImpl::reopen() -{ - if (!reconnect) { - throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); - } - open(); -} - - bool expired(const qpid::sys::AbsTime& start, int64_t timeout) { if (timeout == 0) return true; @@ -263,9 +262,14 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started) } void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) { - for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i) - merge(i->str(), urls); - QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls)); + if (more.size()) { + for (size_t i = 0; i < more.size(); ++i) { + if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) { + urls.push_back(more[i].str()); + } + } + QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls)); + } } bool ConnectionImpl::tryConnect() @@ -274,14 +278,21 @@ bool ConnectionImpl::tryConnect() for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) { try { QPID_LOG(info, "Trying to connect to " << *i << "..."); - Url url(*i); - if (url.getUser().size()) settings.username = url.getUser(); - if (url.getPass().size()) settings.password = url.getPass(); - connection.open(url, settings); + //TODO: when url support is more complete can avoid this test here + if (i->find("amqp:") == 0) { + Url url(*i); + connection.open(url, settings); + } else { + SimpleUrlParser::parse(*i, settings); + connection.open(settings); + } QPID_LOG(info, "Connected to " << *i); mergeUrls(connection.getInitialBrokers(), l); return resetSessions(l); - } catch (const qpid::TransportFailure& e) { + } catch (const qpid::ConnectionException& e) { + //TODO: need to fix timeout on + //qpid::client::Connection::open() so that it throws + //TransportFailure rather than a ConnectionException QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); } } diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index 1b58cbbe3e..09f2038312 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -43,7 +43,6 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl public: ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options); void open(); - void reopen(); bool isOpen() const; void close(); qpid::messaging::Session newSession(bool transactional, const std::string& name); @@ -60,7 +59,6 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl qpid::sys::Semaphore semaphore;//used to coordinate reconnection Sessions sessions; qpid::client::Connection connection; - bool replaceUrls; // Replace rather than merging with reconnect-urls std::vector<std::string> urls; qpid::client::ConnectionSettings settings; bool reconnect; diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 3badaf40ba..71e89bdba1 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -144,10 +144,10 @@ void IncomingMessages::accept() acceptTracker.accept(session); } -void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative) +void IncomingMessages::accept(qpid::framing::SequenceNumber id) { sys::Mutex::ScopedLock l(lock); - acceptTracker.accept(id, session, cumulative); + acceptTracker.accept(id, session); } @@ -301,7 +301,6 @@ const std::string SUBJECT("qpid.subject"); const std::string X_APP_ID("x-amqp-0-10.app-id"); const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key"); const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding"); -const std::string X_TIMESTAMP("x-amqp-0-10.timestamp"); } void populateHeaders(qpid::messaging::Message& message, @@ -335,13 +334,10 @@ void populateHeaders(qpid::messaging::Message& message, if (messageProperties->hasContentEncoding()) { message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding(); } - // routing-key, timestamp, others? + // routing-key, others? if (deliveryProperties && deliveryProperties->hasRoutingKey()) { message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey(); } - if (deliveryProperties && deliveryProperties->hasTimestamp()) { - message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp(); - } } } diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index 9053b70312..f6a291bc68 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -72,7 +72,7 @@ class IncomingMessages bool get(Handler& handler, qpid::sys::Duration timeout); bool getNextDestination(std::string& destination, qpid::sys::Duration timeout); void accept(); - void accept(qpid::framing::SequenceNumber id, bool cumulative); + void accept(qpid::framing::SequenceNumber id); void releaseAll(); void releasePending(const std::string& destination); diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index d93416da75..82358961c8 100644 --- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -59,9 +59,7 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from) message.getMessageProperties().setReplyTo(AddressResolution::convert(address)); } translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders()); - if (from.getTtl().getMilliseconds()) { - message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds()); - } + message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds()); if (from.getDurable()) { message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT); } diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index f2f0f1a9e5..e1b75ec0cf 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -135,7 +135,6 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m) void SenderImpl::replay(const sys::Mutex::ScopedLock&) { for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { - i->message.setRedelivered(true); sink->send(session, name, *i); } } @@ -148,7 +147,7 @@ uint32_t SenderImpl::checkPendingSends(bool flush) { uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&) { if (flush) { - session.flush(); + session.flush(); flushed = true; } else { flushed = false; diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index be5eab1f2b..75a71997fd 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -60,14 +60,12 @@ SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactio void SessionImpl::checkError() { - ScopedLock l(lock); qpid::client::SessionBase_0_10Access s(session); s.get()->assertOpen(); } bool SessionImpl::hasError() { - ScopedLock l(lock); qpid::client::SessionBase_0_10Access s(session); return s.get()->hasError(); } @@ -114,14 +112,13 @@ void SessionImpl::release(qpid::messaging::Message& m) execute1<Release>(m); } -void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative) +void SessionImpl::acknowledge(qpid::messaging::Message& m) { //Should probably throw an exception on failure here, or indicate //it through a return type at least. Failure means that the //message may be redelivered; i.e. the application cannot delete //any state necessary for preventing reprocessing of the message - Acknowledge2 ack(*this, m, cumulative); - execute(ack); + execute1<Acknowledge1>(m); } void SessionImpl::close() @@ -131,29 +128,27 @@ void SessionImpl::close() senders.clear(); receivers.clear(); } else { - Senders sCopy; - Receivers rCopy; - { - ScopedLock l(lock); - senders.swap(sCopy); - receivers.swap(rCopy); - } - for (Senders::iterator i = sCopy.begin(); i != sCopy.end(); ++i) - { - // outside the lock, will call senderCancelled - i->second.close(); + while (true) { + Sender s; + { + ScopedLock l(lock); + if (senders.empty()) break; + s = senders.begin()->second; + } + s.close(); // outside the lock, will call senderCancelled } - for (Receivers::iterator i = rCopy.begin(); i != rCopy.end(); ++i) - { - // outside the lock, will call receiverCancelled - i->second.close(); + while (true) { + Receiver r; + { + ScopedLock l(lock); + if (receivers.empty()) break; + r = receivers.begin()->second; + } + r.close(); // outside the lock, will call receiverCancelled } } connection->closed(*this); - if (!hasError()) { - ScopedLock l(lock); - session.close(); - } + if (!hasError()) session.close(); } template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t) @@ -436,11 +431,8 @@ uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination) void SessionImpl::syncImpl(bool block) { - { - ScopedLock l(lock); - if (block) session.sync(); - else session.flush(); - } + if (block) session.sync(); + else session.flush(); //cleanup unconfirmed accept records: incoming.pendingAccept(); } @@ -475,10 +467,10 @@ void SessionImpl::acknowledgeImpl() if (!transactional) incoming.accept(); } -void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative) +void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m) { ScopedLock l(lock); - if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative); + if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId()); } void SessionImpl::rejectImpl(qpid::messaging::Message& m) @@ -517,7 +509,7 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection->reopen(); + connection->open(); } bool SessionImpl::backoff() diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index c7dea77d18..2a2aa47df6 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -63,7 +63,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void acknowledge(bool sync); void reject(qpid::messaging::Message&); void release(qpid::messaging::Message&); - void acknowledge(qpid::messaging::Message& msg, bool cumulative); + void acknowledge(qpid::messaging::Message& msg); void close(); void sync(bool block); qpid::messaging::Sender createSender(const qpid::messaging::Address& address); @@ -139,7 +139,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void commitImpl(); void rollbackImpl(); void acknowledgeImpl(); - void acknowledgeImpl(qpid::messaging::Message&, bool cumulative); + void acknowledgeImpl(qpid::messaging::Message&); void rejectImpl(qpid::messaging::Message&); void releaseImpl(qpid::messaging::Message&); void closeImpl(); @@ -204,13 +204,12 @@ class SessionImpl : public qpid::messaging::SessionImpl void operator()() { impl.releaseImpl(message); } }; - struct Acknowledge2 : Command + struct Acknowledge1 : Command { qpid::messaging::Message& message; - bool cumulative; - Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {} - void operator()() { impl.acknowledgeImpl(message, cumulative); } + Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} + void operator()() { impl.acknowledgeImpl(message); } }; struct CreateSender; diff --git a/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.cpp b/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.cpp new file mode 100644 index 0000000000..327c2274a6 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.cpp @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SimpleUrlParser.h" +#include "qpid/client/ConnectionSettings.h" +#include "qpid/Exception.h" +#include <boost/lexical_cast.hpp> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +bool split(const std::string& in, char delim, std::pair<std::string, std::string>& result) +{ + std::string::size_type i = in.find(delim); + if (i != std::string::npos) { + result.first = in.substr(0, i); + if (i+1 < in.size()) { + result.second = in.substr(i+1); + } + return true; + } else { + return false; + } +} + +void parseUsernameAndPassword(const std::string& in, qpid::client::ConnectionSettings& result) +{ + std::pair<std::string, std::string> parts; + if (!split(in, '/', parts)) { + result.username = in; + } else { + result.username = parts.first; + result.password = parts.second; + } +} + +void parseHostAndPort(const std::string& in, qpid::client::ConnectionSettings& result) +{ + std::pair<std::string, std::string> parts; + if (!split(in, ':', parts)) { + result.host = in; + } else { + result.host = parts.first; + if (parts.second.size()) { + result.port = boost::lexical_cast<uint16_t>(parts.second); + } + } +} + +void SimpleUrlParser::parse(const std::string& url, qpid::client::ConnectionSettings& result) +{ + std::pair<std::string, std::string> parts; + if (!split(url, '@', parts)) { + parseHostAndPort(url, result); + } else { + parseUsernameAndPassword(parts.first, result); + parseHostAndPort(parts.second, result); + } +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.h b/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.h new file mode 100644 index 0000000000..24f90ca9d6 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/SimpleUrlParser.h @@ -0,0 +1,42 @@ +#ifndef QPID_CLIENT_AMQP0_10_SIMPLEURLPARSER_H +#define QPID_CLIENT_AMQP0_10_SIMPLEURLPARSER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> + +namespace qpid { +namespace client { + +struct ConnectionSettings; + +namespace amqp0_10 { + +/** + * Parses a simple url of the form user/password@hostname:port + */ +struct SimpleUrlParser +{ + static void parse(const std::string& url, qpid::client::ConnectionSettings& result); +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_SIMPLEURLPARSER_H*/ diff --git a/cpp/src/qpid/client/windows/SaslFactory.cpp b/cpp/src/qpid/client/windows/SaslFactory.cpp index 53d825771b..63c7fa3d1f 100644 --- a/cpp/src/qpid/client/windows/SaslFactory.cpp +++ b/cpp/src/qpid/client/windows/SaslFactory.cpp @@ -71,7 +71,7 @@ class WindowsSasl : public Sasl public: WindowsSasl( const std::string &, const std::string &, const std::string &, const std::string &, int, int ); ~WindowsSasl(); - bool start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings); + std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings); std::string step(const std::string& challenge); std::string getMechanism(); std::string getUserId(); @@ -121,8 +121,8 @@ WindowsSasl::~WindowsSasl() { } -bool WindowsSasl::start(const std::string& mechanisms, std::string& response, - const SecuritySettings* /*externalSettings*/) +std::string WindowsSasl::start(const std::string& mechanisms, + const SecuritySettings* /*externalSettings*/) { QPID_LOG(debug, "WindowsSasl::start(" << mechanisms << ")"); @@ -142,18 +142,18 @@ bool WindowsSasl::start(const std::string& mechanisms, std::string& response, if (!haveAnon && !havePlain) throw InternalErrorException(QPID_MSG("Sasl error: no common mechanism")); + std::string resp = ""; if (havePlain) { mechanism = PLAIN; - response = ((char)0) + settings.username + ((char)0) + settings.password; + resp = ((char)0) + settings.username + ((char)0) + settings.password; } else { mechanism = ANONYMOUS; - response = ""; } - return true; + return resp; } -std::string WindowsSasl::step(const std::string& /*challenge*/) +std::string WindowsSasl::step(const std::string& challenge) { // Shouldn't get this for PLAIN... throw InternalErrorException(QPID_MSG("Sasl step error")); @@ -169,7 +169,7 @@ std::string WindowsSasl::getUserId() return std::string(); // TODO - when GSSAPI is supported, return userId for connection. } -std::auto_ptr<SecurityLayer> WindowsSasl::getSecurityLayer(uint16_t /*maxFrameSize*/) +std::auto_ptr<SecurityLayer> WindowsSasl::getSecurityLayer(uint16_t maxFrameSize) { return std::auto_ptr<SecurityLayer>(0); } diff --git a/cpp/src/qpid/client/windows/SslConnector.cpp b/cpp/src/qpid/client/windows/SslConnector.cpp index 785c817928..a33713e1a8 100644 --- a/cpp/src/qpid/client/windows/SslConnector.cpp +++ b/cpp/src/qpid/client/windows/SslConnector.cpp @@ -77,7 +77,7 @@ public: framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); - virtual void connect(const std::string& host, const std::string& port); + virtual void connect(const std::string& host, int port); virtual void connected(const Socket&); unsigned int getSSF(); }; @@ -153,7 +153,7 @@ SslConnector::~SslConnector() // Will this get reach via virtual method via boost::bind???? -void SslConnector::connect(const std::string& host, const std::string& port) { +void SslConnector::connect(const std::string& host, int port) { brokerHost = host; TCPConnector::connect(host, port); } |
