diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2013-01-30 19:47:54 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2013-01-30 19:47:54 +0000 |
| commit | 0c0db5fc7713d4635c0ce20b531fff5dd66b16d5 (patch) | |
| tree | 0bba029d8d6ac07dca5311b3fe91aaf28e6bfd13 /qpid/cpp | |
| parent | d3aa20dbe7a8a205d777c10b7a8bc51b7fcb834f (diff) | |
| download | qpid-python-0c0db5fc7713d4635c0ce20b531fff5dd66b16d5.tar.gz | |
QPID-4514: Remove IO readCredit throttling only used by removed cluster code
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1440616 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/Connection.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/Connection.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SslTransport.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/AggregateOutput.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/AggregateOutput.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp | 38 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIOHandler.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/OutputControl.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp | 5 |
15 files changed, 2 insertions, 63 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp index 15df439c9c..5312c40f2a 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -119,7 +119,6 @@ size_t Connection::encode(char* buffer, size_t size) { void Connection::abort() { output.abort(); } void Connection::activateOutput() { output.activateOutput(); } -void Connection::giveReadCredit(int32_t credit) { output.giveReadCredit(credit); } void Connection::close() { // No more frames can be pushed onto the queue. diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h index 2ac9edf7a2..92ae5a3dd3 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h @@ -66,7 +66,6 @@ class Connection : public sys::ConnectionCodec, bool canEncode(); void abort(); void activateOutput(); - void giveReadCredit(int32_t); void closed(); // connection closed by peer. void close(); // closing from this end. void send(framing::AMQFrame&); diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index f6185d56a4..68d58a8be7 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -481,7 +481,6 @@ void Connection::OutboundFrameTracker::close() { next->close(); } size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); } void Connection::OutboundFrameTracker::abort() { next->abort(); } void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); } -void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); } void Connection::OutboundFrameTracker::send(framing::AMQFrame& f) { next->send(f); diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 27b0019e0d..42c997c3af 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -199,7 +199,6 @@ class Connection : public sys::ConnectionInputHandler, size_t getBuffered() const; void abort(); void activateOutput(); - void giveReadCredit(int32_t credit); void send(framing::AMQFrame&); void wrap(sys::ConnectionOutputHandlerPtr&); private: diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 0b5b705688..cd8ecfc476 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -143,11 +143,6 @@ void SessionState::activateOutput() { getConnection().outputTasks.activateOutput(); } -void SessionState::giveReadCredit(int32_t credit) { - if (isAttached()) - getConnection().outputTasks.giveReadCredit(credit); -} - ManagementObject::shared_ptr SessionState::GetManagementObject(void) const { return mgmtObject; diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 39954bb3ee..a531ec9fc6 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -99,7 +99,6 @@ class SessionState : public qpid::SessionState, /** OutputControl **/ void abort(); void activateOutput(); - void giveReadCredit(int32_t); void senderCompleted(const framing::SequenceSet& ranges); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h b/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h index 120bd983c1..f67ab95673 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h @@ -49,7 +49,7 @@ class SslTransport : public Transport void activateOutput(); void abort(); void close(); - void giveReadCredit(int32_t) {} + private: qpid::sys::ssl::SslSocket socket; TransportContext& context; diff --git a/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h index b9031dcee2..8c1087abb3 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h +++ b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h @@ -49,7 +49,6 @@ class TcpTransport : public Transport void activateOutput(); void abort(); void close(); - void giveReadCredit(int32_t) {} private: boost::scoped_ptr<qpid::sys::Socket> socket; diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp index ff9c740926..ebc5689ce5 100644 --- a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp +++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp @@ -32,8 +32,6 @@ void AggregateOutput::abort() { control.abort(); } void AggregateOutput::activateOutput() { control.activateOutput(); } -void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); } - namespace { // Clear the busy flag and notify waiting threads in destructor. struct ScopedBusy { diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.h b/qpid/cpp/src/qpid/sys/AggregateOutput.h index 802722ad26..e9dbd5a4cc 100644 --- a/qpid/cpp/src/qpid/sys/AggregateOutput.h +++ b/qpid/cpp/src/qpid/sys/AggregateOutput.h @@ -59,7 +59,6 @@ class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public Outpu // These may be called concurrently with any function. QPID_COMMON_EXTERN void abort(); QPID_COMMON_EXTERN void activateOutput(); - QPID_COMMON_EXTERN void giveReadCredit(int32_t); QPID_COMMON_EXTERN void addOutputTask(OutputTask* t); // These functions must not be called concurrently with each other. diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp index 0225b11d27..13c71e301b 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -59,8 +59,7 @@ AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory reads(0), readError(false), isClient(isClient0), - nodict(nodict0), - readCredit(InfiniteCredit) + nodict(nodict0) {} AsynchIOHandler::~AsynchIOHandler() { @@ -104,21 +103,6 @@ void AsynchIOHandler::activateOutput() { aio->notifyPendingWrite(); } -// Input side -void AsynchIOHandler::giveReadCredit(int32_t credit) { - // Check whether we started in the don't about credit state - if (readCredit.boolCompareAndSwap(InfiniteCredit, credit)) - return; - // TODO In theory should be able to use an atomic operation before taking the lock - // but in practice there seems to be an unexplained race in that case - ScopedLock<Mutex> l(creditLock); - if (readCredit.fetchAndAdd(credit) != 0) - return; - assert(readCredit.get() >= 0); - if (readCredit.get() != 0) - aio->startReading(); -} - namespace { SecuritySettings getSecuritySettings(AsynchIO* aio, bool nodict) { @@ -133,26 +117,6 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { return; } - // Check here for read credit - if (readCredit.get() != InfiniteCredit) { - if (readCredit.get() == 0) { - // FIXME aconway 2009-10-01: Workaround to avoid "false wakeups". - // readbuff is sometimes called with no credit. - // This should be fixed somewhere else to avoid such calls. - aio->unread(buff); - return; - } - // TODO In theory should be able to use an atomic operation before taking the lock - // but in practice there seems to be an unexplained race in that case - ScopedLock<Mutex> l(creditLock); - if (--readCredit == 0) { - assert(readCredit.get() >= 0); - if (readCredit.get() == 0) { - aio->stopReading(); - } - } - } - ++reads; size_t decoded = 0; if (codec) { // Already initiated diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h index 91ddb022af..d93e24fd4c 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h @@ -52,9 +52,6 @@ class AsynchIOHandler : public OutputControl { bool readError; bool isClient; bool nodict; - AtomicValue<int32_t> readCredit; - static const int32_t InfiniteCredit = -1; - Mutex creditLock; boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask; void write(const framing::ProtocolInitiation&); @@ -67,7 +64,6 @@ class AsynchIOHandler : public OutputControl { // Output side QPID_COMMON_EXTERN void abort(); QPID_COMMON_EXTERN void activateOutput(); - QPID_COMMON_EXTERN void giveReadCredit(int32_t credit); // Input side QPID_COMMON_EXTERN void readbuff(AsynchIO& aio, AsynchIOBufferBase* buff); diff --git a/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h index 95a08d15ae..53d56ad716 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h +++ b/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h @@ -45,7 +45,6 @@ class ConnectionOutputHandlerPtr : public ConnectionOutputHandler size_t getBuffered() const { return next->getBuffered(); } void abort() { next->abort(); } void activateOutput() { next->activateOutput(); } - void giveReadCredit(int32_t credit) { next->giveReadCredit(credit); } void send(framing::AMQFrame& f) { next->send(f); } private: diff --git a/qpid/cpp/src/qpid/sys/OutputControl.h b/qpid/cpp/src/qpid/sys/OutputControl.h index ad5caa3168..0d801e9d16 100644 --- a/qpid/cpp/src/qpid/sys/OutputControl.h +++ b/qpid/cpp/src/qpid/sys/OutputControl.h @@ -33,7 +33,6 @@ namespace sys { virtual ~OutputControl() {} virtual void abort() = 0; virtual void activateOutput() = 0; - virtual void giveReadCredit(int32_t credit) = 0; }; } diff --git a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp index e1f4362d64..51cc0ed109 100644 --- a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -68,7 +68,6 @@ class RdmaIOHandler : public OutputControl { void close(); void abort(); void activateOutput(); - void giveReadCredit(int32_t credit); void initProtocolOut(); // Input side @@ -200,10 +199,6 @@ void RdmaIOHandler::full(Rdma::AsynchIO&) { QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]"); } -// TODO: Dummy implementation of read throttling -void RdmaIOHandler::giveReadCredit(int32_t) { -} - // The logic here is subtly different from TCP as RDMA is message oriented // so we define that an RDMA message is a frame - in this case there is no putting back // of any message remainder - there shouldn't be any. And what we read here can't be |
