diff options
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/Bounds.cpp | 55 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Bounds.h | 48 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connection.cpp | 67 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connection.h | 25 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 27 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.h | 35 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 51 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 31 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionSettings.cpp | 81 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionSettings.h | 76 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 56 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connector.h | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 46 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.h | 5 |
14 files changed, 456 insertions, 164 deletions
diff --git a/cpp/src/qpid/client/Bounds.cpp b/cpp/src/qpid/client/Bounds.cpp new file mode 100644 index 0000000000..1df21db941 --- /dev/null +++ b/cpp/src/qpid/client/Bounds.cpp @@ -0,0 +1,55 @@ +#include "Bounds.h" + +#include "qpid/log/Statement.h" + +namespace qpid { +namespace client { + +using sys::Monitor; + +Bounds::Bounds(size_t maxSize) : max(maxSize), current(0) {} + +bool Bounds::expand(size_t sizeRequired, bool block) +{ + if (max) { + Monitor::ScopedLock l(lock); + current += sizeRequired; + if (block) { + while (current > max) { + QPID_LOG(debug, "Waiting for bounds: " << *this); + lock.wait(); + } + QPID_LOG(debug, "Bounds ok: " << *this); + } + return current <= max; + } else { + return true; + } +} + +void Bounds::reduce(size_t size) +{ + if (!max || size == 0) return; + Monitor::ScopedLock l(lock); + if (current == 0) return; + bool needNotify = current > max; + current -= std::min(size, current); + if (needNotify && current < max) { + //todo: notify one at a time, but ensure that all threads are + //eventually notified + lock.notifyAll(); + } +} + +size_t Bounds::getCurrentSize() +{ + Monitor::ScopedLock l(lock); + return current; +} + +std::ostream& operator<<(std::ostream& out, const Bounds& bounds) { + out << "current=" << bounds.current << ", max=" << bounds.max << " [" << &bounds << "]"; + return out; +} + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/Bounds.h b/cpp/src/qpid/client/Bounds.h new file mode 100644 index 0000000000..db18becce3 --- /dev/null +++ b/cpp/src/qpid/client/Bounds.h @@ -0,0 +1,48 @@ +#ifndef QPID_CLIENT_BOUNDSCHECKING_H +#define QPID_CLIENT_BOUNDSCHECKING_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/Monitor.h" + +namespace qpid{ +namespace client{ + +class Bounds +{ + public: + Bounds(size_t maxSize); + bool expand(size_t, bool block); + void reduce(size_t); + size_t getCurrentSize(); + + private: + friend std::ostream& operator<<(std::ostream&, const Bounds&); + sys::Monitor lock; + const size_t max; + size_t current; +}; + +std::ostream& operator<<(std::ostream&, const Bounds&); + + +}} + +#endif diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index 25d1c510c8..c11f155afb 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -18,11 +18,8 @@ * under the License. * */ -#include <algorithm> -#include <boost/format.hpp> -#include <boost/bind.hpp> - #include "Connection.h" +#include "ConnectionSettings.h" #include "Channel.h" #include "Message.h" #include "SessionImpl.h" @@ -30,9 +27,13 @@ #include "qpid/log/Options.h" #include "qpid/log/Statement.h" #include "qpid/shared_ptr.h" + +#include <algorithm> #include <iostream> #include <sstream> #include <functional> +#include <boost/format.hpp> +#include <boost/bind.hpp> using namespace qpid::framing; using namespace qpid::sys; @@ -41,41 +42,49 @@ using namespace qpid::sys; namespace qpid { namespace client { -Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) : - channelIdCounter(0), version(_version), - max_frame_size(_max_frame_size), - isOpen(false), - impl(new ConnectionImpl( - shared_ptr<Connector>(new Connector(_version, _debug)))) -{} - -Connection::Connection(shared_ptr<Connector> c) : - channelIdCounter(0), version(framing::highestProtocolVersion), - max_frame_size(65535), - isOpen(false), - impl(new ConnectionImpl(c)) -{} +Connection::Connection(framing::ProtocolVersion _version) : + channelIdCounter(0), version(_version) {} Connection::~Connection(){ } void Connection::open( const std::string& host, int port, - const std::string& uid, const std::string& pwd, const std::string& vhost) + const std::string& uid, const std::string& pwd, + const std::string& vhost, + uint16_t maxFrameSize) +{ + ConnectionSettings settings; + settings.host = host; + settings.port = port; + settings.username = uid; + settings.password = pwd; + settings.virtualhost = vhost; + settings.maxFrameSize = maxFrameSize; + open(settings); +} + +void Connection::open(const ConnectionSettings& settings) { - if (isOpen) - throw Exception(QPID_MSG("Channel object is already open")); + if (impl) + throw Exception(QPID_MSG("Connection::open() was already called")); - impl->open(host, port, uid, pwd, vhost); - isOpen = true; + impl = boost::shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings)); + max_frame_size = impl->getNegotiatedSettings().maxFrameSize; } -void Connection::openChannel(Channel& channel) { +void Connection::openChannel(Channel& channel) +{ + if (!impl) + throw Exception(QPID_MSG("Connection has not yet been opened")); channel.open(newSession(ASYNC)); } Session Connection::newSession(SynchronousMode sync, - uint32_t detachedLifetime) + uint32_t detachedLifetime) { + if (!impl) + throw Exception(QPID_MSG("Connection has not yet been opened")); + shared_ptr<SessionImpl> core( new SessionImpl(impl, ++channelIdCounter, max_frame_size)); core->setSync(sync); @@ -85,13 +94,19 @@ Session Connection::newSession(SynchronousMode sync, } void Connection::resume(Session& session) { + if (!impl) + throw Exception(QPID_MSG("Connection has not yet been opened")); + session.impl->setChannel(++channelIdCounter); impl->addSession(session.impl); session.impl->resume(impl); } void Connection::close() { - impl->close(); + if (impl) { + impl->close(); + impl.reset(); + } } }} // namespace qpid::client diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index 0ddd383381..417739fd1d 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -39,6 +39,7 @@ namespace qpid { */ namespace client { +class ConnectionSettings; /** * \defgroup clientapi Application API for an AMQP client. */ @@ -54,9 +55,7 @@ class Connection { framing::ChannelId channelIdCounter; framing::ProtocolVersion version; - const uint32_t max_frame_size; - bool isOpen; - bool debug; + uint16_t max_frame_size; protected: boost::shared_ptr<ConnectionImpl> impl; @@ -67,17 +66,8 @@ class Connection * connection. * * @param _version the version of the protocol to connect with. - * - * @param debug turns on tracing for the connection - * (i.e. prints details of the frames sent and received to std - * out). Optional. Defaults to false. - * - * @param max_frame_size the maximum frame size that the - * client will accept. Optional. Defaults to 65535. */ - Connection(bool debug = false, uint32_t max_frame_size = 65535, - framing::ProtocolVersion=framing::highestProtocolVersion); - Connection(boost::shared_ptr<Connector>); + Connection(framing::ProtocolVersion=framing::highestProtocolVersion); ~Connection(); /** @@ -100,7 +90,14 @@ class Connection void open(const std::string& host, int port = 5672, const std::string& uid = "guest", const std::string& pwd = "guest", - const std::string& virtualhost = "/"); + const std::string& virtualhost = "/", uint16_t maxFrameSize=65535); + + /** + * Opens a connection to a broker. + * + * @param the settings to use (host, port etc) @see ConnectionSettings + */ + void open(const ConnectionSettings& settings); /** * Close the connection. diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index df27942008..81d966d53f 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -20,9 +20,9 @@ */ #include "ConnectionHandler.h" + #include "qpid/log/Statement.h" #include "qpid/framing/amqp_framing.h" -#include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/all_method_bodies.h" #include "qpid/framing/ClientInvoker.h" #include "qpid/framing/reply_exceptions.h" @@ -42,17 +42,10 @@ const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state"); const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state"); } -ConnectionHandler::ConnectionHandler() - : StateManager(NOT_STARTED), outHandler(*this), proxy(outHandler), errorCode(200) -{ - - mechanism = PLAIN; - locale = en_US; - heartbeat = 0; - maxChannels = 32767; - maxFrameSize = 65535; +ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, framing::ProtocolVersion& v) + : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), errorCode(200), version(v) +{ insist = true; - version = framing::highestProtocolVersion; ESTABLISHED.insert(FAILED); ESTABLISHED.insert(CLOSED); @@ -141,7 +134,7 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& /* checkState(NOT_STARTED, INVALID_STATE_START); setState(NEGOTIATING); //TODO: verify that desired mechanism and locale are supported - string response = ((char)0) + uid + ((char)0) + pwd; + string response = ((char)0) + username + ((char)0) + password; proxy.startOk(properties, mechanism, response, locale); } @@ -150,14 +143,16 @@ void ConnectionHandler::secure(const std::string& /*challenge*/) throw NotImplementedException("Challenge-response cycle not yet implemented in client"); } -void ConnectionHandler::tune(uint16_t channelMax, uint16_t /*frameMax*/, uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/) +void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed, + uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/) { checkState(NEGOTIATING, INVALID_STATE_TUNE); - //TODO: verify that desired heartbeat and max frame size are valid - maxChannels = channelMax; + maxChannels = std::min(maxChannels, maxChannelsProposed); + maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed); + //TODO: implement heartbeats and check desired value is in valid range proxy.tuneOk(maxChannels, maxFrameSize, heartbeat); setState(OPENING); - proxy.open(vhost, capabilities, insist); + proxy.open(virtualhost, capabilities, insist); } void ConnectionHandler::openOk(const framing::Array& /*knownHosts*/) diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index d7ab97ce31..1cf0c905ed 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -22,9 +22,10 @@ #define _ConnectionHandler_ #include "ChainableFrameHandler.h" -#include "Connector.h" +#include "ConnectionSettings.h" #include "StateManager.h" #include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/AMQP_ClientOperations.h" #include "qpid/framing/AMQP_ServerProxy.h" #include "qpid/framing/Array.h" @@ -35,27 +36,11 @@ namespace qpid { namespace client { -struct ConnectionProperties -{ - std::string uid; - std::string pwd; - std::string vhost; - framing::FieldTable properties; - std::string mechanism; - std::string locale; - framing::Array capabilities; - uint16_t heartbeat; - uint16_t maxChannels; - uint64_t maxFrameSize; - bool insist; - framing::ProtocolVersion version; -}; - -class ConnectionHandler : private StateManager, - public ConnectionProperties, - public ChainableFrameHandler, - public framing::InputHandler, - private framing::AMQP_ClientOperations::ConnectionHandler +class ConnectionHandler : private StateManager, + public ConnectionSettings, + public ChainableFrameHandler, + public framing::InputHandler, + private framing::AMQP_ClientOperations::ConnectionHandler { typedef framing::AMQP_ClientOperations::ConnectionHandler ConnectionOperations; enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED}; @@ -73,6 +58,10 @@ class ConnectionHandler : private StateManager, framing::AMQP_ServerProxy::Connection proxy; uint16_t errorCode; std::string errorText; + bool insist; + framing::ProtocolVersion version; + framing::Array capabilities; + framing::FieldTable properties; void checkState(STATES s, const std::string& msg); @@ -96,7 +85,7 @@ public: typedef boost::function<void()> CloseListener; typedef boost::function<void(uint16_t, const std::string&)> ErrorListener; - ConnectionHandler(); + ConnectionHandler(const ConnectionSettings&, framing::ProtocolVersion&); void received(framing::AMQFrame& f) { incoming(f); } diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index ce95e43f58..643d42403d 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,13 +18,14 @@ * under the License. * */ +#include "ConnectionImpl.h" +#include "ConnectionSettings.h" +#include "SessionImpl.h" + #include "qpid/log/Statement.h" #include "qpid/framing/constants.h" #include "qpid/framing/reply_exceptions.h" -#include "ConnectionImpl.h" -#include "SessionImpl.h" - #include <boost/bind.hpp> #include <boost/format.hpp> @@ -34,24 +35,32 @@ using namespace qpid::sys; using namespace qpid::framing::connection;//for connection error codes -ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) - : connector(c), isClosed(false), isClosing(false) +ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) + : Bounds(settings.maxFrameSize * settings.bounds), + handler(settings, v), + connector(v, settings, this), + version(v), + isClosed(false), + isClosing(false) { + QPID_LOG(debug, "ConnectionImpl created for " << version); handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); - handler.out = boost::bind(&Connector::send, connector, _1); + handler.out = boost::bind(&Connector::send, boost::ref(connector), _1); handler.onClose = boost::bind(&ConnectionImpl::closed, this, NORMAL, std::string()); handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2); - connector->setInputHandler(&handler); - connector->setTimeoutHandler(this); - connector->setShutdownHandler(this); + connector.setInputHandler(&handler); + connector.setTimeoutHandler(this); + connector.setShutdownHandler(this); + + open(settings.host, settings.port); } ConnectionImpl::~ConnectionImpl() { // Important to close the connector first, to ensure the // connector thread does not call on us while the destructor // is running. - connector->close(); + connector.close(); } void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session) @@ -79,18 +88,11 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame) s->in(frame); } -void ConnectionImpl::open(const std::string& host, int port, - const std::string& uid, const std::string& pwd, - const std::string& vhost) +void ConnectionImpl::open(const std::string& host, int port) { - //TODO: better management of connection properties - handler.uid = uid; - handler.pwd = pwd; - handler.vhost = vhost; - QPID_LOG(info, "Connecting to " << host << ":" << port); - connector->connect(host, port); - connector->init(); + connector.connect(host, port); + connector.init(); handler.waitForOpen(); } @@ -102,7 +104,7 @@ void ConnectionImpl::idleIn() void ConnectionImpl::idleOut() { AMQFrame frame(in_place<AMQHeartbeatBody>()); - connector->send(frame); + connector.send(frame); } void ConnectionImpl::close() @@ -121,7 +123,7 @@ void ConnectionImpl::close() // so sessions can be updated outside the lock. ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) { isClosed = true; - connector->close(); + connector.close(); SessionVector save; for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { boost::shared_ptr<SessionImpl> s = i->second.lock(); @@ -157,4 +159,9 @@ void ConnectionImpl::erase(uint16_t ch) { Mutex::ScopedLock l(lock); sessions.erase(ch); } + +const ConnectionSettings& ConnectionImpl::getNegotiatedSettings() +{ + return handler; +} diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index d0df9238f2..986b044b49 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -22,22 +22,26 @@ #ifndef _ConnectionImpl_ #define _ConnectionImpl_ -#include <map> -#include <boost/shared_ptr.hpp> -#include <boost/weak_ptr.hpp> +#include "Bounds.h" +#include "ConnectionHandler.h" +#include "Connector.h" #include "qpid/framing/FrameHandler.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" -#include "ConnectionHandler.h" -#include "Connector.h" + +#include <map> +#include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> namespace qpid { namespace client { +class ConnectionSettings; class SessionImpl; -class ConnectionImpl : public framing::FrameHandler, +class ConnectionImpl : public Bounds, + public framing::FrameHandler, public sys::TimeoutHandler, public sys::ShutdownHandler @@ -47,7 +51,7 @@ class ConnectionImpl : public framing::FrameHandler, SessionMap sessions; ConnectionHandler handler; - boost::shared_ptr<Connector> connector; + Connector connector; framing::ProtocolVersion version; sys::Mutex lock; bool isClosed; @@ -55,6 +59,8 @@ class ConnectionImpl : public framing::FrameHandler, template <class F> void detachAll(const F&); + void open(const std::string& host, int port); + SessionVector closeInternal(const sys::Mutex::ScopedLock&); void incoming(framing::AMQFrame& frame); void closed(uint16_t, const std::string&); @@ -64,21 +70,16 @@ class ConnectionImpl : public framing::FrameHandler, bool setClosing(); public: - typedef boost::shared_ptr<ConnectionImpl> shared_ptr; - - ConnectionImpl(boost::shared_ptr<Connector> c); + ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings); ~ConnectionImpl(); void addSession(const boost::shared_ptr<SessionImpl>&); - void open(const std::string& host, int port = 5672, - const std::string& uid = "guest", - const std::string& pwd = "guest", - const std::string& virtualhost = "/"); void close(); void handle(framing::AMQFrame& frame); void erase(uint16_t channel); - boost::shared_ptr<Connector> getConnector() { return connector; } + + const ConnectionSettings& getNegotiatedSettings(); }; }} diff --git a/cpp/src/qpid/client/ConnectionSettings.cpp b/cpp/src/qpid/client/ConnectionSettings.cpp new file mode 100644 index 0000000000..ea2729c2dd --- /dev/null +++ b/cpp/src/qpid/client/ConnectionSettings.cpp @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionSettings.h" + +#include "qpid/sys/posix/check.h" +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> + +namespace qpid { +namespace client { + +ConnectionSettings::ConnectionSettings() : + Options("Connection Settings"), + host("localhost"), + port(TcpAddress::DEFAULT_PORT), + clientid("cpp"), + username("guest"), + password("guest"), + mechanism("PLAIN"), + locale("en_US"), + heartbeat(0), + maxChannels(32767), + maxFrameSize(65535), + bounds(2), + tcpNoDelay(false) +{ + addOptions() + ("broker,b", optValue(host, "HOST"), "Broker host to connect to") + ("port,p", optValue(port, "PORT"), "Broker port to connect to") + ("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host") + ("clientname,n", optValue(clientid, "ID"), "unique client identifier") + ("username", optValue(username, "USER"), "user name for broker log in.") + ("password", optValue(password, "PASSWORD"), "password for broker log in.") + ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.") + ("locale", optValue(locale, "LOCALE"), "locale to use.") + ("max-channels", optValue(maxChannels, "N"), "the maximum number of channels the client requires.") + ("max-frame-size", optValue(maxFrameSize, "N"), "the maximum frame size to request.") + ("bounds-multiplier", optValue(bounds, "N"), + "restricts the total size of outgoing frames queued up for writing (as a multiple of the max frame size)."); + add(log); +} + +ConnectionSettings::~ConnectionSettings() {} + +void ConnectionSettings::parse(int argc, char** argv) +{ + qpid::Options::parse(argc, argv); + qpid::log::Logger::instance().configure(log, argv[0]); +} + + +void ConnectionSettings::configurePosixTcpSocket(int fd) const +{ + if (tcpNoDelay) { + int flag = 1; + int result = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); + QPID_POSIX_CHECK(result); + QPID_LOG(debug, "Set TCP_NODELAY"); + } +} + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/ConnectionSettings.h b/cpp/src/qpid/client/ConnectionSettings.h new file mode 100644 index 0000000000..b430c87099 --- /dev/null +++ b/cpp/src/qpid/client/ConnectionSettings.h @@ -0,0 +1,76 @@ +#ifndef QPID_CLIENT_CONNECTIONSETTINGS_H +#define QPID_CLIENT_CONNECTIONSETTINGS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Options.h" +#include "qpid/log/Options.h" +#include "qpid/Url.h" +#include "qpid/log/Logger.h" +#include "qpid/sys/Socket.h" + +#include <iostream> +#include <exception> + +namespace qpid { +namespace client { + +/** + * Used to hold seetings for a connection (and parse these from + * command line oprions etc as a convenience). + */ +struct ConnectionSettings : qpid::Options, qpid::sys::Socket::Configuration +{ + ConnectionSettings(); + virtual ~ConnectionSettings(); + + /** + * Applies any tcp specific options to the sockets file descriptor + */ + virtual void configurePosixTcpSocket(int fd) const; + + /** + * Parse options from command line arguments (will throw exception + * if arguments cannot be parsed). + */ + void parse(int argc, char** argv); + + std::string host; + uint16_t port; + std::string virtualhost; + std::string clientid; + std::string username; + std::string password; + std::string mechanism; + std::string locale; + uint16_t heartbeat; + uint16_t maxChannels; + uint16_t maxFrameSize; + uint bounds; + bool tcpNoDelay; + + log::Options log; +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_CONNECTIONSETTINGS_H*/ diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 7fb4997f5a..c9c55c50e8 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -20,6 +20,8 @@ */ #include "Connector.h" +#include "Bounds.h" +#include "ConnectionSettings.h" #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" @@ -40,21 +42,22 @@ using namespace qpid::framing; using boost::format; using boost::str; -Connector::Connector( - ProtocolVersion ver, bool _debug, uint32_t buffer_size -) : debug(_debug), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - version(ver), - initiated(false), - closed(true), - joined(true), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), - shutdownHandler(0), - aio(0) -{} +Connector::Connector(ProtocolVersion ver, const ConnectionSettings& settings, Bounds* bounds) + : maxFrameSize(settings.maxFrameSize), + version(ver), + initiated(false), + closed(true), + joined(true), + timeout(0), + idleIn(0), idleOut(0), + timeoutHandler(0), + shutdownHandler(0), + writer(maxFrameSize, bounds), + aio(0) +{ + QPID_LOG(debug, "Connector created for " << version); + socket.configure(settings); +} Connector::~Connector() { close(); @@ -176,11 +179,11 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){ } struct Connector::Buff : public AsynchIO::BufferBase { - Buff() : AsynchIO::BufferBase(new char[65536], 65536) {} + Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} ~Buff() { delete [] bytes;} }; -Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0) +Connector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) { } @@ -192,12 +195,12 @@ void Connector::Writer::init(std::string id, sys::AsynchIO* a) { aio = a; newBuffer(l); } - void Connector::Writer::handle(framing::AMQFrame& frame) { Mutex::ScopedLock l(lock); frames.push_back(frame); - if (frame.getEof()) { + if (frame.getEof()) {//or if we already have a buffers worth lastEof = frames.size(); + QPID_LOG(debug, "Requesting write: lastEof=" << lastEof); aio->notifyPendingWrite(); } QPID_LOG(trace, "SENT " << identifier << ": " << frame); @@ -217,7 +220,7 @@ void Connector::Writer::writeOne(const Mutex::ScopedLock& l) { void Connector::Writer::newBuffer(const Mutex::ScopedLock&) { buffer = aio->getQueuedBuffer(); - if (!buffer) buffer = new Buff(); + if (!buffer) buffer = new Buff(maxFrameSize); encode = framing::Buffer(buffer->bytes, buffer->byteCount); framesEncoded = 0; } @@ -226,15 +229,20 @@ void Connector::Writer::newBuffer(const Mutex::ScopedLock&) { void Connector::Writer::write(sys::AsynchIO&) { Mutex::ScopedLock l(lock); assert(buffer); + size_t bytesWritten(0); for (size_t i = 0; i < lastEof; ++i) { AMQFrame& frame = frames[i]; - if (frame.size() > encode.available()) writeOne(l); - assert(frame.size() <= encode.available()); + uint32_t size = frame.size(); + if (size > encode.available()) writeOne(l); + assert(size <= encode.available()); frame.encode(encode); ++framesEncoded; + bytesWritten += size; + QPID_LOG(debug, "Wrote frame: lastEof=" << lastEof << ", i=" << i); } frames.erase(frames.begin(), frames.begin()+lastEof); lastEof = 0; + if (bounds) bounds->reduce(bytesWritten); if (encode.getPosition() > 0) writeOne(l); } @@ -272,7 +280,7 @@ void Connector::writebuff(AsynchIO& aio_) { } void Connector::writeDataBlock(const AMQDataBlock& data) { - AsynchIO::BufferBase* buff = new Buff; + AsynchIO::BufferBase* buff = new Buff(maxFrameSize); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.size(); @@ -290,7 +298,7 @@ void Connector::run(){ Dispatcher d(poller); for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff); + aio->queueReadBuffer(new Buff(maxFrameSize)); } aio->start(poller); diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index 366f82acbd..9b13e1d519 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -31,17 +31,21 @@ #include "qpid/sys/TimeoutHandler.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" -#include "qpid/sys/Monitor.h" +#include "qpid/sys/Mutex.h" #include "qpid/sys/Socket.h" #include "qpid/sys/Time.h" #include "qpid/sys/AsynchIO.h" #include <queue> +#include <boost/shared_ptr.hpp> namespace qpid { namespace client { +class Bounds; +class ConnectionSettings; + class Connector : public framing::OutputHandler, private sys::Runnable { @@ -52,6 +56,7 @@ class Connector : public framing::OutputHandler, typedef sys::AsynchIO::BufferBase BufferBase; typedef std::vector<framing::AMQFrame> Frames; + const uint16_t maxFrameSize; sys::Mutex lock; sys::AsynchIO* aio; BufferBase* buffer; @@ -60,22 +65,21 @@ class Connector : public framing::OutputHandler, framing::Buffer encode; size_t framesEncoded; std::string identifier; + Bounds* bounds; void writeOne(const sys::Mutex::ScopedLock&); void newBuffer(const sys::Mutex::ScopedLock&); public: - Writer(); + Writer(uint16_t maxFrameSize, Bounds*); ~Writer(); void init(std::string id, sys::AsynchIO*); void handle(framing::AMQFrame&); void write(sys::AsynchIO&); }; - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; + const uint16_t maxFrameSize; framing::ProtocolVersion version; bool initiated; @@ -122,7 +126,8 @@ class Connector : public framing::OutputHandler, public: Connector(framing::ProtocolVersion pVersion, - bool debug = false, uint32_t buffer_size = 1024); + const ConnectionSettings&, + Bounds* bounds = 0); virtual ~Connector(); virtual void connect(const std::string& host, int port); virtual void init(); diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 571d54df0c..e998d040c8 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -45,6 +45,7 @@ using namespace qpid::framing::session;//for detach codes typedef sys::Monitor::ScopedLock Lock; typedef sys::Monitor::ScopedUnlock UnLock; +typedef sys::ScopedLock<sys::Semaphore> Acquire; SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn, @@ -60,8 +61,9 @@ SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn, name(id.str()), //TODO: may want to allow application defined names instead connection(conn), + ioHandler(*this), channel(ch), - proxy(channel), + proxy(ioHandler), nextIn(0), nextOut(0) { @@ -281,14 +283,20 @@ Future SessionImpl::send(const AMQBody& command, const MethodContent& content) Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content) { - Lock l(state); - checkOpen(); + Acquire a(sendLock); SequenceNumber id = nextOut++; - incompleteOut.add(id); + bool sync; + { + Lock l(state); + checkOpen(); + incompleteOut.add(id); + sync = syncMode; + } - if (syncMode) command.getMethod()->setSync(syncMode); + if (sync) command.getMethod()->setSync(true); Future f(id); if (command.getMethod()->resultExpected()) { + Lock l(state); //result listener must be set before the command is sent f.setFutureResult(results.listenForResult(id)); } @@ -300,26 +308,25 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con if (content) { sendContent(*content); } - if (syncMode) { - waitForCompletionImpl(id); + if (sync) { + waitForCompletion(id); } return f; } - void SessionImpl::sendContent(const MethodContent& content) { AMQFrame header(content.getHeader()); - header.setBof(false); + header.setFirstSegment(false); u_int64_t data_length = content.getData().length(); if(data_length > 0){ - header.setEof(false); + header.setLastSegment(false); handleOut(header); /*Note: end of frame marker included in overhead but not in size*/ const u_int32_t frag_size = maxFrameSize - (AMQFrame::frameOverhead() - 1); if(data_length < frag_size){ AMQFrame frame(in_place<AMQContentBody>(content.getData())); - frame.setBof(false); + frame.setFirstSegment(false); handleOut(frame); }else{ u_int32_t offset = 0; @@ -328,15 +335,15 @@ void SessionImpl::sendContent(const MethodContent& content) u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(content.getData().substr(offset, length)); AMQFrame frame(in_place<AMQContentBody>(frag)); - frame.setBof(false); + frame.setFirstSegment(false); + frame.setLastSegment(true); if (offset > 0) { - frame.setBos(false); + frame.setFirstFrame(false); } offset += length; remaining = data_length - offset; if (remaining) { - frame.setEos(false); - frame.setEof(false); + frame.setLastFrame(false); } handleOut(frame); } @@ -391,6 +398,13 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread void SessionImpl::handleOut(AMQFrame& frame) // user thread { + connection->expand(frame.size(), true); + channel.handle(frame); +} + +void SessionImpl::proxyOut(AMQFrame& frame) // network thread +{ + connection->expand(frame.size(), false); channel.handle(frame); } @@ -620,10 +634,8 @@ void SessionImpl::assertOpen() const void SessionImpl::handleClosed() { - QPID_LOG(info, "SessionImpl::handleClosed(): entering"); demux.close(); results.close(); - QPID_LOG(info, "SessionImpl::handleClosed(): returning"); } }} diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 3b2e80fefd..0bcec4dd0c 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -33,6 +33,7 @@ #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/AMQP_ClientOperations.h" #include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/sys/Semaphore.h" #include "qpid/sys/StateMonitor.h" #include <boost/optional.hpp> @@ -124,6 +125,7 @@ private: void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); + void proxyOut(framing::AMQFrame& frame); void deliver(framing::AMQFrame& frame); Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0); @@ -164,14 +166,15 @@ private: int code; // Error code std::string text; // Error text mutable StateMonitor state; + mutable sys::Semaphore sendLock; volatile bool syncMode; uint32_t detachedLifetime; const uint64_t maxFrameSize; const framing::Uuid id; const std::string name; - shared_ptr<ConnectionImpl> connection; + framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler; framing::ChannelHandler channel; framing::AMQP_ServerProxy::Session proxy; |
