summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/Bounds.cpp55
-rw-r--r--cpp/src/qpid/client/Bounds.h48
-rw-r--r--cpp/src/qpid/client/Connection.cpp67
-rw-r--r--cpp/src/qpid/client/Connection.h25
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp27
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.h35
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp51
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h31
-rw-r--r--cpp/src/qpid/client/ConnectionSettings.cpp81
-rw-r--r--cpp/src/qpid/client/ConnectionSettings.h76
-rw-r--r--cpp/src/qpid/client/Connector.cpp56
-rw-r--r--cpp/src/qpid/client/Connector.h17
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp46
-rw-r--r--cpp/src/qpid/client/SessionImpl.h5
14 files changed, 456 insertions, 164 deletions
diff --git a/cpp/src/qpid/client/Bounds.cpp b/cpp/src/qpid/client/Bounds.cpp
new file mode 100644
index 0000000000..1df21db941
--- /dev/null
+++ b/cpp/src/qpid/client/Bounds.cpp
@@ -0,0 +1,55 @@
+#include "Bounds.h"
+
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace client {
+
+using sys::Monitor;
+
+Bounds::Bounds(size_t maxSize) : max(maxSize), current(0) {}
+
+bool Bounds::expand(size_t sizeRequired, bool block)
+{
+ if (max) {
+ Monitor::ScopedLock l(lock);
+ current += sizeRequired;
+ if (block) {
+ while (current > max) {
+ QPID_LOG(debug, "Waiting for bounds: " << *this);
+ lock.wait();
+ }
+ QPID_LOG(debug, "Bounds ok: " << *this);
+ }
+ return current <= max;
+ } else {
+ return true;
+ }
+}
+
+void Bounds::reduce(size_t size)
+{
+ if (!max || size == 0) return;
+ Monitor::ScopedLock l(lock);
+ if (current == 0) return;
+ bool needNotify = current > max;
+ current -= std::min(size, current);
+ if (needNotify && current < max) {
+ //todo: notify one at a time, but ensure that all threads are
+ //eventually notified
+ lock.notifyAll();
+ }
+}
+
+size_t Bounds::getCurrentSize()
+{
+ Monitor::ScopedLock l(lock);
+ return current;
+}
+
+std::ostream& operator<<(std::ostream& out, const Bounds& bounds) {
+ out << "current=" << bounds.current << ", max=" << bounds.max << " [" << &bounds << "]";
+ return out;
+}
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/Bounds.h b/cpp/src/qpid/client/Bounds.h
new file mode 100644
index 0000000000..db18becce3
--- /dev/null
+++ b/cpp/src/qpid/client/Bounds.h
@@ -0,0 +1,48 @@
+#ifndef QPID_CLIENT_BOUNDSCHECKING_H
+#define QPID_CLIENT_BOUNDSCHECKING_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/Monitor.h"
+
+namespace qpid{
+namespace client{
+
+class Bounds
+{
+ public:
+ Bounds(size_t maxSize);
+ bool expand(size_t, bool block);
+ void reduce(size_t);
+ size_t getCurrentSize();
+
+ private:
+ friend std::ostream& operator<<(std::ostream&, const Bounds&);
+ sys::Monitor lock;
+ const size_t max;
+ size_t current;
+};
+
+std::ostream& operator<<(std::ostream&, const Bounds&);
+
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index 25d1c510c8..c11f155afb 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -18,11 +18,8 @@
* under the License.
*
*/
-#include <algorithm>
-#include <boost/format.hpp>
-#include <boost/bind.hpp>
-
#include "Connection.h"
+#include "ConnectionSettings.h"
#include "Channel.h"
#include "Message.h"
#include "SessionImpl.h"
@@ -30,9 +27,13 @@
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
#include "qpid/shared_ptr.h"
+
+#include <algorithm>
#include <iostream>
#include <sstream>
#include <functional>
+#include <boost/format.hpp>
+#include <boost/bind.hpp>
using namespace qpid::framing;
using namespace qpid::sys;
@@ -41,41 +42,49 @@ using namespace qpid::sys;
namespace qpid {
namespace client {
-Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) :
- channelIdCounter(0), version(_version),
- max_frame_size(_max_frame_size),
- isOpen(false),
- impl(new ConnectionImpl(
- shared_ptr<Connector>(new Connector(_version, _debug))))
-{}
-
-Connection::Connection(shared_ptr<Connector> c) :
- channelIdCounter(0), version(framing::highestProtocolVersion),
- max_frame_size(65535),
- isOpen(false),
- impl(new ConnectionImpl(c))
-{}
+Connection::Connection(framing::ProtocolVersion _version) :
+ channelIdCounter(0), version(_version) {}
Connection::~Connection(){ }
void Connection::open(
const std::string& host, int port,
- const std::string& uid, const std::string& pwd, const std::string& vhost)
+ const std::string& uid, const std::string& pwd,
+ const std::string& vhost,
+ uint16_t maxFrameSize)
+{
+ ConnectionSettings settings;
+ settings.host = host;
+ settings.port = port;
+ settings.username = uid;
+ settings.password = pwd;
+ settings.virtualhost = vhost;
+ settings.maxFrameSize = maxFrameSize;
+ open(settings);
+}
+
+void Connection::open(const ConnectionSettings& settings)
{
- if (isOpen)
- throw Exception(QPID_MSG("Channel object is already open"));
+ if (impl)
+ throw Exception(QPID_MSG("Connection::open() was already called"));
- impl->open(host, port, uid, pwd, vhost);
- isOpen = true;
+ impl = boost::shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings));
+ max_frame_size = impl->getNegotiatedSettings().maxFrameSize;
}
-void Connection::openChannel(Channel& channel) {
+void Connection::openChannel(Channel& channel)
+{
+ if (!impl)
+ throw Exception(QPID_MSG("Connection has not yet been opened"));
channel.open(newSession(ASYNC));
}
Session Connection::newSession(SynchronousMode sync,
- uint32_t detachedLifetime)
+ uint32_t detachedLifetime)
{
+ if (!impl)
+ throw Exception(QPID_MSG("Connection has not yet been opened"));
+
shared_ptr<SessionImpl> core(
new SessionImpl(impl, ++channelIdCounter, max_frame_size));
core->setSync(sync);
@@ -85,13 +94,19 @@ Session Connection::newSession(SynchronousMode sync,
}
void Connection::resume(Session& session) {
+ if (!impl)
+ throw Exception(QPID_MSG("Connection has not yet been opened"));
+
session.impl->setChannel(++channelIdCounter);
impl->addSession(session.impl);
session.impl->resume(impl);
}
void Connection::close() {
- impl->close();
+ if (impl) {
+ impl->close();
+ impl.reset();
+ }
}
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index 0ddd383381..417739fd1d 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -39,6 +39,7 @@ namespace qpid {
*/
namespace client {
+class ConnectionSettings;
/**
* \defgroup clientapi Application API for an AMQP client.
*/
@@ -54,9 +55,7 @@ class Connection
{
framing::ChannelId channelIdCounter;
framing::ProtocolVersion version;
- const uint32_t max_frame_size;
- bool isOpen;
- bool debug;
+ uint16_t max_frame_size;
protected:
boost::shared_ptr<ConnectionImpl> impl;
@@ -67,17 +66,8 @@ class Connection
* connection.
*
* @param _version the version of the protocol to connect with.
- *
- * @param debug turns on tracing for the connection
- * (i.e. prints details of the frames sent and received to std
- * out). Optional. Defaults to false.
- *
- * @param max_frame_size the maximum frame size that the
- * client will accept. Optional. Defaults to 65535.
*/
- Connection(bool debug = false, uint32_t max_frame_size = 65535,
- framing::ProtocolVersion=framing::highestProtocolVersion);
- Connection(boost::shared_ptr<Connector>);
+ Connection(framing::ProtocolVersion=framing::highestProtocolVersion);
~Connection();
/**
@@ -100,7 +90,14 @@ class Connection
void open(const std::string& host, int port = 5672,
const std::string& uid = "guest",
const std::string& pwd = "guest",
- const std::string& virtualhost = "/");
+ const std::string& virtualhost = "/", uint16_t maxFrameSize=65535);
+
+ /**
+ * Opens a connection to a broker.
+ *
+ * @param the settings to use (host, port etc) @see ConnectionSettings
+ */
+ void open(const ConnectionSettings& settings);
/**
* Close the connection.
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index df27942008..81d966d53f 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -20,9 +20,9 @@
*/
#include "ConnectionHandler.h"
+
#include "qpid/log/Statement.h"
#include "qpid/framing/amqp_framing.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/all_method_bodies.h"
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/reply_exceptions.h"
@@ -42,17 +42,10 @@ const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state");
const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state");
}
-ConnectionHandler::ConnectionHandler()
- : StateManager(NOT_STARTED), outHandler(*this), proxy(outHandler), errorCode(200)
-{
-
- mechanism = PLAIN;
- locale = en_US;
- heartbeat = 0;
- maxChannels = 32767;
- maxFrameSize = 65535;
+ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, framing::ProtocolVersion& v)
+ : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), errorCode(200), version(v)
+{
insist = true;
- version = framing::highestProtocolVersion;
ESTABLISHED.insert(FAILED);
ESTABLISHED.insert(CLOSED);
@@ -141,7 +134,7 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& /*
checkState(NOT_STARTED, INVALID_STATE_START);
setState(NEGOTIATING);
//TODO: verify that desired mechanism and locale are supported
- string response = ((char)0) + uid + ((char)0) + pwd;
+ string response = ((char)0) + username + ((char)0) + password;
proxy.startOk(properties, mechanism, response, locale);
}
@@ -150,14 +143,16 @@ void ConnectionHandler::secure(const std::string& /*challenge*/)
throw NotImplementedException("Challenge-response cycle not yet implemented in client");
}
-void ConnectionHandler::tune(uint16_t channelMax, uint16_t /*frameMax*/, uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/)
+void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed,
+ uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/)
{
checkState(NEGOTIATING, INVALID_STATE_TUNE);
- //TODO: verify that desired heartbeat and max frame size are valid
- maxChannels = channelMax;
+ maxChannels = std::min(maxChannels, maxChannelsProposed);
+ maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed);
+ //TODO: implement heartbeats and check desired value is in valid range
proxy.tuneOk(maxChannels, maxFrameSize, heartbeat);
setState(OPENING);
- proxy.open(vhost, capabilities, insist);
+ proxy.open(virtualhost, capabilities, insist);
}
void ConnectionHandler::openOk(const framing::Array& /*knownHosts*/)
diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h
index d7ab97ce31..1cf0c905ed 100644
--- a/cpp/src/qpid/client/ConnectionHandler.h
+++ b/cpp/src/qpid/client/ConnectionHandler.h
@@ -22,9 +22,10 @@
#define _ConnectionHandler_
#include "ChainableFrameHandler.h"
-#include "Connector.h"
+#include "ConnectionSettings.h"
#include "StateManager.h"
#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/AMQP_ClientOperations.h"
#include "qpid/framing/AMQP_ServerProxy.h"
#include "qpid/framing/Array.h"
@@ -35,27 +36,11 @@
namespace qpid {
namespace client {
-struct ConnectionProperties
-{
- std::string uid;
- std::string pwd;
- std::string vhost;
- framing::FieldTable properties;
- std::string mechanism;
- std::string locale;
- framing::Array capabilities;
- uint16_t heartbeat;
- uint16_t maxChannels;
- uint64_t maxFrameSize;
- bool insist;
- framing::ProtocolVersion version;
-};
-
-class ConnectionHandler : private StateManager,
- public ConnectionProperties,
- public ChainableFrameHandler,
- public framing::InputHandler,
- private framing::AMQP_ClientOperations::ConnectionHandler
+class ConnectionHandler : private StateManager,
+ public ConnectionSettings,
+ public ChainableFrameHandler,
+ public framing::InputHandler,
+ private framing::AMQP_ClientOperations::ConnectionHandler
{
typedef framing::AMQP_ClientOperations::ConnectionHandler ConnectionOperations;
enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
@@ -73,6 +58,10 @@ class ConnectionHandler : private StateManager,
framing::AMQP_ServerProxy::Connection proxy;
uint16_t errorCode;
std::string errorText;
+ bool insist;
+ framing::ProtocolVersion version;
+ framing::Array capabilities;
+ framing::FieldTable properties;
void checkState(STATES s, const std::string& msg);
@@ -96,7 +85,7 @@ public:
typedef boost::function<void()> CloseListener;
typedef boost::function<void(uint16_t, const std::string&)> ErrorListener;
- ConnectionHandler();
+ ConnectionHandler(const ConnectionSettings&, framing::ProtocolVersion&);
void received(framing::AMQFrame& f) { incoming(f); }
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index ce95e43f58..643d42403d 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -18,13 +18,14 @@
* under the License.
*
*/
+#include "ConnectionImpl.h"
+#include "ConnectionSettings.h"
+#include "SessionImpl.h"
+
#include "qpid/log/Statement.h"
#include "qpid/framing/constants.h"
#include "qpid/framing/reply_exceptions.h"
-#include "ConnectionImpl.h"
-#include "SessionImpl.h"
-
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -34,24 +35,32 @@ using namespace qpid::sys;
using namespace qpid::framing::connection;//for connection error codes
-ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
- : connector(c), isClosed(false), isClosing(false)
+ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
+ : Bounds(settings.maxFrameSize * settings.bounds),
+ handler(settings, v),
+ connector(v, settings, this),
+ version(v),
+ isClosed(false),
+ isClosing(false)
{
+ QPID_LOG(debug, "ConnectionImpl created for " << version);
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
- handler.out = boost::bind(&Connector::send, connector, _1);
+ handler.out = boost::bind(&Connector::send, boost::ref(connector), _1);
handler.onClose = boost::bind(&ConnectionImpl::closed, this,
NORMAL, std::string());
handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
- connector->setInputHandler(&handler);
- connector->setTimeoutHandler(this);
- connector->setShutdownHandler(this);
+ connector.setInputHandler(&handler);
+ connector.setTimeoutHandler(this);
+ connector.setShutdownHandler(this);
+
+ open(settings.host, settings.port);
}
ConnectionImpl::~ConnectionImpl() {
// Important to close the connector first, to ensure the
// connector thread does not call on us while the destructor
// is running.
- connector->close();
+ connector.close();
}
void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session)
@@ -79,18 +88,11 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame)
s->in(frame);
}
-void ConnectionImpl::open(const std::string& host, int port,
- const std::string& uid, const std::string& pwd,
- const std::string& vhost)
+void ConnectionImpl::open(const std::string& host, int port)
{
- //TODO: better management of connection properties
- handler.uid = uid;
- handler.pwd = pwd;
- handler.vhost = vhost;
-
QPID_LOG(info, "Connecting to " << host << ":" << port);
- connector->connect(host, port);
- connector->init();
+ connector.connect(host, port);
+ connector.init();
handler.waitForOpen();
}
@@ -102,7 +104,7 @@ void ConnectionImpl::idleIn()
void ConnectionImpl::idleOut()
{
AMQFrame frame(in_place<AMQHeartbeatBody>());
- connector->send(frame);
+ connector.send(frame);
}
void ConnectionImpl::close()
@@ -121,7 +123,7 @@ void ConnectionImpl::close()
// so sessions can be updated outside the lock.
ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) {
isClosed = true;
- connector->close();
+ connector.close();
SessionVector save;
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
boost::shared_ptr<SessionImpl> s = i->second.lock();
@@ -157,4 +159,9 @@ void ConnectionImpl::erase(uint16_t ch) {
Mutex::ScopedLock l(lock);
sessions.erase(ch);
}
+
+const ConnectionSettings& ConnectionImpl::getNegotiatedSettings()
+{
+ return handler;
+}
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index d0df9238f2..986b044b49 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -22,22 +22,26 @@
#ifndef _ConnectionImpl_
#define _ConnectionImpl_
-#include <map>
-#include <boost/shared_ptr.hpp>
-#include <boost/weak_ptr.hpp>
+#include "Bounds.h"
+#include "ConnectionHandler.h"
+#include "Connector.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/ShutdownHandler.h"
#include "qpid/sys/TimeoutHandler.h"
-#include "ConnectionHandler.h"
-#include "Connector.h"
+
+#include <map>
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
namespace qpid {
namespace client {
+class ConnectionSettings;
class SessionImpl;
-class ConnectionImpl : public framing::FrameHandler,
+class ConnectionImpl : public Bounds,
+ public framing::FrameHandler,
public sys::TimeoutHandler,
public sys::ShutdownHandler
@@ -47,7 +51,7 @@ class ConnectionImpl : public framing::FrameHandler,
SessionMap sessions;
ConnectionHandler handler;
- boost::shared_ptr<Connector> connector;
+ Connector connector;
framing::ProtocolVersion version;
sys::Mutex lock;
bool isClosed;
@@ -55,6 +59,8 @@ class ConnectionImpl : public framing::FrameHandler,
template <class F> void detachAll(const F&);
+ void open(const std::string& host, int port);
+
SessionVector closeInternal(const sys::Mutex::ScopedLock&);
void incoming(framing::AMQFrame& frame);
void closed(uint16_t, const std::string&);
@@ -64,21 +70,16 @@ class ConnectionImpl : public framing::FrameHandler,
bool setClosing();
public:
- typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
-
- ConnectionImpl(boost::shared_ptr<Connector> c);
+ ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings);
~ConnectionImpl();
void addSession(const boost::shared_ptr<SessionImpl>&);
- void open(const std::string& host, int port = 5672,
- const std::string& uid = "guest",
- const std::string& pwd = "guest",
- const std::string& virtualhost = "/");
void close();
void handle(framing::AMQFrame& frame);
void erase(uint16_t channel);
- boost::shared_ptr<Connector> getConnector() { return connector; }
+
+ const ConnectionSettings& getNegotiatedSettings();
};
}}
diff --git a/cpp/src/qpid/client/ConnectionSettings.cpp b/cpp/src/qpid/client/ConnectionSettings.cpp
new file mode 100644
index 0000000000..ea2729c2dd
--- /dev/null
+++ b/cpp/src/qpid/client/ConnectionSettings.cpp
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionSettings.h"
+
+#include "qpid/sys/posix/check.h"
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
+namespace qpid {
+namespace client {
+
+ConnectionSettings::ConnectionSettings() :
+ Options("Connection Settings"),
+ host("localhost"),
+ port(TcpAddress::DEFAULT_PORT),
+ clientid("cpp"),
+ username("guest"),
+ password("guest"),
+ mechanism("PLAIN"),
+ locale("en_US"),
+ heartbeat(0),
+ maxChannels(32767),
+ maxFrameSize(65535),
+ bounds(2),
+ tcpNoDelay(false)
+{
+ addOptions()
+ ("broker,b", optValue(host, "HOST"), "Broker host to connect to")
+ ("port,p", optValue(port, "PORT"), "Broker port to connect to")
+ ("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host")
+ ("clientname,n", optValue(clientid, "ID"), "unique client identifier")
+ ("username", optValue(username, "USER"), "user name for broker log in.")
+ ("password", optValue(password, "PASSWORD"), "password for broker log in.")
+ ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.")
+ ("locale", optValue(locale, "LOCALE"), "locale to use.")
+ ("max-channels", optValue(maxChannels, "N"), "the maximum number of channels the client requires.")
+ ("max-frame-size", optValue(maxFrameSize, "N"), "the maximum frame size to request.")
+ ("bounds-multiplier", optValue(bounds, "N"),
+ "restricts the total size of outgoing frames queued up for writing (as a multiple of the max frame size).");
+ add(log);
+}
+
+ConnectionSettings::~ConnectionSettings() {}
+
+void ConnectionSettings::parse(int argc, char** argv)
+{
+ qpid::Options::parse(argc, argv);
+ qpid::log::Logger::instance().configure(log, argv[0]);
+}
+
+
+void ConnectionSettings::configurePosixTcpSocket(int fd) const
+{
+ if (tcpNoDelay) {
+ int flag = 1;
+ int result = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
+ QPID_POSIX_CHECK(result);
+ QPID_LOG(debug, "Set TCP_NODELAY");
+ }
+}
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/ConnectionSettings.h b/cpp/src/qpid/client/ConnectionSettings.h
new file mode 100644
index 0000000000..b430c87099
--- /dev/null
+++ b/cpp/src/qpid/client/ConnectionSettings.h
@@ -0,0 +1,76 @@
+#ifndef QPID_CLIENT_CONNECTIONSETTINGS_H
+#define QPID_CLIENT_CONNECTIONSETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Options.h"
+#include "qpid/log/Options.h"
+#include "qpid/Url.h"
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Socket.h"
+
+#include <iostream>
+#include <exception>
+
+namespace qpid {
+namespace client {
+
+/**
+ * Used to hold seetings for a connection (and parse these from
+ * command line oprions etc as a convenience).
+ */
+struct ConnectionSettings : qpid::Options, qpid::sys::Socket::Configuration
+{
+ ConnectionSettings();
+ virtual ~ConnectionSettings();
+
+ /**
+ * Applies any tcp specific options to the sockets file descriptor
+ */
+ virtual void configurePosixTcpSocket(int fd) const;
+
+ /**
+ * Parse options from command line arguments (will throw exception
+ * if arguments cannot be parsed).
+ */
+ void parse(int argc, char** argv);
+
+ std::string host;
+ uint16_t port;
+ std::string virtualhost;
+ std::string clientid;
+ std::string username;
+ std::string password;
+ std::string mechanism;
+ std::string locale;
+ uint16_t heartbeat;
+ uint16_t maxChannels;
+ uint16_t maxFrameSize;
+ uint bounds;
+ bool tcpNoDelay;
+
+ log::Options log;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_CONNECTIONSETTINGS_H*/
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 7fb4997f5a..c9c55c50e8 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -20,6 +20,8 @@
*/
#include "Connector.h"
+#include "Bounds.h"
+#include "ConnectionSettings.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
@@ -40,21 +42,22 @@ using namespace qpid::framing;
using boost::format;
using boost::str;
-Connector::Connector(
- ProtocolVersion ver, bool _debug, uint32_t buffer_size
-) : debug(_debug),
- receive_buffer_size(buffer_size),
- send_buffer_size(buffer_size),
- version(ver),
- initiated(false),
- closed(true),
- joined(true),
- timeout(0),
- idleIn(0), idleOut(0),
- timeoutHandler(0),
- shutdownHandler(0),
- aio(0)
-{}
+Connector::Connector(ProtocolVersion ver, const ConnectionSettings& settings, Bounds* bounds)
+ : maxFrameSize(settings.maxFrameSize),
+ version(ver),
+ initiated(false),
+ closed(true),
+ joined(true),
+ timeout(0),
+ idleIn(0), idleOut(0),
+ timeoutHandler(0),
+ shutdownHandler(0),
+ writer(maxFrameSize, bounds),
+ aio(0)
+{
+ QPID_LOG(debug, "Connector created for " << version);
+ socket.configure(settings);
+}
Connector::~Connector() {
close();
@@ -176,11 +179,11 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){
}
struct Connector::Buff : public AsynchIO::BufferBase {
- Buff() : AsynchIO::BufferBase(new char[65536], 65536) {}
+ Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
~Buff() { delete [] bytes;}
};
-Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0)
+Connector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
{
}
@@ -192,12 +195,12 @@ void Connector::Writer::init(std::string id, sys::AsynchIO* a) {
aio = a;
newBuffer(l);
}
-
void Connector::Writer::handle(framing::AMQFrame& frame) {
Mutex::ScopedLock l(lock);
frames.push_back(frame);
- if (frame.getEof()) {
+ if (frame.getEof()) {//or if we already have a buffers worth
lastEof = frames.size();
+ QPID_LOG(debug, "Requesting write: lastEof=" << lastEof);
aio->notifyPendingWrite();
}
QPID_LOG(trace, "SENT " << identifier << ": " << frame);
@@ -217,7 +220,7 @@ void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
void Connector::Writer::newBuffer(const Mutex::ScopedLock&) {
buffer = aio->getQueuedBuffer();
- if (!buffer) buffer = new Buff();
+ if (!buffer) buffer = new Buff(maxFrameSize);
encode = framing::Buffer(buffer->bytes, buffer->byteCount);
framesEncoded = 0;
}
@@ -226,15 +229,20 @@ void Connector::Writer::newBuffer(const Mutex::ScopedLock&) {
void Connector::Writer::write(sys::AsynchIO&) {
Mutex::ScopedLock l(lock);
assert(buffer);
+ size_t bytesWritten(0);
for (size_t i = 0; i < lastEof; ++i) {
AMQFrame& frame = frames[i];
- if (frame.size() > encode.available()) writeOne(l);
- assert(frame.size() <= encode.available());
+ uint32_t size = frame.size();
+ if (size > encode.available()) writeOne(l);
+ assert(size <= encode.available());
frame.encode(encode);
++framesEncoded;
+ bytesWritten += size;
+ QPID_LOG(debug, "Wrote frame: lastEof=" << lastEof << ", i=" << i);
}
frames.erase(frames.begin(), frames.begin()+lastEof);
lastEof = 0;
+ if (bounds) bounds->reduce(bytesWritten);
if (encode.getPosition() > 0) writeOne(l);
}
@@ -272,7 +280,7 @@ void Connector::writebuff(AsynchIO& aio_) {
}
void Connector::writeDataBlock(const AMQDataBlock& data) {
- AsynchIO::BufferBase* buff = new Buff;
+ AsynchIO::BufferBase* buff = new Buff(maxFrameSize);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
buff->dataCount = data.size();
@@ -290,7 +298,7 @@ void Connector::run(){
Dispatcher d(poller);
for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff);
+ aio->queueReadBuffer(new Buff(maxFrameSize));
}
aio->start(poller);
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index 366f82acbd..9b13e1d519 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -31,17 +31,21 @@
#include "qpid/sys/TimeoutHandler.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/AsynchIO.h"
#include <queue>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace client {
+class Bounds;
+class ConnectionSettings;
+
class Connector : public framing::OutputHandler,
private sys::Runnable
{
@@ -52,6 +56,7 @@ class Connector : public framing::OutputHandler,
typedef sys::AsynchIO::BufferBase BufferBase;
typedef std::vector<framing::AMQFrame> Frames;
+ const uint16_t maxFrameSize;
sys::Mutex lock;
sys::AsynchIO* aio;
BufferBase* buffer;
@@ -60,22 +65,21 @@ class Connector : public framing::OutputHandler,
framing::Buffer encode;
size_t framesEncoded;
std::string identifier;
+ Bounds* bounds;
void writeOne(const sys::Mutex::ScopedLock&);
void newBuffer(const sys::Mutex::ScopedLock&);
public:
- Writer();
+ Writer(uint16_t maxFrameSize, Bounds*);
~Writer();
void init(std::string id, sys::AsynchIO*);
void handle(framing::AMQFrame&);
void write(sys::AsynchIO&);
};
- const bool debug;
- const int receive_buffer_size;
- const int send_buffer_size;
+ const uint16_t maxFrameSize;
framing::ProtocolVersion version;
bool initiated;
@@ -122,7 +126,8 @@ class Connector : public framing::OutputHandler,
public:
Connector(framing::ProtocolVersion pVersion,
- bool debug = false, uint32_t buffer_size = 1024);
+ const ConnectionSettings&,
+ Bounds* bounds = 0);
virtual ~Connector();
virtual void connect(const std::string& host, int port);
virtual void init();
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 571d54df0c..e998d040c8 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -45,6 +45,7 @@ using namespace qpid::framing::session;//for detach codes
typedef sys::Monitor::ScopedLock Lock;
typedef sys::Monitor::ScopedUnlock UnLock;
+typedef sys::ScopedLock<sys::Semaphore> Acquire;
SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn,
@@ -60,8 +61,9 @@ SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn,
name(id.str()),
//TODO: may want to allow application defined names instead
connection(conn),
+ ioHandler(*this),
channel(ch),
- proxy(channel),
+ proxy(ioHandler),
nextIn(0),
nextOut(0)
{
@@ -281,14 +283,20 @@ Future SessionImpl::send(const AMQBody& command, const MethodContent& content)
Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content)
{
- Lock l(state);
- checkOpen();
+ Acquire a(sendLock);
SequenceNumber id = nextOut++;
- incompleteOut.add(id);
+ bool sync;
+ {
+ Lock l(state);
+ checkOpen();
+ incompleteOut.add(id);
+ sync = syncMode;
+ }
- if (syncMode) command.getMethod()->setSync(syncMode);
+ if (sync) command.getMethod()->setSync(true);
Future f(id);
if (command.getMethod()->resultExpected()) {
+ Lock l(state);
//result listener must be set before the command is sent
f.setFutureResult(results.listenForResult(id));
}
@@ -300,26 +308,25 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con
if (content) {
sendContent(*content);
}
- if (syncMode) {
- waitForCompletionImpl(id);
+ if (sync) {
+ waitForCompletion(id);
}
return f;
}
-
void SessionImpl::sendContent(const MethodContent& content)
{
AMQFrame header(content.getHeader());
- header.setBof(false);
+ header.setFirstSegment(false);
u_int64_t data_length = content.getData().length();
if(data_length > 0){
- header.setEof(false);
+ header.setLastSegment(false);
handleOut(header);
/*Note: end of frame marker included in overhead but not in size*/
const u_int32_t frag_size = maxFrameSize - (AMQFrame::frameOverhead() - 1);
if(data_length < frag_size){
AMQFrame frame(in_place<AMQContentBody>(content.getData()));
- frame.setBof(false);
+ frame.setFirstSegment(false);
handleOut(frame);
}else{
u_int32_t offset = 0;
@@ -328,15 +335,15 @@ void SessionImpl::sendContent(const MethodContent& content)
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(content.getData().substr(offset, length));
AMQFrame frame(in_place<AMQContentBody>(frag));
- frame.setBof(false);
+ frame.setFirstSegment(false);
+ frame.setLastSegment(true);
if (offset > 0) {
- frame.setBos(false);
+ frame.setFirstFrame(false);
}
offset += length;
remaining = data_length - offset;
if (remaining) {
- frame.setEos(false);
- frame.setEof(false);
+ frame.setLastFrame(false);
}
handleOut(frame);
}
@@ -391,6 +398,13 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread
void SessionImpl::handleOut(AMQFrame& frame) // user thread
{
+ connection->expand(frame.size(), true);
+ channel.handle(frame);
+}
+
+void SessionImpl::proxyOut(AMQFrame& frame) // network thread
+{
+ connection->expand(frame.size(), false);
channel.handle(frame);
}
@@ -620,10 +634,8 @@ void SessionImpl::assertOpen() const
void SessionImpl::handleClosed()
{
- QPID_LOG(info, "SessionImpl::handleClosed(): entering");
demux.close();
results.close();
- QPID_LOG(info, "SessionImpl::handleClosed(): returning");
}
}}
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index 3b2e80fefd..0bcec4dd0c 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -33,6 +33,7 @@
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/AMQP_ClientOperations.h"
#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/sys/Semaphore.h"
#include "qpid/sys/StateMonitor.h"
#include <boost/optional.hpp>
@@ -124,6 +125,7 @@ private:
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
+ void proxyOut(framing::AMQFrame& frame);
void deliver(framing::AMQFrame& frame);
Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
@@ -164,14 +166,15 @@ private:
int code; // Error code
std::string text; // Error text
mutable StateMonitor state;
+ mutable sys::Semaphore sendLock;
volatile bool syncMode;
uint32_t detachedLifetime;
const uint64_t maxFrameSize;
const framing::Uuid id;
const std::string name;
-
shared_ptr<ConnectionImpl> connection;
+ framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
framing::ChannelHandler channel;
framing::AMQP_ServerProxy::Session proxy;