diff options
| author | Gordon Sim <gsim@apache.org> | 2009-01-06 19:50:59 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-01-06 19:50:59 +0000 |
| commit | ccd271e851f2bc2b52a7c8daaa54a06551d63dc0 (patch) | |
| tree | b0ebe5326ee6a15d0d3339c60805920911a77df5 /cpp/src/qpid/client/RdmaConnector.cpp | |
| parent | 9b18a2b17aaa643001c54d48445ed0d8bb7f2a4c (diff) | |
| download | qpid-python-ccd271e851f2bc2b52a7c8daaa54a06551d63dc0.tar.gz | |
* Cyrus SASL intgeration for c++ client
* SASL security layer support for c++ client and broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@732082 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/RdmaConnector.cpp')
| -rw-r--r-- | cpp/src/qpid/client/RdmaConnector.cpp | 182 |
1 files changed, 76 insertions, 106 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp index 98fe762f31..3cc8961eea 100644 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -29,6 +29,7 @@ #include "qpid/sys/rdma/RdmaIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" +#include "qpid/sys/SecurityLayer.h" #include "qpid/Msg.h" #include <iostream> @@ -47,39 +48,21 @@ using namespace qpid::framing; using boost::format; using boost::str; -class RdmaConnector : public Connector, private sys::Runnable + class RdmaConnector : public Connector, public sys::Codec, private sys::Runnable { struct Buff; - /** Batch up frames for writing to aio. */ - class Writer : public framing::FrameHandler { - typedef Rdma::Buffer BufferBase; - typedef std::deque<framing::AMQFrame> Frames; - - const uint16_t maxFrameSize; - sys::Mutex lock; - Rdma::AsynchIO* aio; - BufferBase* buffer; - Frames frames; - size_t lastEof; // Position after last EOF in frames - framing::Buffer encode; - size_t framesEncoded; - std::string identifier; - Bounds* bounds; - - void writeOne(); - void newBuffer(); + typedef Rdma::Buffer BufferBase; + typedef std::deque<framing::AMQFrame> Frames; - public: - - Writer(uint16_t maxFrameSize, Bounds*); - ~Writer(); - void init(std::string id, Rdma::AsynchIO*); - void handle(framing::AMQFrame&); - void write(Rdma::AsynchIO&); - }; - const uint16_t maxFrameSize; + sys::Mutex lock; + Frames frames; + size_t lastEof; // Position after last EOF in frames + uint64_t currentSize; + Bounds* bounds; + + framing::ProtocolVersion version; bool initiated; @@ -92,12 +75,11 @@ class RdmaConnector : public Connector, private sys::Runnable framing::InitiationHandler* initialiser; framing::OutputHandler* output; - Writer writer; - sys::Thread receiver; Rdma::AsynchIO* aio; sys::Poller::shared_ptr poller; + std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~RdmaConnector(); @@ -129,6 +111,11 @@ class RdmaConnector : public Connector, private sys::Runnable sys::ShutdownHandler* getShutdownHandler() const; framing::OutputHandler* getOutputHandler(); const std::string& getIdentifier() const; + void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>); + + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool canEncode(); public: RdmaConnector(framing::ProtocolVersion pVersion, @@ -155,12 +142,14 @@ RdmaConnector::RdmaConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), + lastEof(0), + currentSize(0), + bounds(cimpl), version(ver), initiated(false), polling(false), joined(true), shutdownHandler(0), - writer(maxFrameSize, cimpl), aio(0), impl(cimpl) { @@ -216,7 +205,6 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru aio->start(poller); identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName()); - writer.init(identifier, aio); ProtocolInitiation init(version); writeDataBlock(init); } @@ -279,7 +267,21 @@ const std::string& RdmaConnector::getIdentifier() const { } void RdmaConnector::send(AMQFrame& frame) { - writer.handle(frame); + bool notifyWrite = false; + { + Mutex::ScopedLock l(lock); + frames.push_back(frame); + //only ask to write if this is the end of a frameset or if we + //already have a buffers worth of data + currentSize += frame.encodedSize(); + if (frame.getEof()) { + lastEof = frames.size(); + notifyWrite = true; + } else { + notifyWrite = (currentSize >= maxFrameSize); + } + } + if (notifyWrite) aio->notifyPendingWrite(); } void RdmaConnector::handleClosed() { @@ -287,88 +289,54 @@ void RdmaConnector::handleClosed() { shutdownHandler->shutdown(); } -RdmaConnector::Writer::Writer(uint16_t s, Bounds* b) : - maxFrameSize(s), - aio(0), - buffer(0), - lastEof(0), - bounds(b) -{ -} - -RdmaConnector::Writer::~Writer() { - if (aio) - aio->returnBuffer(buffer); -} - -void RdmaConnector::Writer::init(std::string id, Rdma::AsynchIO* a) { - Mutex::ScopedLock l(lock); - identifier = id; - aio = a; - assert(aio->bufferAvailable()); - newBuffer(); -} -void RdmaConnector::Writer::handle(framing::AMQFrame& frame) { - Mutex::ScopedLock l(lock); - frames.push_back(frame); - // Don't bother to send anything unless we're at the end of a frameset (assembly in 0-10 terminology) - if (frame.getEof()) { - lastEof = frames.size(); - QPID_LOG(debug, "Requesting write: lastEof=" << lastEof); - aio->notifyPendingWrite(); +// Called in IO thread. (write idle routine) +// This is NOT only called in response to previously calling notifyPendingWrite +void RdmaConnector::writebuff(Rdma::AsynchIO&) { + Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; + if (codec->canEncode()) { + std::auto_ptr<BufferBase> buffer = std::auto_ptr<BufferBase>(aio->getBuffer()); + size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); + + buffer->dataStart = 0; + buffer->dataCount = encoded; + aio->queueWrite(buffer.release()); } - QPID_LOG(trace, "SENT " << identifier << ": " << frame); } -void RdmaConnector::Writer::writeOne() { - assert(buffer); - QPID_LOG(trace, "Write buffer " << encode.getPosition() - << " bytes " << framesEncoded << " frames "); - framesEncoded = 0; - - buffer->dataStart = 0; - buffer->dataCount = encode.getPosition(); - aio->queueWrite(buffer); - newBuffer(); -} - -void RdmaConnector::Writer::newBuffer() { - buffer = aio->getBuffer(); - encode = framing::Buffer(buffer->bytes, buffer->byteCount); - framesEncoded = 0; +bool RdmaConnector::canEncode() +{ + Mutex::ScopedLock l(lock); + //have at least one full frameset or a whole buffers worth of data + return aio->writable() && aio->bufferAvailable() && (lastEof || currentSize >= maxFrameSize); } -// Called in IO thread. (write idle routine) -// This is NOT only called in response to previously calling notifyPendingWrite -void RdmaConnector::Writer::write(Rdma::AsynchIO&) { - Mutex::ScopedLock l(lock); - assert(buffer); - // If nothing to do return immediately - if (lastEof==0) - return; - size_t bytesWritten = 0; - while (aio->writable() && aio->bufferAvailable() && !frames.empty()) { - const AMQFrame* frame = &frames.front(); - uint32_t size = frame->encodedSize(); - while (size <= encode.available()) { - frame->encode(encode); +size_t RdmaConnector::encode(const char* buffer, size_t size) +{ + framing::Buffer out(const_cast<char*>(buffer), size); + size_t bytesWritten(0); + { + Mutex::ScopedLock l(lock); + while (!frames.empty() && out.available() >= frames.front().encodedSize() ) { + frames.front().encode(out); + QPID_LOG(trace, "SENT " << identifier << ": " << frames.front()); frames.pop_front(); - ++framesEncoded; - bytesWritten += size; - if (frames.empty()) - break; - frame = &frames.front(); - size = frame->encodedSize(); + if (lastEof) --lastEof; } - lastEof -= framesEncoded; - writeOne(); + bytesWritten = size - out.available(); + currentSize -= bytesWritten; } if (bounds) bounds->reduce(bytesWritten); + return bytesWritten; } void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; + codec->decode(buff->bytes+buff->dataStart, buff->dataCount); +} +size_t RdmaConnector::decode(const char* buffer, size_t size) +{ + framing::Buffer in(const_cast<char*>(buffer), size); if (!initiated) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { @@ -382,10 +350,7 @@ void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { QPID_LOG(trace, "RECV " << identifier << ": " << frame); input->received(frame); } -} - -void RdmaConnector::writebuff(Rdma::AsynchIO& aio_) { - writer.write(aio_); + return size - in.available(); } void RdmaConnector::writeDataBlock(const AMQDataBlock& data) { @@ -424,5 +389,10 @@ void RdmaConnector::run(){ } } +void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) +{ + securityLayer = sl; + securityLayer->init(this); +} }} // namespace qpid::client |
