summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/AggregateOutput.cpp7
-rw-r--r--cpp/src/qpid/sys/AggregateOutput.h2
-rw-r--r--cpp/src/qpid/sys/AsynchIO.h3
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp31
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.h8
-rw-r--r--cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h1
-rw-r--r--cpp/src/qpid/sys/OutputControl.h4
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp12
-rw-r--r--cpp/src/qpid/sys/ssl/SslHandler.cpp4
-rw-r--r--cpp/src/qpid/sys/ssl/SslHandler.h1
10 files changed, 63 insertions, 10 deletions
diff --git a/cpp/src/qpid/sys/AggregateOutput.cpp b/cpp/src/qpid/sys/AggregateOutput.cpp
index 849e2bacfb..fa6901d3e4 100644
--- a/cpp/src/qpid/sys/AggregateOutput.cpp
+++ b/cpp/src/qpid/sys/AggregateOutput.cpp
@@ -26,10 +26,9 @@
namespace qpid {
namespace sys {
-void AggregateOutput::activateOutput()
-{
- control.activateOutput();
-}
+void AggregateOutput::activateOutput() { control.activateOutput(); }
+
+void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); }
bool AggregateOutput::hasOutput() {
for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i)
diff --git a/cpp/src/qpid/sys/AggregateOutput.h b/cpp/src/qpid/sys/AggregateOutput.h
index 9655e28c6a..1cda4456b4 100644
--- a/cpp/src/qpid/sys/AggregateOutput.h
+++ b/cpp/src/qpid/sys/AggregateOutput.h
@@ -43,6 +43,8 @@ namespace sys {
AggregateOutput(OutputControl& c) : next(0), control(c) {};
//this may be called on any thread
void activateOutput();
+ void giveReadCredit(int32_t);
+
//all the following will be called on the same thread
bool doOutput();
bool hasOutput();
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h
index f5c4607992..68e441349a 100644
--- a/cpp/src/qpid/sys/AsynchIO.h
+++ b/cpp/src/qpid/sys/AsynchIO.h
@@ -108,7 +108,7 @@ class AsynchIO {
public:
typedef AsynchIOBufferBase BufferBase;
- typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback;
+ typedef boost::function2<bool, AsynchIO&, BufferBase*> ReadCallback;
typedef boost::function1<void, AsynchIO&> EofCallback;
typedef boost::function1<void, AsynchIO&> DisconnectCallback;
typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
@@ -137,6 +137,7 @@ public:
virtual void notifyPendingWrite() = 0;
virtual void queueWriteClose() = 0;
virtual bool writeQueueEmpty() = 0;
+ virtual void startReading() = 0;
virtual BufferBase* getQueuedBuffer() = 0;
protected:
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index fb4cb32734..83b6329889 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -44,7 +44,8 @@ AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
factory(f),
codec(0),
readError(false),
- isClient(false)
+ isClient(false),
+ readCredit(InfiniteCredit)
{}
AsynchIOHandler::~AsynchIOHandler() {
@@ -79,9 +80,22 @@ void AsynchIOHandler::activateOutput() {
}
// Input side
-void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
- if (readError) {
+void AsynchIOHandler::giveReadCredit(int32_t credit) {
+ // Check whether we started in the don't about credit state
+ if (readCredit.boolCompareAndSwap(InfiniteCredit, credit))
+ return;
+ else if (readCredit.fetchAndAdd(credit) != 0)
return;
+ // Lock and retest credit to make sure we don't race with decreasing credit
+ ScopedLock<Mutex> l(creditLock);
+ assert(readCredit.get() >= 0);
+ if (readCredit.get() != 0)
+ aio->startReading();
+}
+
+bool AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
+ if (readError) {
+ return false;
}
size_t decoded = 0;
if (codec) { // Already initiated
@@ -125,6 +139,17 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
// Give whole buffer back to aio subsystem
aio->queueReadBuffer(buff);
}
+ // Check here for read credit
+ if (readCredit.get() != InfiniteCredit) {
+ if (--readCredit == 0) {
+ // Lock and retest credit to make sure we don't race with increasing credit
+ ScopedLock<Mutex> l(creditLock);
+ assert(readCredit.get() >= 0);
+ if (readCredit.get() == 0)
+ return false;
+ }
+ }
+ return true;
}
void AsynchIOHandler::eof(AsynchIO&) {
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h
index c281c27d14..fa020fbce4 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -23,6 +23,8 @@
#include "OutputControl.h"
#include "ConnectionCodec.h"
+#include "AtomicValue.h"
+#include "Mutex.h"
namespace qpid {
@@ -43,6 +45,9 @@ class AsynchIOHandler : public OutputControl {
ConnectionCodec* codec;
bool readError;
bool isClient;
+ AtomicValue<int32_t> readCredit;
+ static const int32_t InfiniteCredit = -1;
+ Mutex creditLock;
void write(const framing::ProtocolInitiation&);
@@ -56,9 +61,10 @@ class AsynchIOHandler : public OutputControl {
// Output side
void close();
void activateOutput();
+ void giveReadCredit(int32_t credit);
// Input side
- void readbuff(AsynchIO& aio, AsynchIOBufferBase* buff);
+ bool readbuff(AsynchIO& aio, AsynchIOBufferBase* buff);
void eof(AsynchIO& aio);
void disconnect(AsynchIO& aio);
diff --git a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
index 48a5a5ce85..df6de89982 100644
--- a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
+++ b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
@@ -44,6 +44,7 @@ class ConnectionOutputHandlerPtr : public ConnectionOutputHandler
void close() { next->close(); }
size_t getBuffered() const { return next->getBuffered(); }
void activateOutput() { next->activateOutput(); }
+ void giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
void send(framing::AMQFrame& f) { next->send(f); }
private:
diff --git a/cpp/src/qpid/sys/OutputControl.h b/cpp/src/qpid/sys/OutputControl.h
index d922a0d85c..e9e6c57a9b 100644
--- a/cpp/src/qpid/sys/OutputControl.h
+++ b/cpp/src/qpid/sys/OutputControl.h
@@ -18,6 +18,9 @@
* under the License.
*
*/
+
+#include "IntegerTypes.h"
+
#ifndef _OutputControl_
#define _OutputControl_
@@ -29,6 +32,7 @@ namespace sys {
public:
virtual ~OutputControl() {}
virtual void activateOutput() = 0;
+ virtual void giveReadCredit(int32_t credit) = 0;
};
}
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 50c971a181..9a5798311b 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -265,6 +265,7 @@ public:
virtual void notifyPendingWrite();
virtual void queueWriteClose();
virtual bool writeQueueEmpty();
+ virtual void startReading();
virtual BufferBase* getQueuedBuffer();
private:
@@ -381,6 +382,10 @@ bool AsynchIO::writeQueueEmpty() {
return writeQueue.empty();
}
+void AsynchIO::startReading() {
+ DispatchHandle::rewatchRead();
+}
+
/** Return a queued buffer if there are enough
* to spare
*/
@@ -418,7 +423,12 @@ void AsynchIO::readable(DispatchHandle& h) {
threadReadTotal += rc;
readTotal += rc;
- readCallback(*this, buff);
+ if (!readCallback(*this, buff)) {
+ // We were told to flow control reading at this point
+ h.unwatchRead();
+ break;
+ }
+
if (rc != readCount) {
// If we didn't fill the read buffer then time to stop reading
break;
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.cpp b/cpp/src/qpid/sys/ssl/SslHandler.cpp
index 4177ca294c..3c7e2190e7 100644
--- a/cpp/src/qpid/sys/ssl/SslHandler.cpp
+++ b/cpp/src/qpid/sys/ssl/SslHandler.cpp
@@ -80,6 +80,10 @@ void SslHandler::activateOutput() {
aio->notifyPendingWrite();
}
+void SslHandler::giveReadCredit(int32_t) {
+ // FIXME aconway 2008-12-05: not yet implemented.
+}
+
// Input side
void SslHandler::readbuff(SslIO& , SslIO::BufferBase* buff) {
if (readError) {
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.h b/cpp/src/qpid/sys/ssl/SslHandler.h
index cce5ecf09b..ae654d7ad2 100644
--- a/cpp/src/qpid/sys/ssl/SslHandler.h
+++ b/cpp/src/qpid/sys/ssl/SslHandler.h
@@ -58,6 +58,7 @@ class SslHandler : public OutputControl {
// Output side
void close();
void activateOutput();
+ void giveReadCredit(int32_t);
// Input side
void readbuff(SslIO& aio, SslIOBufferBase* buff);