diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2009-01-27 21:17:47 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2009-01-27 21:17:47 +0000 |
| commit | 2dff9493ceb62d37a3b70a4abd6bc0539bdb581e (patch) | |
| tree | 0003c4766da8f32e8fc2b9f5b4968391b7319492 /cpp/src/qpid/client/SessionImpl.cpp | |
| parent | 3f547381f1af5cdb9d7c5f9cc30f7303d643afd9 (diff) | |
| download | qpid-python-2dff9493ceb62d37a3b70a4abd6bc0539bdb581e.tar.gz | |
Producer side rate throttling:
This uses the Message.Flow command to send credit from
broker to client to ensure that the client doesn't
exceed a rate configured on the broker per session.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@738247 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/SessionImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 101 |
1 files changed, 88 insertions, 13 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 4fadf236f8..7cf68956ea 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -62,7 +62,8 @@ SessionImpl::SessionImpl(const std::string& name, shared_ptr<ConnectionImpl> con ioHandler(*this), proxy(ioHandler), nextIn(0), - nextOut(0) + nextOut(0), + sendMsgCredit(0) { channel.next = connectionShared.get(); } @@ -76,6 +77,7 @@ SessionImpl::~SessionImpl() { handleClosed(); state.waitWaiters(); } + delete sendMsgCredit; } boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock(); if (c) c->erase(channel); @@ -359,7 +361,7 @@ void SessionImpl::sendContent(const MethodContent& content) uint64_t data_length = content.getData().length(); if(data_length > 0){ header.setLastSegment(false); - handleOut(header); + handleContentOut(header); /*Note: end of frame marker included in overhead but not in size*/ const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); @@ -388,7 +390,7 @@ void SessionImpl::sendContent(const MethodContent& content) } } } else { - handleOut(header); + handleContentOut(header); } } @@ -414,16 +416,18 @@ bool isContentFrame(AMQFrame& frame) void SessionImpl::handleIn(AMQFrame& frame) // network thread { try { - if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { - if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) { - //make sure the command id sequence and completion - //tracking takes account of execution commands - Lock l(state); - completedIn.add(nextIn++); - } else { - //if not handled by this class, its for the application: - deliver(frame); - } + if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { + ; + } else if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) { + //make sure the command id sequence and completion + //tracking takes account of execution commands + Lock l(state); + completedIn.add(nextIn++); + } else if (invoke(static_cast<MessageHandler&>(*this), *frame.getBody())) { + ; + } else { + //if not handled by this class, its for the application: + deliver(frame); } } catch (const SessionException& e) { @@ -439,6 +443,14 @@ void SessionImpl::handleOut(AMQFrame& frame) // user thread sendFrame(frame, true); } +void SessionImpl::handleContentOut(AMQFrame& frame) // user thread +{ + if (sendMsgCredit) { + sendMsgCredit->acquire(); + } + sendFrame(frame, true); +} + void SessionImpl::proxyOut(AMQFrame& frame) // network thread { //Note: this case is treated slightly differently that command @@ -631,6 +643,69 @@ void SessionImpl::exception(uint16_t errorCode, setTimeout(0); } +// Message methods: +void SessionImpl::accept(const qpid::framing::SequenceSet&) +{ +} + +void SessionImpl::reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&) +{ +} + +void SessionImpl::release(const qpid::framing::SequenceSet&, bool) +{ +} + +MessageResumeResult SessionImpl::resume(const std::string&, const std::string&) +{ + throw NotImplementedException("resuming transfers not yet supported"); +} + +namespace { + const std::string QPID_SESSION_DEST = ""; + const uint8_t FLOW_MODE_CREDIT = 0; + const uint8_t CREDIT_MODE_MSG = 0; +} + +void SessionImpl::setFlowMode(const std::string& dest, uint8_t flowMode) +{ + if ( dest != QPID_SESSION_DEST ) { + QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest); + return; + } + + if ( flowMode != FLOW_MODE_CREDIT ) { + throw NotImplementedException("window flow control mode not supported by producer"); + } + Lock l(state); + sendMsgCredit = new sys::Semaphore(0); +} + +void SessionImpl::flow(const std::string& dest, uint8_t mode, uint32_t credit) +{ + if ( dest != QPID_SESSION_DEST ) { + QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest); + return; + } + + if ( mode != CREDIT_MODE_MSG ) { + return; + } + if (sendMsgCredit) { + sendMsgCredit->release(credit); + } +} + +void SessionImpl::stop(const std::string& dest) +{ + if ( dest != QPID_SESSION_DEST ) { + QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest); + return; + } + if (sendMsgCredit) { + sendMsgCredit->forceLock(); + } +} //private utility methods: |
