summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.cpp1
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.h1
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp5
-rw-r--r--cpp/src/qpid/broker/SessionState.h1
-rw-r--r--cpp/src/qpid/client/Connector.cpp5
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp15
-rw-r--r--cpp/src/qpid/cluster/Connection.h3
-rw-r--r--cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h1
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp2
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.h1
-rw-r--r--cpp/src/qpid/sys/AggregateOutput.cpp7
-rw-r--r--cpp/src/qpid/sys/AggregateOutput.h2
-rw-r--r--cpp/src/qpid/sys/AsynchIO.h3
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp31
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.h8
-rw-r--r--cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h1
-rw-r--r--cpp/src/qpid/sys/OutputControl.h4
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp12
-rw-r--r--cpp/src/qpid/sys/ssl/SslHandler.cpp4
-rw-r--r--cpp/src/qpid/sys/ssl/SslHandler.h1
20 files changed, 92 insertions, 16 deletions
diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp
index 5241abed9b..8770433e20 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -92,6 +92,7 @@ size_t Connection::encode(const char* buffer, size_t size) {
}
void Connection::activateOutput() { output.activateOutput(); }
+void Connection::giveReadCredit(int32_t credit) { output.giveReadCredit(credit); }
void Connection::close() {
// Close the output queue.
diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h
index 39d27e8662..04bcf4a48b 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.h
+++ b/cpp/src/qpid/amqp_0_10/Connection.h
@@ -62,6 +62,7 @@ class Connection : public sys::ConnectionCodec,
bool isClosed() const;
bool canEncode();
void activateOutput();
+ void giveReadCredit(int32_t);
void closed(); // connection closed by peer.
void close(); // closing from this end.
void send(framing::AMQFrame&);
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index fe10345499..4f088fdf4c 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -121,6 +121,11 @@ void SessionState::activateOutput() {
getConnection().outputTasks.activateOutput();
}
+void SessionState::giveReadCredit(int32_t credit) {
+ if (isAttached())
+ getConnection().outputTasks.giveReadCredit(credit);
+}
+
ManagementObject* SessionState::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index f3b85bbd05..035a444127 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -86,6 +86,7 @@ class SessionState : public qpid::SessionState,
/** OutputControl **/
void activateOutput();
+ void giveReadCredit(int32_t);
void senderCompleted(const framing::SequenceSet& ranges);
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 724d464932..52e8405c0a 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -134,7 +134,7 @@ class TCPConnector : public Connector, private sys::Runnable
void handleClosed();
bool closeInternal();
- void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
+ bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
void writebuff(qpid::sys::AsynchIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
void eof(qpid::sys::AsynchIO&);
@@ -340,7 +340,7 @@ void TCPConnector::Writer::write(sys::AsynchIO&) {
if (encode.getPosition() > 0) writeOne();
}
-void TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
+bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
if (!initiated) {
@@ -367,6 +367,7 @@ void TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
// Give whole buffer back to aio subsystem
aio.queueReadBuffer(buff);
}
+ return true;
}
void TCPConnector::writebuff(AsynchIO& aio_) {
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 1276a994ac..27b391a1c7 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -62,17 +62,21 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), catchUp(false), output(*this, out),
connection(&output, cluster.getBroker(), wrappedId)
-{
- QPID_LOG(debug, cluster << " new connection: " << *this);
-}
+{ init(); }
// Local connections
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, MemberId myId, bool isCatchUp)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
connection(&output, cluster.getBroker(), wrappedId)
-{
+{ init(); }
+
+void Connection::init() {
QPID_LOG(debug, cluster << " new connection: " << *this);
+ if (isLocal() && !isCatchUp()) {
+ // FIXME aconway 2008-12-05: configurable credit limit
+ output.giveReadCredit(3);
+ }
}
Connection::~Connection() {
@@ -187,6 +191,7 @@ size_t Connection::decode(const char* buffer, size_t size) {
Buffer buf(const_cast<char*>(buffer), size);
while (localDecoder.decode(buf))
received(localDecoder.frame);
+ output.giveReadCredit(1);
}
else { // Multicast local connections.
assert(isLocal());
@@ -200,6 +205,8 @@ void Connection::deliverBuffer(Buffer& buf) {
++deliverSeq;
while (mcastDecoder.decode(buf))
delivered(mcastDecoder.frame);
+ if (isLocal())
+ output.giveReadCredit(1);
}
broker::SessionState& Connection::sessionState() {
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 36476baa34..ce80f42414 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -142,6 +142,7 @@ class Connection :
void exchange(const std::string& encoded);
private:
+ void init();
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
void deliverDoOutput(uint32_t requested);
@@ -167,6 +168,8 @@ class Connection :
framing::ChannelId currentChannel;
boost::shared_ptr<broker::TxBuffer> txBuffer;
+ int FIXMEcredit; // FIXME aconway 2008-12-05: remove
+
friend std::ostream& operator<<(std::ostream&, const Connection&);
};
diff --git a/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h b/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
index 3c24dd71f2..74a376a657 100644
--- a/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
+++ b/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
@@ -39,6 +39,7 @@ class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler
virtual void send(framing::AMQFrame&) {}
virtual void close() {}
virtual void activateOutput() {}
+ virtual void giveReadCredit(int32_t) {}
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 0563899a1c..77ed154bd0 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -55,6 +55,8 @@ void OutputInterceptor::activateOutput() {
}
}
+void OutputInterceptor::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
+
// Called in write thread when the IO layer has no more data to write.
// We do nothing in the write thread, we run doOutput only on delivery
// of doOutput requests.
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h
index f4226d3a40..783a443228 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.h
+++ b/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -43,6 +43,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler {
// sys::ConnectionOutputHandler functions
void send(framing::AMQFrame& f);
void activateOutput();
+ void giveReadCredit(int32_t);
void close();
size_t getBuffered() const;
diff --git a/cpp/src/qpid/sys/AggregateOutput.cpp b/cpp/src/qpid/sys/AggregateOutput.cpp
index 849e2bacfb..fa6901d3e4 100644
--- a/cpp/src/qpid/sys/AggregateOutput.cpp
+++ b/cpp/src/qpid/sys/AggregateOutput.cpp
@@ -26,10 +26,9 @@
namespace qpid {
namespace sys {
-void AggregateOutput::activateOutput()
-{
- control.activateOutput();
-}
+void AggregateOutput::activateOutput() { control.activateOutput(); }
+
+void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); }
bool AggregateOutput::hasOutput() {
for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i)
diff --git a/cpp/src/qpid/sys/AggregateOutput.h b/cpp/src/qpid/sys/AggregateOutput.h
index 9655e28c6a..1cda4456b4 100644
--- a/cpp/src/qpid/sys/AggregateOutput.h
+++ b/cpp/src/qpid/sys/AggregateOutput.h
@@ -43,6 +43,8 @@ namespace sys {
AggregateOutput(OutputControl& c) : next(0), control(c) {};
//this may be called on any thread
void activateOutput();
+ void giveReadCredit(int32_t);
+
//all the following will be called on the same thread
bool doOutput();
bool hasOutput();
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h
index f5c4607992..68e441349a 100644
--- a/cpp/src/qpid/sys/AsynchIO.h
+++ b/cpp/src/qpid/sys/AsynchIO.h
@@ -108,7 +108,7 @@ class AsynchIO {
public:
typedef AsynchIOBufferBase BufferBase;
- typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback;
+ typedef boost::function2<bool, AsynchIO&, BufferBase*> ReadCallback;
typedef boost::function1<void, AsynchIO&> EofCallback;
typedef boost::function1<void, AsynchIO&> DisconnectCallback;
typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
@@ -137,6 +137,7 @@ public:
virtual void notifyPendingWrite() = 0;
virtual void queueWriteClose() = 0;
virtual bool writeQueueEmpty() = 0;
+ virtual void startReading() = 0;
virtual BufferBase* getQueuedBuffer() = 0;
protected:
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index fb4cb32734..83b6329889 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -44,7 +44,8 @@ AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
factory(f),
codec(0),
readError(false),
- isClient(false)
+ isClient(false),
+ readCredit(InfiniteCredit)
{}
AsynchIOHandler::~AsynchIOHandler() {
@@ -79,9 +80,22 @@ void AsynchIOHandler::activateOutput() {
}
// Input side
-void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
- if (readError) {
+void AsynchIOHandler::giveReadCredit(int32_t credit) {
+ // Check whether we started in the don't about credit state
+ if (readCredit.boolCompareAndSwap(InfiniteCredit, credit))
+ return;
+ else if (readCredit.fetchAndAdd(credit) != 0)
return;
+ // Lock and retest credit to make sure we don't race with decreasing credit
+ ScopedLock<Mutex> l(creditLock);
+ assert(readCredit.get() >= 0);
+ if (readCredit.get() != 0)
+ aio->startReading();
+}
+
+bool AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
+ if (readError) {
+ return false;
}
size_t decoded = 0;
if (codec) { // Already initiated
@@ -125,6 +139,17 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
// Give whole buffer back to aio subsystem
aio->queueReadBuffer(buff);
}
+ // Check here for read credit
+ if (readCredit.get() != InfiniteCredit) {
+ if (--readCredit == 0) {
+ // Lock and retest credit to make sure we don't race with increasing credit
+ ScopedLock<Mutex> l(creditLock);
+ assert(readCredit.get() >= 0);
+ if (readCredit.get() == 0)
+ return false;
+ }
+ }
+ return true;
}
void AsynchIOHandler::eof(AsynchIO&) {
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h
index c281c27d14..fa020fbce4 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -23,6 +23,8 @@
#include "OutputControl.h"
#include "ConnectionCodec.h"
+#include "AtomicValue.h"
+#include "Mutex.h"
namespace qpid {
@@ -43,6 +45,9 @@ class AsynchIOHandler : public OutputControl {
ConnectionCodec* codec;
bool readError;
bool isClient;
+ AtomicValue<int32_t> readCredit;
+ static const int32_t InfiniteCredit = -1;
+ Mutex creditLock;
void write(const framing::ProtocolInitiation&);
@@ -56,9 +61,10 @@ class AsynchIOHandler : public OutputControl {
// Output side
void close();
void activateOutput();
+ void giveReadCredit(int32_t credit);
// Input side
- void readbuff(AsynchIO& aio, AsynchIOBufferBase* buff);
+ bool readbuff(AsynchIO& aio, AsynchIOBufferBase* buff);
void eof(AsynchIO& aio);
void disconnect(AsynchIO& aio);
diff --git a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
index 48a5a5ce85..df6de89982 100644
--- a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
+++ b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
@@ -44,6 +44,7 @@ class ConnectionOutputHandlerPtr : public ConnectionOutputHandler
void close() { next->close(); }
size_t getBuffered() const { return next->getBuffered(); }
void activateOutput() { next->activateOutput(); }
+ void giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
void send(framing::AMQFrame& f) { next->send(f); }
private:
diff --git a/cpp/src/qpid/sys/OutputControl.h b/cpp/src/qpid/sys/OutputControl.h
index d922a0d85c..e9e6c57a9b 100644
--- a/cpp/src/qpid/sys/OutputControl.h
+++ b/cpp/src/qpid/sys/OutputControl.h
@@ -18,6 +18,9 @@
* under the License.
*
*/
+
+#include "IntegerTypes.h"
+
#ifndef _OutputControl_
#define _OutputControl_
@@ -29,6 +32,7 @@ namespace sys {
public:
virtual ~OutputControl() {}
virtual void activateOutput() = 0;
+ virtual void giveReadCredit(int32_t credit) = 0;
};
}
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 50c971a181..9a5798311b 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -265,6 +265,7 @@ public:
virtual void notifyPendingWrite();
virtual void queueWriteClose();
virtual bool writeQueueEmpty();
+ virtual void startReading();
virtual BufferBase* getQueuedBuffer();
private:
@@ -381,6 +382,10 @@ bool AsynchIO::writeQueueEmpty() {
return writeQueue.empty();
}
+void AsynchIO::startReading() {
+ DispatchHandle::rewatchRead();
+}
+
/** Return a queued buffer if there are enough
* to spare
*/
@@ -418,7 +423,12 @@ void AsynchIO::readable(DispatchHandle& h) {
threadReadTotal += rc;
readTotal += rc;
- readCallback(*this, buff);
+ if (!readCallback(*this, buff)) {
+ // We were told to flow control reading at this point
+ h.unwatchRead();
+ break;
+ }
+
if (rc != readCount) {
// If we didn't fill the read buffer then time to stop reading
break;
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.cpp b/cpp/src/qpid/sys/ssl/SslHandler.cpp
index 4177ca294c..3c7e2190e7 100644
--- a/cpp/src/qpid/sys/ssl/SslHandler.cpp
+++ b/cpp/src/qpid/sys/ssl/SslHandler.cpp
@@ -80,6 +80,10 @@ void SslHandler::activateOutput() {
aio->notifyPendingWrite();
}
+void SslHandler::giveReadCredit(int32_t) {
+ // FIXME aconway 2008-12-05: not yet implemented.
+}
+
// Input side
void SslHandler::readbuff(SslIO& , SslIO::BufferBase* buff) {
if (readError) {
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.h b/cpp/src/qpid/sys/ssl/SslHandler.h
index cce5ecf09b..ae654d7ad2 100644
--- a/cpp/src/qpid/sys/ssl/SslHandler.h
+++ b/cpp/src/qpid/sys/ssl/SslHandler.h
@@ -58,6 +58,7 @@ class SslHandler : public OutputControl {
// Output side
void close();
void activateOutput();
+ void giveReadCredit(int32_t);
// Input side
void readbuff(SslIO& aio, SslIOBufferBase* buff);