summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/RdmaConnector.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-01-06 19:50:59 +0000
committerGordon Sim <gsim@apache.org>2009-01-06 19:50:59 +0000
commitccd271e851f2bc2b52a7c8daaa54a06551d63dc0 (patch)
treeb0ebe5326ee6a15d0d3339c60805920911a77df5 /cpp/src/qpid/client/RdmaConnector.cpp
parent9b18a2b17aaa643001c54d48445ed0d8bb7f2a4c (diff)
downloadqpid-python-ccd271e851f2bc2b52a7c8daaa54a06551d63dc0.tar.gz
* Cyrus SASL intgeration for c++ client
* SASL security layer support for c++ client and broker git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@732082 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/RdmaConnector.cpp')
-rw-r--r--cpp/src/qpid/client/RdmaConnector.cpp182
1 files changed, 76 insertions, 106 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp
index 98fe762f31..3cc8961eea 100644
--- a/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/cpp/src/qpid/client/RdmaConnector.cpp
@@ -29,6 +29,7 @@
#include "qpid/sys/rdma/RdmaIO.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
+#include "qpid/sys/SecurityLayer.h"
#include "qpid/Msg.h"
#include <iostream>
@@ -47,39 +48,21 @@ using namespace qpid::framing;
using boost::format;
using boost::str;
-class RdmaConnector : public Connector, private sys::Runnable
+ class RdmaConnector : public Connector, public sys::Codec, private sys::Runnable
{
struct Buff;
- /** Batch up frames for writing to aio. */
- class Writer : public framing::FrameHandler {
- typedef Rdma::Buffer BufferBase;
- typedef std::deque<framing::AMQFrame> Frames;
-
- const uint16_t maxFrameSize;
- sys::Mutex lock;
- Rdma::AsynchIO* aio;
- BufferBase* buffer;
- Frames frames;
- size_t lastEof; // Position after last EOF in frames
- framing::Buffer encode;
- size_t framesEncoded;
- std::string identifier;
- Bounds* bounds;
-
- void writeOne();
- void newBuffer();
+ typedef Rdma::Buffer BufferBase;
+ typedef std::deque<framing::AMQFrame> Frames;
- public:
-
- Writer(uint16_t maxFrameSize, Bounds*);
- ~Writer();
- void init(std::string id, Rdma::AsynchIO*);
- void handle(framing::AMQFrame&);
- void write(Rdma::AsynchIO&);
- };
-
const uint16_t maxFrameSize;
+ sys::Mutex lock;
+ Frames frames;
+ size_t lastEof; // Position after last EOF in frames
+ uint64_t currentSize;
+ Bounds* bounds;
+
+
framing::ProtocolVersion version;
bool initiated;
@@ -92,12 +75,11 @@ class RdmaConnector : public Connector, private sys::Runnable
framing::InitiationHandler* initialiser;
framing::OutputHandler* output;
- Writer writer;
-
sys::Thread receiver;
Rdma::AsynchIO* aio;
sys::Poller::shared_ptr poller;
+ std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
~RdmaConnector();
@@ -129,6 +111,11 @@ class RdmaConnector : public Connector, private sys::Runnable
sys::ShutdownHandler* getShutdownHandler() const;
framing::OutputHandler* getOutputHandler();
const std::string& getIdentifier() const;
+ void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
+
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool canEncode();
public:
RdmaConnector(framing::ProtocolVersion pVersion,
@@ -155,12 +142,14 @@ RdmaConnector::RdmaConnector(ProtocolVersion ver,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
+ lastEof(0),
+ currentSize(0),
+ bounds(cimpl),
version(ver),
initiated(false),
polling(false),
joined(true),
shutdownHandler(0),
- writer(maxFrameSize, cimpl),
aio(0),
impl(cimpl)
{
@@ -216,7 +205,6 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru
aio->start(poller);
identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName());
- writer.init(identifier, aio);
ProtocolInitiation init(version);
writeDataBlock(init);
}
@@ -279,7 +267,21 @@ const std::string& RdmaConnector::getIdentifier() const {
}
void RdmaConnector::send(AMQFrame& frame) {
- writer.handle(frame);
+ bool notifyWrite = false;
+ {
+ Mutex::ScopedLock l(lock);
+ frames.push_back(frame);
+ //only ask to write if this is the end of a frameset or if we
+ //already have a buffers worth of data
+ currentSize += frame.encodedSize();
+ if (frame.getEof()) {
+ lastEof = frames.size();
+ notifyWrite = true;
+ } else {
+ notifyWrite = (currentSize >= maxFrameSize);
+ }
+ }
+ if (notifyWrite) aio->notifyPendingWrite();
}
void RdmaConnector::handleClosed() {
@@ -287,88 +289,54 @@ void RdmaConnector::handleClosed() {
shutdownHandler->shutdown();
}
-RdmaConnector::Writer::Writer(uint16_t s, Bounds* b) :
- maxFrameSize(s),
- aio(0),
- buffer(0),
- lastEof(0),
- bounds(b)
-{
-}
-
-RdmaConnector::Writer::~Writer() {
- if (aio)
- aio->returnBuffer(buffer);
-}
-
-void RdmaConnector::Writer::init(std::string id, Rdma::AsynchIO* a) {
- Mutex::ScopedLock l(lock);
- identifier = id;
- aio = a;
- assert(aio->bufferAvailable());
- newBuffer();
-}
-void RdmaConnector::Writer::handle(framing::AMQFrame& frame) {
- Mutex::ScopedLock l(lock);
- frames.push_back(frame);
- // Don't bother to send anything unless we're at the end of a frameset (assembly in 0-10 terminology)
- if (frame.getEof()) {
- lastEof = frames.size();
- QPID_LOG(debug, "Requesting write: lastEof=" << lastEof);
- aio->notifyPendingWrite();
+// Called in IO thread. (write idle routine)
+// This is NOT only called in response to previously calling notifyPendingWrite
+void RdmaConnector::writebuff(Rdma::AsynchIO&) {
+ Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+ if (codec->canEncode()) {
+ std::auto_ptr<BufferBase> buffer = std::auto_ptr<BufferBase>(aio->getBuffer());
+ size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
+
+ buffer->dataStart = 0;
+ buffer->dataCount = encoded;
+ aio->queueWrite(buffer.release());
}
- QPID_LOG(trace, "SENT " << identifier << ": " << frame);
}
-void RdmaConnector::Writer::writeOne() {
- assert(buffer);
- QPID_LOG(trace, "Write buffer " << encode.getPosition()
- << " bytes " << framesEncoded << " frames ");
- framesEncoded = 0;
-
- buffer->dataStart = 0;
- buffer->dataCount = encode.getPosition();
- aio->queueWrite(buffer);
- newBuffer();
-}
-
-void RdmaConnector::Writer::newBuffer() {
- buffer = aio->getBuffer();
- encode = framing::Buffer(buffer->bytes, buffer->byteCount);
- framesEncoded = 0;
+bool RdmaConnector::canEncode()
+{
+ Mutex::ScopedLock l(lock);
+ //have at least one full frameset or a whole buffers worth of data
+ return aio->writable() && aio->bufferAvailable() && (lastEof || currentSize >= maxFrameSize);
}
-// Called in IO thread. (write idle routine)
-// This is NOT only called in response to previously calling notifyPendingWrite
-void RdmaConnector::Writer::write(Rdma::AsynchIO&) {
- Mutex::ScopedLock l(lock);
- assert(buffer);
- // If nothing to do return immediately
- if (lastEof==0)
- return;
- size_t bytesWritten = 0;
- while (aio->writable() && aio->bufferAvailable() && !frames.empty()) {
- const AMQFrame* frame = &frames.front();
- uint32_t size = frame->encodedSize();
- while (size <= encode.available()) {
- frame->encode(encode);
+size_t RdmaConnector::encode(const char* buffer, size_t size)
+{
+ framing::Buffer out(const_cast<char*>(buffer), size);
+ size_t bytesWritten(0);
+ {
+ Mutex::ScopedLock l(lock);
+ while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
+ frames.front().encode(out);
+ QPID_LOG(trace, "SENT " << identifier << ": " << frames.front());
frames.pop_front();
- ++framesEncoded;
- bytesWritten += size;
- if (frames.empty())
- break;
- frame = &frames.front();
- size = frame->encodedSize();
+ if (lastEof) --lastEof;
}
- lastEof -= framesEncoded;
- writeOne();
+ bytesWritten = size - out.available();
+ currentSize -= bytesWritten;
}
if (bounds) bounds->reduce(bytesWritten);
+ return bytesWritten;
}
void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+ Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+ codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+}
+size_t RdmaConnector::decode(const char* buffer, size_t size)
+{
+ framing::Buffer in(const_cast<char*>(buffer), size);
if (!initiated) {
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
@@ -382,10 +350,7 @@ void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
QPID_LOG(trace, "RECV " << identifier << ": " << frame);
input->received(frame);
}
-}
-
-void RdmaConnector::writebuff(Rdma::AsynchIO& aio_) {
- writer.write(aio_);
+ return size - in.available();
}
void RdmaConnector::writeDataBlock(const AMQDataBlock& data) {
@@ -424,5 +389,10 @@ void RdmaConnector::run(){
}
}
+void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
+{
+ securityLayer = sl;
+ securityLayer->init(this);
+}
}} // namespace qpid::client