diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-08-27 15:40:33 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-27 15:40:33 +0000 |
| commit | 868ce7469262d6fd2fe3f2e7f04cfe7af654d59f (patch) | |
| tree | 63e6b5e62554609beb21e8c8d0610569f36d2743 /cpp/src/qpid/sys/ssl | |
| parent | 2e5ff8f1b328831043e6d7e323249d62187234c6 (diff) | |
| download | qpid-python-868ce7469262d6fd2fe3f2e7f04cfe7af654d59f.tar.gz | |
QPID-3858: Updated code to include recent refactoring by Gordon (gsim) - see QPID-4178.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1377715 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/ssl')
| -rw-r--r-- | cpp/src/qpid/sys/ssl/SslHandler.cpp | 27 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ssl/SslHandler.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ssl/SslIo.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ssl/SslIo.h | 26 |
4 files changed, 42 insertions, 34 deletions
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.cpp b/cpp/src/qpid/sys/ssl/SslHandler.cpp index 8613059f28..eeb8c26a76 100644 --- a/cpp/src/qpid/sys/ssl/SslHandler.cpp +++ b/cpp/src/qpid/sys/ssl/SslHandler.cpp @@ -33,15 +33,6 @@ namespace sys { namespace ssl { -// Buffer definition -struct Buff : public SslIO::BufferBase { - Buff() : - SslIO::BufferBase(new char[65536], 65536) - {} - ~Buff() - { delete [] bytes;} -}; - struct ProtocolTimeoutTask : public sys::TimerTask { SslHandler& handler; std::string id; @@ -78,7 +69,7 @@ SslHandler::~SslHandler() { delete codec; } -void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) { +void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime) { aio = a; // Start timer for this connection @@ -86,17 +77,14 @@ void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) { timer.add(timeoutTimerTask); // Give connection some buffers to use - for (int i = 0; i < numBuffs; i++) { - aio->queueReadBuffer(new Buff); - } + aio->createBuffers(); } void SslHandler::write(const framing::ProtocolInitiation& data) { QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")"); SslIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) - buff = new Buff; + assert(buff); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.encodedSize(); @@ -205,10 +193,11 @@ void SslHandler::idle(SslIO&){ return; } if (codec == 0) return; - if (codec->canEncode()) { - // Try and get a queued buffer if not then construct new one - SslIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) buff = new Buff; + if (!codec->canEncode()) { + return; + } + SslIO::BufferBase* buff = aio->getQueuedBuffer(); + if (buff) { size_t encoded=codec->encode(buff->bytes, buff->byteCount); buff->dataCount = encoded; aio->queueWrite(buff); diff --git a/cpp/src/qpid/sys/ssl/SslHandler.h b/cpp/src/qpid/sys/ssl/SslHandler.h index 74df2b7fb0..14814b0281 100644 --- a/cpp/src/qpid/sys/ssl/SslHandler.h +++ b/cpp/src/qpid/sys/ssl/SslHandler.h @@ -60,7 +60,7 @@ class SslHandler : public OutputControl { public: SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict); ~SslHandler(); - void init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs); + void init(SslIO* a, Timer& timer, uint32_t maxTime); void setClient() { isClient = true; } diff --git a/cpp/src/qpid/sys/ssl/SslIo.cpp b/cpp/src/qpid/sys/ssl/SslIo.cpp index 789c205ead..bbfb703170 100644 --- a/cpp/src/qpid/sys/ssl/SslIo.cpp +++ b/cpp/src/qpid/sys/ssl/SslIo.cpp @@ -197,15 +197,7 @@ SslIO::SslIO(const SslSocket& s, s.setNonblocking(); } -struct deleter -{ - template <typename T> - void operator()(T *ptr){ delete ptr;} -}; - SslIO::~SslIO() { - std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter()); - std::for_each( writeQueue.begin(), writeQueue.end(), deleter()); } void SslIO::queueForDeletion() { @@ -216,6 +208,19 @@ void SslIO::start(Poller::shared_ptr poller) { DispatchHandle::startWatch(poller); } +void SslIO::createBuffers(uint32_t size) { + // Allocate all the buffer memory at once + bufferMemory.reset(new char[size*BufferCount]); + + // Create the Buffer structs in a vector + // And push into the buffer queue + buffers.reserve(BufferCount); + for (uint32_t i = 0; i < BufferCount; i++) { + buffers.push_back(BufferBase(&bufferMemory[i*size], size)); + queueReadBuffer(&buffers[i]); + } +} + void SslIO::queueReadBuffer(BufferBase* buff) { assert(buff); buff->dataStart = 0; diff --git a/cpp/src/qpid/sys/ssl/SslIo.h b/cpp/src/qpid/sys/ssl/SslIo.h index b795594cd9..f3112bfa65 100644 --- a/cpp/src/qpid/sys/ssl/SslIo.h +++ b/cpp/src/qpid/sys/ssl/SslIo.h @@ -25,6 +25,7 @@ #include "qpid/sys/SecuritySettings.h" #include <boost/function.hpp> +#include <boost/shared_array.hpp> #include <deque> namespace qpid { @@ -87,8 +88,8 @@ private: }; struct SslIOBufferBase { - char* const bytes; - const int32_t byteCount; + char* bytes; + int32_t byteCount; int32_t dataStart; int32_t dataCount; @@ -127,7 +128,9 @@ public: typedef boost::function1<void, SslIO&> IdleCallback; typedef boost::function1<void, SslIO&> RequestCallback; - + SslIO(const SslSocket& s, + ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, + ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); private: ReadCallback readCallback; EofCallback eofCallback; @@ -138,6 +141,8 @@ private: const SslSocket& socket; std::deque<BufferBase*> bufferQueue; std::deque<BufferBase*> writeQueue; + std::vector<BufferBase> buffers; + boost::shared_array<char> bufferMemory; bool queuedClose; /** * This flag is used to detect and handle concurrency between @@ -148,12 +153,21 @@ private: volatile bool writePending; public: - SslIO(const SslSocket& s, - ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); + /* + * Size of IO buffers - this is the maximum possible frame size + 1 + */ + const static uint32_t MaxBufferSize = 65536; + + /* + * Number of IO buffers allocated - I think the code can only use 2 - + * 1 for reading and 1 for writing, allocate 4 for safety + */ + const static uint32_t BufferCount = 4; + void queueForDeletion(); void start(qpid::sys::Poller::shared_ptr poller); + void createBuffers(uint32_t size = MaxBufferSize); void queueReadBuffer(BufferBase* buff); void unread(BufferBase* buff); void queueWrite(BufferBase* buff); |
