diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2012-08-10 17:27:28 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2012-08-10 17:27:28 +0000 |
| commit | a981d3293f2db3a57fc996f8322bd04bb13a2da5 (patch) | |
| tree | 025c82b392c2222bba1252f304d0afcd97130c92 /qpid/cpp/src | |
| parent | 1ca286b78d1cfd1942ef607589fdd862c2208c38 (diff) | |
| download | qpid-python-a981d3293f2db3a57fc996f8322bd04bb13a2da5.tar.gz | |
Rearrange buffer memory ownership to avoid leaking buffer memory
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1371774 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/TCPConnector.cpp | 23 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/TCPConnector.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIO.h | 16 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp | 44 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIOHandler.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/SslPlugin.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp | 25 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp | 25 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp | 14 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h | 1 |
12 files changed, 87 insertions, 70 deletions
diff --git a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp index fb59d058f8..420e04e832 100644 --- a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp +++ b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp @@ -281,7 +281,7 @@ void SslProtocolFactory::established(sys::Poller::shared_ptr poller, boost::bind(&AsynchIOHandler::idle, async, _1)); } - async->init(aio, brokerTimer, maxNegotiateTime, 4); + async->init(aio, brokerTimer, maxNegotiateTime); aio->start(poller); } diff --git a/qpid/cpp/src/qpid/client/TCPConnector.cpp b/qpid/cpp/src/qpid/client/TCPConnector.cpp index 1dd951d339..a5c6465bad 100644 --- a/qpid/cpp/src/qpid/client/TCPConnector.cpp +++ b/qpid/cpp/src/qpid/client/TCPConnector.cpp @@ -46,11 +46,6 @@ using namespace qpid::framing; using boost::format; using boost::str; -struct TCPConnector::Buff : public AsynchIO::BufferBase { - Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - // Static constructor which registers connector here namespace { Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { @@ -118,9 +113,8 @@ void TCPConnector::connected(const Socket&) { void TCPConnector::start(sys::AsynchIO* aio_) { aio = aio_; - for (int i = 0; i < 4; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } + + aio->createBuffers(maxFrameSize); identifier = str(format("[%1%]") % socket.getFullAddress()); } @@ -226,15 +220,19 @@ void TCPConnector::writebuff(AsynchIO& /*aio*/) return; Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; - if (codec->canEncode()) { - std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer()); - if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize)); + + if (!codec->canEncode()) { + return; + } + + AsynchIO::BufferBase* buffer = aio->getQueuedBuffer(); + if (buffer) { size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); buffer->dataStart = 0; buffer->dataCount = encoded; - aio->queueWrite(buffer.release()); + aio->queueWrite(buffer); } } @@ -307,6 +305,7 @@ size_t TCPConnector::decode(const char* buffer, size_t size) void TCPConnector::writeDataBlock(const AMQDataBlock& data) { AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); + assert(buff); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.encodedSize(); diff --git a/qpid/cpp/src/qpid/client/TCPConnector.h b/qpid/cpp/src/qpid/client/TCPConnector.h index c87d544816..c0bc26028d 100644 --- a/qpid/cpp/src/qpid/client/TCPConnector.h +++ b/qpid/cpp/src/qpid/client/TCPConnector.h @@ -50,7 +50,6 @@ namespace client { class TCPConnector : public Connector, public sys::Codec { typedef std::deque<framing::AMQFrame> Frames; - struct Buff; const uint16_t maxFrameSize; diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h index 41f74f7ed0..b2eaaac9de 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIO.h +++ b/qpid/cpp/src/qpid/sys/AsynchIO.h @@ -76,8 +76,8 @@ protected: }; struct AsynchIOBufferBase { - char* const bytes; - const int32_t byteCount; + char* bytes; + int32_t byteCount; int32_t dataStart; int32_t dataCount; @@ -134,9 +134,21 @@ public: BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); public: + /* + * Size of IO buffers - this is the maximum possible frame size + 1 + */ + const static uint32_t MaxBufferSize = 65536; + + /* + * Number of IO buffers allocated - I think the code can only use 2 - + * 1 for reading and 1 for writing, allocate 4 for safety + */ + const static uint32_t BufferCount = 4; + virtual void queueForDeletion() = 0; virtual void start(boost::shared_ptr<Poller> poller) = 0; + virtual void createBuffers(uint32_t size = MaxBufferSize) = 0; virtual void queueReadBuffer(BufferBase* buff) = 0; virtual void unread(BufferBase* buff) = 0; virtual void queueWrite(BufferBase* buff) = 0; diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp index 8a485db72d..2e117a3fb7 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -33,15 +33,6 @@ namespace qpid { namespace sys { -// Buffer definition -struct Buff : public AsynchIO::BufferBase { - Buff() : - AsynchIO::BufferBase(new char[65536], 65536) - {} - ~Buff() - { delete [] bytes;} -}; - struct ProtocolTimeoutTask : public sys::TimerTask { AsynchIOHandler& handler; std::string id; @@ -79,7 +70,7 @@ AsynchIOHandler::~AsynchIOHandler() { delete codec; } -void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) { +void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime) { aio = a; // Start timer for this connection @@ -87,17 +78,14 @@ void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint timer.add(timeoutTimerTask); // Give connection some buffers to use - for (int i = 0; i < numBuffs; i++) { - aio->queueReadBuffer(new Buff); - } + aio->createBuffers(); } void AsynchIOHandler::write(const framing::ProtocolInitiation& data) { QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")"); AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) - buff = new Buff; + assert(buff); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.encodedSize(); @@ -244,24 +232,24 @@ void AsynchIOHandler::idle(AsynchIO&){ return; } if (codec == 0) return; - try { - if (codec->canEncode()) { - // Try and get a queued buffer if not then construct new one - AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) buff = new Buff; + if (!codec->canEncode()) { + return; + } + AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); + if (buff) { + try { size_t encoded=codec->encode(buff->bytes, buff->byteCount); buff->dataCount = encoded; aio->queueWrite(buff); + if (!codec->isClosed()) { + return; + } + } catch (const std::exception& e) { + QPID_LOG(error, e.what()); } - if (codec->isClosed()) { - readError = true; - aio->queueWriteClose(); - } - } catch (const std::exception& e) { - QPID_LOG(error, e.what()); - readError = true; - aio->queueWriteClose(); } + readError = true; + aio->queueWriteClose(); } }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h index 307aad5b85..fd0bc140e5 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h @@ -61,7 +61,7 @@ class AsynchIOHandler : public OutputControl { public: QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f ); QPID_COMMON_EXTERN ~AsynchIOHandler(); - QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime, int numBuffs); + QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime); QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; } diff --git a/qpid/cpp/src/qpid/sys/SslPlugin.cpp b/qpid/cpp/src/qpid/sys/SslPlugin.cpp index 3b50527c0a..c2a3d74cbd 100644 --- a/qpid/cpp/src/qpid/sys/SslPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/SslPlugin.cpp @@ -247,7 +247,7 @@ void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, brokerTimer, maxNegotiateTime, 4); + async->init(aio, brokerTimer, maxNegotiateTime); aio->start(poller); } diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp index 551440f954..ed7cc3748d 100644 --- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -166,7 +166,7 @@ void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socke boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, brokerTimer, maxNegotiateTime, 4); + async->init(aio, brokerTimer, maxNegotiateTime); aio->start(poller); } diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index 01ff8b6bfa..31355627cd 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -40,6 +40,7 @@ #include <boost/bind.hpp> #include <boost/lexical_cast.hpp> +#include <boost/shared_array.hpp> namespace qpid { namespace sys { @@ -239,6 +240,7 @@ public: virtual void queueForDeletion(); virtual void start(Poller::shared_ptr poller); + virtual void createBuffers(uint32_t size); virtual void queueReadBuffer(BufferBase* buff); virtual void unread(BufferBase* buff); virtual void queueWrite(BufferBase* buff); @@ -270,6 +272,8 @@ private: const Socket& socket; std::deque<BufferBase*> bufferQueue; std::deque<BufferBase*> writeQueue; + std::vector<BufferBase> buffers; + boost::shared_array<char> bufferMemory; bool queuedClose; /** * This flag is used to detect and handle concurrency between @@ -309,15 +313,7 @@ AsynchIO::AsynchIO(const Socket& s, s.setNonblocking(); } -struct deleter -{ - template <typename T> - void operator()(T *ptr){ delete ptr;} -}; - AsynchIO::~AsynchIO() { - std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter()); - std::for_each( writeQueue.begin(), writeQueue.end(), deleter()); } void AsynchIO::queueForDeletion() { @@ -328,6 +324,19 @@ void AsynchIO::start(Poller::shared_ptr poller) { DispatchHandle::startWatch(poller); } +void AsynchIO::createBuffers(uint32_t size) { + // Allocate all the buffer memory at once + bufferMemory.reset(new char[size*BufferCount]); + + // Create the Buffer structs in a vector + // And push into the buffer queue + buffers.reserve(BufferCount); + for (uint32_t i = 0; i < BufferCount; i++) { + buffers.push_back(BufferBase(&bufferMemory[i*size], size)); + queueReadBuffer(&buffers[i]); + } +} + void AsynchIO::queueReadBuffer(BufferBase* buff) { assert(buff); buff->dataStart = 0; diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index ae53414e52..355acbe0e6 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -40,6 +40,7 @@ #include <windows.h> #include <boost/bind.hpp> +#include <boost/shared_array.hpp> namespace { @@ -252,6 +253,7 @@ public: /// Take any actions needed to prepare for working with the poller. virtual void start(Poller::shared_ptr poller); + virtual void createBuffers(uint32_t size); virtual void queueReadBuffer(BufferBase* buff); virtual void unread(BufferBase* buff); virtual void queueWrite(BufferBase* buff); @@ -286,6 +288,8 @@ private: * access to the buffer queue and write queue. */ Mutex bufferQueueLock; + std::vector<BufferBase> buffers; + boost::shared_array<char> bufferMemory; // Number of outstanding I/O operations. volatile LONG opsInProgress; @@ -385,15 +389,7 @@ AsynchIO::AsynchIO(const Socket& s, working(false) { } -struct deleter -{ - template <typename T> - void operator()(T *ptr){ delete ptr;} -}; - AsynchIO::~AsynchIO() { - std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter()); - std::for_each( writeQueue.begin(), writeQueue.end(), deleter()); } void AsynchIO::queueForDeletion() { @@ -426,6 +422,19 @@ void AsynchIO::start(Poller::shared_ptr poller0) { startReading(); } +void AsynchIO::createBuffers(uint32_t size) { + // Allocate all the buffer memory at once + bufferMemory.reset(new char[size*BufferCount]); + + // Create the Buffer structs in a vector + // And push into the buffer queue + buffers.reserve(BufferCount); + for (uint32_t i = 0; i < BufferCount; i++) { + buffers.push_back(BufferBase(&bufferMemory[i*size], size)); + queueReadBuffer(&buffers[i]); + } +} + void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) { assert(buff); buff->dataStart = 0; diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp index 25cc94b290..d263f00ab3 100644 --- a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp @@ -55,7 +55,7 @@ namespace { * the frame layer for writing into. */ struct SslIoBuff : public qpid::sys::AsynchIO::BufferBase { - std::auto_ptr<qpid::sys::AsynchIO::BufferBase> aioBuff; + qpid::sys::AsynchIO::BufferBase* aioBuff; SslIoBuff (qpid::sys::AsynchIO::BufferBase *base, const SecPkgContext_StreamSizes &sizes) @@ -66,7 +66,6 @@ namespace { {} ~SslIoBuff() {} - qpid::sys::AsynchIO::BufferBase* release() { return aioBuff.release(); } }; } @@ -101,10 +100,7 @@ SslAsynchIO::SslAsynchIO(const qpid::sys::Socket& s, } SslAsynchIO::~SslAsynchIO() { - if (leftoverPlaintext) { - delete leftoverPlaintext; - leftoverPlaintext = 0; - } + leftoverPlaintext = 0; } void SslAsynchIO::queueForDeletion() { @@ -121,6 +117,10 @@ void SslAsynchIO::start(qpid::sys::Poller::shared_ptr poller) { startNegotiate(); } +void SslAsynchIO::createBuffers(uint32_t size) { + aio->createBuffers(size); +} + void SslAsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) { aio->queueReadBuffer(buff); } @@ -148,7 +148,7 @@ void SslAsynchIO::queueWrite(AsynchIO::BufferBase* buff) { // encoding was working on, and adjusting counts for, the SslIoBuff. // Update the count of the original BufferBase before handing off to // the I/O layer. - buff = sslBuff->release(); + buff = sslBuff->aioBuff; SecBuffer buffs[4]; buffs[0].cbBuffer = schSizes.cbHeader; buffs[0].BufferType = SECBUFFER_STREAM_HEADER; diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h index edec081ced..e9d9e8d629 100644 --- a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h +++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h @@ -70,6 +70,7 @@ public: virtual void queueForDeletion(); virtual void start(qpid::sys::Poller::shared_ptr poller); + virtual void createBuffers(uint32_t size); virtual void queueReadBuffer(BufferBase* buff); virtual void unread(BufferBase* buff); virtual void queueWrite(BufferBase* buff); |
