diff options
| author | Gordon Sim <gsim@apache.org> | 2009-01-06 19:50:59 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-01-06 19:50:59 +0000 |
| commit | ccd271e851f2bc2b52a7c8daaa54a06551d63dc0 (patch) | |
| tree | b0ebe5326ee6a15d0d3339c60805920911a77df5 /cpp/src/qpid/client/Connector.cpp | |
| parent | 9b18a2b17aaa643001c54d48445ed0d8bb7f2a4c (diff) | |
| download | qpid-python-ccd271e851f2bc2b52a7c8daaa54a06551d63dc0.tar.gz | |
* Cyrus SASL intgeration for c++ client
* SASL security layer support for c++ client and broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@732082 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
| -rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 197 |
1 files changed, 95 insertions, 102 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index bef98863a1..0e11b920e1 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -24,15 +24,18 @@ #include "ConnectionImpl.h" #include "ConnectionSettings.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Codec.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" +#include "qpid/sys/SecurityLayer.h" #include "qpid/Msg.h" #include <iostream> #include <map> +#include <deque> #include <boost/bind.hpp> #include <boost/format.hpp> #include <boost/weak_ptr.hpp> @@ -74,39 +77,19 @@ void Connector::registerFactory(const std::string& proto, Factory* connectorFact theProtocolRegistry()[proto] = connectorFactory; } -class TCPConnector : public Connector, private sys::Runnable +class TCPConnector : public Connector, public sys::Codec, private sys::Runnable { + typedef std::deque<framing::AMQFrame> Frames; struct Buff; - /** Batch up frames for writing to aio. */ - class Writer : public framing::FrameHandler { - typedef sys::AsynchIOBufferBase BufferBase; - typedef std::vector<framing::AMQFrame> Frames; - - const uint16_t maxFrameSize; - sys::Mutex lock; - sys::AsynchIO* aio; - BufferBase* buffer; - Frames frames; - size_t lastEof; // Position after last EOF in frames - framing::Buffer encode; - size_t framesEncoded; - std::string identifier; - Bounds* bounds; - - void writeOne(); - void newBuffer(); + const uint16_t maxFrameSize; - public: - - Writer(uint16_t maxFrameSize, Bounds*); - ~Writer(); - void init(std::string id, sys::AsynchIO*); - void handle(framing::AMQFrame&); - void write(sys::AsynchIO&); - }; + sys::Mutex lock; + Frames frames; // Outgoing frame queue + size_t lastEof; // Position after last EOF in frames + uint64_t currentSize; + Bounds* bounds; - const uint16_t maxFrameSize; framing::ProtocolVersion version; bool initiated; @@ -119,14 +102,14 @@ class TCPConnector : public Connector, private sys::Runnable framing::InitiationHandler* initialiser; framing::OutputHandler* output; - Writer writer; - sys::Thread receiver; sys::Socket socket; sys::AsynchIO* aio; + std::string identifier; boost::shared_ptr<sys::Poller> poller; + std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~TCPConnector(); @@ -139,8 +122,6 @@ class TCPConnector : public Connector, private sys::Runnable void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); - std::string identifier; - boost::weak_ptr<ConnectionImpl> impl; void connect(const std::string& host, int port); @@ -153,6 +134,12 @@ class TCPConnector : public Connector, private sys::Runnable sys::ShutdownHandler* getShutdownHandler() const; framing::OutputHandler* getOutputHandler(); const std::string& getIdentifier() const; + void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>); + + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool canEncode(); + public: TCPConnector(framing::ProtocolVersion pVersion, @@ -177,12 +164,14 @@ TCPConnector::TCPConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), + lastEof(0), + currentSize(0), + bounds(cimpl), version(ver), initiated(false), closed(true), joined(true), shutdownHandler(0), - writer(maxFrameSize, cimpl), aio(0), impl(cimpl->shared_from_this()) { @@ -214,7 +203,6 @@ void TCPConnector::connect(const std::string& host, int port){ 0, // closed 0, // nobuffs boost::bind(&TCPConnector::writebuff, this, _1)); - writer.init(identifier, aio); } void TCPConnector::init(){ @@ -266,7 +254,21 @@ const std::string& TCPConnector::getIdentifier() const { } void TCPConnector::send(AMQFrame& frame) { - writer.handle(frame); + bool notifyWrite = false; + { + Mutex::ScopedLock l(lock); + frames.push_back(frame); + //only ask to write if this is the end of a frameset or if we + //already have a buffers worth of data + currentSize += frame.encodedSize(); + if (frame.getEof()) { + lastEof = frames.size(); + notifyWrite = true; + } else { + notifyWrite = (currentSize >= maxFrameSize); + } + } + if (notifyWrite) aio->notifyPendingWrite(); } void TCPConnector::handleClosed() { @@ -279,70 +281,70 @@ struct TCPConnector::Buff : public AsynchIO::BufferBase { ~Buff() { delete [] bytes;} }; -TCPConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) +void TCPConnector::writebuff(AsynchIO& /*aio*/) { -} - -TCPConnector::Writer::~Writer() { delete buffer; } + Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; + if (codec->canEncode()) { + std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer()); + if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize)); + + size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); -void TCPConnector::Writer::init(std::string id, sys::AsynchIO* a) { - Mutex::ScopedLock l(lock); - identifier = id; - aio = a; - newBuffer(); -} -void TCPConnector::Writer::handle(framing::AMQFrame& frame) { - Mutex::ScopedLock l(lock); - frames.push_back(frame); - //only try to write if this is the end of a frameset or if we - //already have a buffers worth of data - if (frame.getEof() || (bounds && bounds->getCurrentSize() >= maxFrameSize)) { - lastEof = frames.size(); - aio->notifyPendingWrite(); + buffer->dataStart = 0; + buffer->dataCount = encoded; + aio->queueWrite(buffer.release()); } - QPID_LOG(trace, "SENT " << identifier << ": " << frame); -} - -void TCPConnector::Writer::writeOne() { - assert(buffer); - framesEncoded = 0; - - buffer->dataStart = 0; - buffer->dataCount = encode.getPosition(); - aio->queueWrite(buffer); - newBuffer(); } -void TCPConnector::Writer::newBuffer() { - buffer = aio->getQueuedBuffer(); - if (!buffer) buffer = new Buff(maxFrameSize); - encode = framing::Buffer(buffer->bytes, buffer->byteCount); - framesEncoded = 0; +// Called in IO thread. +bool TCPConnector::canEncode() +{ + Mutex::ScopedLock l(lock); + //have at least one full frameset or a whole buffers worth of data + return lastEof || currentSize >= maxFrameSize; } // Called in IO thread. -void TCPConnector::Writer::write(sys::AsynchIO&) { - Mutex::ScopedLock l(lock); - assert(buffer); +size_t TCPConnector::encode(const char* buffer, size_t size) +{ + framing::Buffer out(const_cast<char*>(buffer), size); size_t bytesWritten(0); - for (size_t i = 0; i < lastEof; ++i) { - AMQFrame& frame = frames[i]; - uint32_t size = frame.encodedSize(); - if (size > encode.available()) writeOne(); - assert(size <= encode.available()); - frame.encode(encode); - ++framesEncoded; - bytesWritten += size; + { + Mutex::ScopedLock l(lock); + while (!frames.empty() && out.available() >= frames.front().encodedSize() ) { + frames.front().encode(out); + QPID_LOG(trace, "SENT " << identifier << ": " << frames.front()); + frames.pop_front(); + if (lastEof) --lastEof; + } + bytesWritten = size - out.available(); + currentSize -= bytesWritten; } - frames.erase(frames.begin(), frames.begin()+lastEof); - lastEof = 0; if (bounds) bounds->reduce(bytesWritten); - if (encode.getPosition() > 0) writeOne(); + return bytesWritten; } -bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); +bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) +{ + Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; + int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); + // TODO: unreading needs to go away, and when we can cope + // with multiple sub-buffers in the general buffer scheme, it will + if (decoded < buff->dataCount) { + // Adjust buffer for used bytes and then "unread them" + buff->dataStart += decoded; + buff->dataCount -= decoded; + aio.unread(buff); + } else { + // Give whole buffer back to aio subsystem + aio.queueReadBuffer(buff); + } + return true; +} +size_t TCPConnector::decode(const char* buffer, size_t size) +{ + framing::Buffer in(const_cast<char*>(buffer), size); if (!initiated) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { @@ -356,22 +358,7 @@ bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { QPID_LOG(trace, "RECV " << identifier << ": " << frame); input->received(frame); } - // TODO: unreading needs to go away, and when we can cope - // with multiple sub-buffers in the general buffer scheme, it will - if (in.available() != 0) { - // Adjust buffer for used bytes and then "unread them" - buff->dataStart += buff->dataCount-in.available(); - buff->dataCount = in.available(); - aio.unread(buff); - } else { - // Give whole buffer back to aio subsystem - aio.queueReadBuffer(buff); - } - return true; -} - -void TCPConnector::writebuff(AsynchIO& aio_) { - writer.write(aio_); + return size - in.available(); } void TCPConnector::writeDataBlock(const AMQDataBlock& data) { @@ -388,7 +375,7 @@ void TCPConnector::eof(AsynchIO&) { // TODO: astitcher 20070908 This version of the code can never time out, so the idle processing // will never be called -void TCPConnector::run(){ +void TCPConnector::run() { // Keep the connection impl in memory until run() completes. boost::shared_ptr<ConnectionImpl> protect = impl.lock(); assert(protect); @@ -409,5 +396,11 @@ void TCPConnector::run(){ } } +void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) +{ + securityLayer = sl; + securityLayer->init(this); +} + }} // namespace qpid::client |
