diff options
Diffstat (limited to 'cpp/src/qpid/sys')
| -rw-r--r-- | cpp/src/qpid/sys/AggregateOutput.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AggregateOutput.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIO.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.cpp | 31 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.h | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/OutputControl.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ssl/SslHandler.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ssl/SslHandler.h | 1 |
10 files changed, 63 insertions, 10 deletions
diff --git a/cpp/src/qpid/sys/AggregateOutput.cpp b/cpp/src/qpid/sys/AggregateOutput.cpp index 849e2bacfb..fa6901d3e4 100644 --- a/cpp/src/qpid/sys/AggregateOutput.cpp +++ b/cpp/src/qpid/sys/AggregateOutput.cpp @@ -26,10 +26,9 @@ namespace qpid { namespace sys { -void AggregateOutput::activateOutput() -{ - control.activateOutput(); -} +void AggregateOutput::activateOutput() { control.activateOutput(); } + +void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); } bool AggregateOutput::hasOutput() { for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i) diff --git a/cpp/src/qpid/sys/AggregateOutput.h b/cpp/src/qpid/sys/AggregateOutput.h index 9655e28c6a..1cda4456b4 100644 --- a/cpp/src/qpid/sys/AggregateOutput.h +++ b/cpp/src/qpid/sys/AggregateOutput.h @@ -43,6 +43,8 @@ namespace sys { AggregateOutput(OutputControl& c) : next(0), control(c) {}; //this may be called on any thread void activateOutput(); + void giveReadCredit(int32_t); + //all the following will be called on the same thread bool doOutput(); bool hasOutput(); diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h index f5c4607992..68e441349a 100644 --- a/cpp/src/qpid/sys/AsynchIO.h +++ b/cpp/src/qpid/sys/AsynchIO.h @@ -108,7 +108,7 @@ class AsynchIO { public: typedef AsynchIOBufferBase BufferBase; - typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback; + typedef boost::function2<bool, AsynchIO&, BufferBase*> ReadCallback; typedef boost::function1<void, AsynchIO&> EofCallback; typedef boost::function1<void, AsynchIO&> DisconnectCallback; typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback; @@ -137,6 +137,7 @@ public: virtual void notifyPendingWrite() = 0; virtual void queueWriteClose() = 0; virtual bool writeQueueEmpty() = 0; + virtual void startReading() = 0; virtual BufferBase* getQueuedBuffer() = 0; protected: diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index fb4cb32734..83b6329889 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -44,7 +44,8 @@ AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : factory(f), codec(0), readError(false), - isClient(false) + isClient(false), + readCredit(InfiniteCredit) {} AsynchIOHandler::~AsynchIOHandler() { @@ -79,9 +80,22 @@ void AsynchIOHandler::activateOutput() { } // Input side -void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { - if (readError) { +void AsynchIOHandler::giveReadCredit(int32_t credit) { + // Check whether we started in the don't about credit state + if (readCredit.boolCompareAndSwap(InfiniteCredit, credit)) + return; + else if (readCredit.fetchAndAdd(credit) != 0) return; + // Lock and retest credit to make sure we don't race with decreasing credit + ScopedLock<Mutex> l(creditLock); + assert(readCredit.get() >= 0); + if (readCredit.get() != 0) + aio->startReading(); +} + +bool AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { + if (readError) { + return false; } size_t decoded = 0; if (codec) { // Already initiated @@ -125,6 +139,17 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { // Give whole buffer back to aio subsystem aio->queueReadBuffer(buff); } + // Check here for read credit + if (readCredit.get() != InfiniteCredit) { + if (--readCredit == 0) { + // Lock and retest credit to make sure we don't race with increasing credit + ScopedLock<Mutex> l(creditLock); + assert(readCredit.get() >= 0); + if (readCredit.get() == 0) + return false; + } + } + return true; } void AsynchIOHandler::eof(AsynchIO&) { diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h index c281c27d14..fa020fbce4 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/cpp/src/qpid/sys/AsynchIOHandler.h @@ -23,6 +23,8 @@ #include "OutputControl.h" #include "ConnectionCodec.h" +#include "AtomicValue.h" +#include "Mutex.h" namespace qpid { @@ -43,6 +45,9 @@ class AsynchIOHandler : public OutputControl { ConnectionCodec* codec; bool readError; bool isClient; + AtomicValue<int32_t> readCredit; + static const int32_t InfiniteCredit = -1; + Mutex creditLock; void write(const framing::ProtocolInitiation&); @@ -56,9 +61,10 @@ class AsynchIOHandler : public OutputControl { // Output side void close(); void activateOutput(); + void giveReadCredit(int32_t credit); // Input side - void readbuff(AsynchIO& aio, AsynchIOBufferBase* buff); + bool readbuff(AsynchIO& aio, AsynchIOBufferBase* buff); void eof(AsynchIO& aio); void disconnect(AsynchIO& aio); diff --git a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h index 48a5a5ce85..df6de89982 100644 --- a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h +++ b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h @@ -44,6 +44,7 @@ class ConnectionOutputHandlerPtr : public ConnectionOutputHandler void close() { next->close(); } size_t getBuffered() const { return next->getBuffered(); } void activateOutput() { next->activateOutput(); } + void giveReadCredit(int32_t credit) { next->giveReadCredit(credit); } void send(framing::AMQFrame& f) { next->send(f); } private: diff --git a/cpp/src/qpid/sys/OutputControl.h b/cpp/src/qpid/sys/OutputControl.h index d922a0d85c..e9e6c57a9b 100644 --- a/cpp/src/qpid/sys/OutputControl.h +++ b/cpp/src/qpid/sys/OutputControl.h @@ -18,6 +18,9 @@ * under the License. * */ + +#include "IntegerTypes.h" + #ifndef _OutputControl_ #define _OutputControl_ @@ -29,6 +32,7 @@ namespace sys { public: virtual ~OutputControl() {} virtual void activateOutput() = 0; + virtual void giveReadCredit(int32_t credit) = 0; }; } diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 50c971a181..9a5798311b 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -265,6 +265,7 @@ public: virtual void notifyPendingWrite(); virtual void queueWriteClose(); virtual bool writeQueueEmpty(); + virtual void startReading(); virtual BufferBase* getQueuedBuffer(); private: @@ -381,6 +382,10 @@ bool AsynchIO::writeQueueEmpty() { return writeQueue.empty(); } +void AsynchIO::startReading() { + DispatchHandle::rewatchRead(); +} + /** Return a queued buffer if there are enough * to spare */ @@ -418,7 +423,12 @@ void AsynchIO::readable(DispatchHandle& h) { threadReadTotal += rc; readTotal += rc; - readCallback(*this, buff); + if (!readCallback(*this, buff)) { + // We were told to flow control reading at this point + h.unwatchRead(); + break; + } + if (rc != readCount) { // If we didn't fill the read buffer then time to stop reading break; diff --git a/cpp/src/qpid/sys/ssl/SslHandler.cpp b/cpp/src/qpid/sys/ssl/SslHandler.cpp index 4177ca294c..3c7e2190e7 100644 --- a/cpp/src/qpid/sys/ssl/SslHandler.cpp +++ b/cpp/src/qpid/sys/ssl/SslHandler.cpp @@ -80,6 +80,10 @@ void SslHandler::activateOutput() { aio->notifyPendingWrite(); } +void SslHandler::giveReadCredit(int32_t) { + // FIXME aconway 2008-12-05: not yet implemented. +} + // Input side void SslHandler::readbuff(SslIO& , SslIO::BufferBase* buff) { if (readError) { diff --git a/cpp/src/qpid/sys/ssl/SslHandler.h b/cpp/src/qpid/sys/ssl/SslHandler.h index cce5ecf09b..ae654d7ad2 100644 --- a/cpp/src/qpid/sys/ssl/SslHandler.h +++ b/cpp/src/qpid/sys/ssl/SslHandler.h @@ -58,6 +58,7 @@ class SslHandler : public OutputControl { // Output side void close(); void activateOutput(); + void giveReadCredit(int32_t); // Input side void readbuff(SslIO& aio, SslIOBufferBase* buff); |
