summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2013-01-30 19:47:54 +0000
committerAndrew Stitcher <astitcher@apache.org>2013-01-30 19:47:54 +0000
commit0c0db5fc7713d4635c0ce20b531fff5dd66b16d5 (patch)
tree0bba029d8d6ac07dca5311b3fe91aaf28e6bfd13 /qpid/cpp
parentd3aa20dbe7a8a205d777c10b7a8bc51b7fcb834f (diff)
downloadqpid-python-0c0db5fc7713d4635c0ce20b531fff5dd66b16d5.tar.gz
QPID-4514: Remove IO readCredit throttling only used by removed cluster code
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1440616 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.cpp1
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SslTransport.h2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h1
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.h1
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp38
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOHandler.h4
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h1
-rw-r--r--qpid/cpp/src/qpid/sys/OutputControl.h1
-rw-r--r--qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp5
15 files changed, 2 insertions, 63 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
index 15df439c9c..5312c40f2a 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -119,7 +119,6 @@ size_t Connection::encode(char* buffer, size_t size) {
void Connection::abort() { output.abort(); }
void Connection::activateOutput() { output.activateOutput(); }
-void Connection::giveReadCredit(int32_t credit) { output.giveReadCredit(credit); }
void Connection::close() {
// No more frames can be pushed onto the queue.
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h
index 2ac9edf7a2..92ae5a3dd3 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h
@@ -66,7 +66,6 @@ class Connection : public sys::ConnectionCodec,
bool canEncode();
void abort();
void activateOutput();
- void giveReadCredit(int32_t);
void closed(); // connection closed by peer.
void close(); // closing from this end.
void send(framing::AMQFrame&);
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index f6185d56a4..68d58a8be7 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -481,7 +481,6 @@ void Connection::OutboundFrameTracker::close() { next->close(); }
size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); }
void Connection::OutboundFrameTracker::abort() { next->abort(); }
void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
-void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
void Connection::OutboundFrameTracker::send(framing::AMQFrame& f)
{
next->send(f);
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 27b0019e0d..42c997c3af 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -199,7 +199,6 @@ class Connection : public sys::ConnectionInputHandler,
size_t getBuffered() const;
void abort();
void activateOutput();
- void giveReadCredit(int32_t credit);
void send(framing::AMQFrame&);
void wrap(sys::ConnectionOutputHandlerPtr&);
private:
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 0b5b705688..cd8ecfc476 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -143,11 +143,6 @@ void SessionState::activateOutput() {
getConnection().outputTasks.activateOutput();
}
-void SessionState::giveReadCredit(int32_t credit) {
- if (isAttached())
- getConnection().outputTasks.giveReadCredit(credit);
-}
-
ManagementObject::shared_ptr SessionState::GetManagementObject(void) const
{
return mgmtObject;
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 39954bb3ee..a531ec9fc6 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -99,7 +99,6 @@ class SessionState : public qpid::SessionState,
/** OutputControl **/
void abort();
void activateOutput();
- void giveReadCredit(int32_t);
void senderCompleted(const framing::SequenceSet& ranges);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h b/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
index 120bd983c1..f67ab95673 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
@@ -49,7 +49,7 @@ class SslTransport : public Transport
void activateOutput();
void abort();
void close();
- void giveReadCredit(int32_t) {}
+
private:
qpid::sys::ssl::SslSocket socket;
TransportContext& context;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
index b9031dcee2..8c1087abb3 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
@@ -49,7 +49,6 @@ class TcpTransport : public Transport
void activateOutput();
void abort();
void close();
- void giveReadCredit(int32_t) {}
private:
boost::scoped_ptr<qpid::sys::Socket> socket;
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
index ff9c740926..ebc5689ce5 100644
--- a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
@@ -32,8 +32,6 @@ void AggregateOutput::abort() { control.abort(); }
void AggregateOutput::activateOutput() { control.activateOutput(); }
-void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); }
-
namespace {
// Clear the busy flag and notify waiting threads in destructor.
struct ScopedBusy {
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.h b/qpid/cpp/src/qpid/sys/AggregateOutput.h
index 802722ad26..e9dbd5a4cc 100644
--- a/qpid/cpp/src/qpid/sys/AggregateOutput.h
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.h
@@ -59,7 +59,6 @@ class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public Outpu
// These may be called concurrently with any function.
QPID_COMMON_EXTERN void abort();
QPID_COMMON_EXTERN void activateOutput();
- QPID_COMMON_EXTERN void giveReadCredit(int32_t);
QPID_COMMON_EXTERN void addOutputTask(OutputTask* t);
// These functions must not be called concurrently with each other.
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
index 0225b11d27..13c71e301b 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -59,8 +59,7 @@ AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory
reads(0),
readError(false),
isClient(isClient0),
- nodict(nodict0),
- readCredit(InfiniteCredit)
+ nodict(nodict0)
{}
AsynchIOHandler::~AsynchIOHandler() {
@@ -104,21 +103,6 @@ void AsynchIOHandler::activateOutput() {
aio->notifyPendingWrite();
}
-// Input side
-void AsynchIOHandler::giveReadCredit(int32_t credit) {
- // Check whether we started in the don't about credit state
- if (readCredit.boolCompareAndSwap(InfiniteCredit, credit))
- return;
- // TODO In theory should be able to use an atomic operation before taking the lock
- // but in practice there seems to be an unexplained race in that case
- ScopedLock<Mutex> l(creditLock);
- if (readCredit.fetchAndAdd(credit) != 0)
- return;
- assert(readCredit.get() >= 0);
- if (readCredit.get() != 0)
- aio->startReading();
-}
-
namespace {
SecuritySettings getSecuritySettings(AsynchIO* aio, bool nodict)
{
@@ -133,26 +117,6 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
return;
}
- // Check here for read credit
- if (readCredit.get() != InfiniteCredit) {
- if (readCredit.get() == 0) {
- // FIXME aconway 2009-10-01: Workaround to avoid "false wakeups".
- // readbuff is sometimes called with no credit.
- // This should be fixed somewhere else to avoid such calls.
- aio->unread(buff);
- return;
- }
- // TODO In theory should be able to use an atomic operation before taking the lock
- // but in practice there seems to be an unexplained race in that case
- ScopedLock<Mutex> l(creditLock);
- if (--readCredit == 0) {
- assert(readCredit.get() >= 0);
- if (readCredit.get() == 0) {
- aio->stopReading();
- }
- }
- }
-
++reads;
size_t decoded = 0;
if (codec) { // Already initiated
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
index 91ddb022af..d93e24fd4c 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -52,9 +52,6 @@ class AsynchIOHandler : public OutputControl {
bool readError;
bool isClient;
bool nodict;
- AtomicValue<int32_t> readCredit;
- static const int32_t InfiniteCredit = -1;
- Mutex creditLock;
boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
void write(const framing::ProtocolInitiation&);
@@ -67,7 +64,6 @@ class AsynchIOHandler : public OutputControl {
// Output side
QPID_COMMON_EXTERN void abort();
QPID_COMMON_EXTERN void activateOutput();
- QPID_COMMON_EXTERN void giveReadCredit(int32_t credit);
// Input side
QPID_COMMON_EXTERN void readbuff(AsynchIO& aio, AsynchIOBufferBase* buff);
diff --git a/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
index 95a08d15ae..53d56ad716 100644
--- a/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
+++ b/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
@@ -45,7 +45,6 @@ class ConnectionOutputHandlerPtr : public ConnectionOutputHandler
size_t getBuffered() const { return next->getBuffered(); }
void abort() { next->abort(); }
void activateOutput() { next->activateOutput(); }
- void giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
void send(framing::AMQFrame& f) { next->send(f); }
private:
diff --git a/qpid/cpp/src/qpid/sys/OutputControl.h b/qpid/cpp/src/qpid/sys/OutputControl.h
index ad5caa3168..0d801e9d16 100644
--- a/qpid/cpp/src/qpid/sys/OutputControl.h
+++ b/qpid/cpp/src/qpid/sys/OutputControl.h
@@ -33,7 +33,6 @@ namespace sys {
virtual ~OutputControl() {}
virtual void abort() = 0;
virtual void activateOutput() = 0;
- virtual void giveReadCredit(int32_t credit) = 0;
};
}
diff --git a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
index e1f4362d64..51cc0ed109 100644
--- a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -68,7 +68,6 @@ class RdmaIOHandler : public OutputControl {
void close();
void abort();
void activateOutput();
- void giveReadCredit(int32_t credit);
void initProtocolOut();
// Input side
@@ -200,10 +199,6 @@ void RdmaIOHandler::full(Rdma::AsynchIO&) {
QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]");
}
-// TODO: Dummy implementation of read throttling
-void RdmaIOHandler::giveReadCredit(int32_t) {
-}
-
// The logic here is subtly different from TCP as RDMA is message oriented
// so we define that an RDMA message is a frame - in this case there is no putting back
// of any message remainder - there shouldn't be any. And what we read here can't be