summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/Connector.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-01-06 19:50:59 +0000
committerGordon Sim <gsim@apache.org>2009-01-06 19:50:59 +0000
commitccd271e851f2bc2b52a7c8daaa54a06551d63dc0 (patch)
treeb0ebe5326ee6a15d0d3339c60805920911a77df5 /cpp/src/qpid/client/Connector.cpp
parent9b18a2b17aaa643001c54d48445ed0d8bb7f2a4c (diff)
downloadqpid-python-ccd271e851f2bc2b52a7c8daaa54a06551d63dc0.tar.gz
* Cyrus SASL intgeration for c++ client
* SASL security layer support for c++ client and broker git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@732082 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
-rw-r--r--cpp/src/qpid/client/Connector.cpp197
1 files changed, 95 insertions, 102 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index bef98863a1..0e11b920e1 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -24,15 +24,18 @@
#include "ConnectionImpl.h"
#include "ConnectionSettings.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Codec.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
+#include "qpid/sys/SecurityLayer.h"
#include "qpid/Msg.h"
#include <iostream>
#include <map>
+#include <deque>
#include <boost/bind.hpp>
#include <boost/format.hpp>
#include <boost/weak_ptr.hpp>
@@ -74,39 +77,19 @@ void Connector::registerFactory(const std::string& proto, Factory* connectorFact
theProtocolRegistry()[proto] = connectorFactory;
}
-class TCPConnector : public Connector, private sys::Runnable
+class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
{
+ typedef std::deque<framing::AMQFrame> Frames;
struct Buff;
- /** Batch up frames for writing to aio. */
- class Writer : public framing::FrameHandler {
- typedef sys::AsynchIOBufferBase BufferBase;
- typedef std::vector<framing::AMQFrame> Frames;
-
- const uint16_t maxFrameSize;
- sys::Mutex lock;
- sys::AsynchIO* aio;
- BufferBase* buffer;
- Frames frames;
- size_t lastEof; // Position after last EOF in frames
- framing::Buffer encode;
- size_t framesEncoded;
- std::string identifier;
- Bounds* bounds;
-
- void writeOne();
- void newBuffer();
+ const uint16_t maxFrameSize;
- public:
-
- Writer(uint16_t maxFrameSize, Bounds*);
- ~Writer();
- void init(std::string id, sys::AsynchIO*);
- void handle(framing::AMQFrame&);
- void write(sys::AsynchIO&);
- };
+ sys::Mutex lock;
+ Frames frames; // Outgoing frame queue
+ size_t lastEof; // Position after last EOF in frames
+ uint64_t currentSize;
+ Bounds* bounds;
- const uint16_t maxFrameSize;
framing::ProtocolVersion version;
bool initiated;
@@ -119,14 +102,14 @@ class TCPConnector : public Connector, private sys::Runnable
framing::InitiationHandler* initialiser;
framing::OutputHandler* output;
- Writer writer;
-
sys::Thread receiver;
sys::Socket socket;
sys::AsynchIO* aio;
+ std::string identifier;
boost::shared_ptr<sys::Poller> poller;
+ std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
~TCPConnector();
@@ -139,8 +122,6 @@ class TCPConnector : public Connector, private sys::Runnable
void writeDataBlock(const framing::AMQDataBlock& data);
void eof(qpid::sys::AsynchIO&);
- std::string identifier;
-
boost::weak_ptr<ConnectionImpl> impl;
void connect(const std::string& host, int port);
@@ -153,6 +134,12 @@ class TCPConnector : public Connector, private sys::Runnable
sys::ShutdownHandler* getShutdownHandler() const;
framing::OutputHandler* getOutputHandler();
const std::string& getIdentifier() const;
+ void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
+
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool canEncode();
+
public:
TCPConnector(framing::ProtocolVersion pVersion,
@@ -177,12 +164,14 @@ TCPConnector::TCPConnector(ProtocolVersion ver,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
+ lastEof(0),
+ currentSize(0),
+ bounds(cimpl),
version(ver),
initiated(false),
closed(true),
joined(true),
shutdownHandler(0),
- writer(maxFrameSize, cimpl),
aio(0),
impl(cimpl->shared_from_this())
{
@@ -214,7 +203,6 @@ void TCPConnector::connect(const std::string& host, int port){
0, // closed
0, // nobuffs
boost::bind(&TCPConnector::writebuff, this, _1));
- writer.init(identifier, aio);
}
void TCPConnector::init(){
@@ -266,7 +254,21 @@ const std::string& TCPConnector::getIdentifier() const {
}
void TCPConnector::send(AMQFrame& frame) {
- writer.handle(frame);
+ bool notifyWrite = false;
+ {
+ Mutex::ScopedLock l(lock);
+ frames.push_back(frame);
+ //only ask to write if this is the end of a frameset or if we
+ //already have a buffers worth of data
+ currentSize += frame.encodedSize();
+ if (frame.getEof()) {
+ lastEof = frames.size();
+ notifyWrite = true;
+ } else {
+ notifyWrite = (currentSize >= maxFrameSize);
+ }
+ }
+ if (notifyWrite) aio->notifyPendingWrite();
}
void TCPConnector::handleClosed() {
@@ -279,70 +281,70 @@ struct TCPConnector::Buff : public AsynchIO::BufferBase {
~Buff() { delete [] bytes;}
};
-TCPConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
+void TCPConnector::writebuff(AsynchIO& /*aio*/)
{
-}
-
-TCPConnector::Writer::~Writer() { delete buffer; }
+ Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+ if (codec->canEncode()) {
+ std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
+ if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
+
+ size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
-void TCPConnector::Writer::init(std::string id, sys::AsynchIO* a) {
- Mutex::ScopedLock l(lock);
- identifier = id;
- aio = a;
- newBuffer();
-}
-void TCPConnector::Writer::handle(framing::AMQFrame& frame) {
- Mutex::ScopedLock l(lock);
- frames.push_back(frame);
- //only try to write if this is the end of a frameset or if we
- //already have a buffers worth of data
- if (frame.getEof() || (bounds && bounds->getCurrentSize() >= maxFrameSize)) {
- lastEof = frames.size();
- aio->notifyPendingWrite();
+ buffer->dataStart = 0;
+ buffer->dataCount = encoded;
+ aio->queueWrite(buffer.release());
}
- QPID_LOG(trace, "SENT " << identifier << ": " << frame);
-}
-
-void TCPConnector::Writer::writeOne() {
- assert(buffer);
- framesEncoded = 0;
-
- buffer->dataStart = 0;
- buffer->dataCount = encode.getPosition();
- aio->queueWrite(buffer);
- newBuffer();
}
-void TCPConnector::Writer::newBuffer() {
- buffer = aio->getQueuedBuffer();
- if (!buffer) buffer = new Buff(maxFrameSize);
- encode = framing::Buffer(buffer->bytes, buffer->byteCount);
- framesEncoded = 0;
+// Called in IO thread.
+bool TCPConnector::canEncode()
+{
+ Mutex::ScopedLock l(lock);
+ //have at least one full frameset or a whole buffers worth of data
+ return lastEof || currentSize >= maxFrameSize;
}
// Called in IO thread.
-void TCPConnector::Writer::write(sys::AsynchIO&) {
- Mutex::ScopedLock l(lock);
- assert(buffer);
+size_t TCPConnector::encode(const char* buffer, size_t size)
+{
+ framing::Buffer out(const_cast<char*>(buffer), size);
size_t bytesWritten(0);
- for (size_t i = 0; i < lastEof; ++i) {
- AMQFrame& frame = frames[i];
- uint32_t size = frame.encodedSize();
- if (size > encode.available()) writeOne();
- assert(size <= encode.available());
- frame.encode(encode);
- ++framesEncoded;
- bytesWritten += size;
+ {
+ Mutex::ScopedLock l(lock);
+ while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
+ frames.front().encode(out);
+ QPID_LOG(trace, "SENT " << identifier << ": " << frames.front());
+ frames.pop_front();
+ if (lastEof) --lastEof;
+ }
+ bytesWritten = size - out.available();
+ currentSize -= bytesWritten;
}
- frames.erase(frames.begin(), frames.begin()+lastEof);
- lastEof = 0;
if (bounds) bounds->reduce(bytesWritten);
- if (encode.getPosition() > 0) writeOne();
+ return bytesWritten;
}
-bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff)
+{
+ Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+ int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (decoded < buff->dataCount) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += decoded;
+ buff->dataCount -= decoded;
+ aio.unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio.queueReadBuffer(buff);
+ }
+ return true;
+}
+size_t TCPConnector::decode(const char* buffer, size_t size)
+{
+ framing::Buffer in(const_cast<char*>(buffer), size);
if (!initiated) {
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
@@ -356,22 +358,7 @@ bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
QPID_LOG(trace, "RECV " << identifier << ": " << frame);
input->received(frame);
}
- // TODO: unreading needs to go away, and when we can cope
- // with multiple sub-buffers in the general buffer scheme, it will
- if (in.available() != 0) {
- // Adjust buffer for used bytes and then "unread them"
- buff->dataStart += buff->dataCount-in.available();
- buff->dataCount = in.available();
- aio.unread(buff);
- } else {
- // Give whole buffer back to aio subsystem
- aio.queueReadBuffer(buff);
- }
- return true;
-}
-
-void TCPConnector::writebuff(AsynchIO& aio_) {
- writer.write(aio_);
+ return size - in.available();
}
void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
@@ -388,7 +375,7 @@ void TCPConnector::eof(AsynchIO&) {
// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
// will never be called
-void TCPConnector::run(){
+void TCPConnector::run() {
// Keep the connection impl in memory until run() completes.
boost::shared_ptr<ConnectionImpl> protect = impl.lock();
assert(protect);
@@ -409,5 +396,11 @@ void TCPConnector::run(){
}
}
+void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
+{
+ securityLayer = sl;
+ securityLayer->init(this);
+}
+
}} // namespace qpid::client