summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SessionImpl.cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2009-01-27 21:17:47 +0000
committerAndrew Stitcher <astitcher@apache.org>2009-01-27 21:17:47 +0000
commit2dff9493ceb62d37a3b70a4abd6bc0539bdb581e (patch)
tree0003c4766da8f32e8fc2b9f5b4968391b7319492 /cpp/src/qpid/client/SessionImpl.cpp
parent3f547381f1af5cdb9d7c5f9cc30f7303d643afd9 (diff)
downloadqpid-python-2dff9493ceb62d37a3b70a4abd6bc0539bdb581e.tar.gz
Producer side rate throttling:
This uses the Message.Flow command to send credit from broker to client to ensure that the client doesn't exceed a rate configured on the broker per session. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@738247 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/SessionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp101
1 files changed, 88 insertions, 13 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 4fadf236f8..7cf68956ea 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -62,7 +62,8 @@ SessionImpl::SessionImpl(const std::string& name, shared_ptr<ConnectionImpl> con
ioHandler(*this),
proxy(ioHandler),
nextIn(0),
- nextOut(0)
+ nextOut(0),
+ sendMsgCredit(0)
{
channel.next = connectionShared.get();
}
@@ -76,6 +77,7 @@ SessionImpl::~SessionImpl() {
handleClosed();
state.waitWaiters();
}
+ delete sendMsgCredit;
}
boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock();
if (c) c->erase(channel);
@@ -359,7 +361,7 @@ void SessionImpl::sendContent(const MethodContent& content)
uint64_t data_length = content.getData().length();
if(data_length > 0){
header.setLastSegment(false);
- handleOut(header);
+ handleContentOut(header);
/*Note: end of frame marker included in overhead but not in size*/
const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead();
@@ -388,7 +390,7 @@ void SessionImpl::sendContent(const MethodContent& content)
}
}
} else {
- handleOut(header);
+ handleContentOut(header);
}
}
@@ -414,16 +416,18 @@ bool isContentFrame(AMQFrame& frame)
void SessionImpl::handleIn(AMQFrame& frame) // network thread
{
try {
- if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
- if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) {
- //make sure the command id sequence and completion
- //tracking takes account of execution commands
- Lock l(state);
- completedIn.add(nextIn++);
- } else {
- //if not handled by this class, its for the application:
- deliver(frame);
- }
+ if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
+ ;
+ } else if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) {
+ //make sure the command id sequence and completion
+ //tracking takes account of execution commands
+ Lock l(state);
+ completedIn.add(nextIn++);
+ } else if (invoke(static_cast<MessageHandler&>(*this), *frame.getBody())) {
+ ;
+ } else {
+ //if not handled by this class, its for the application:
+ deliver(frame);
}
}
catch (const SessionException& e) {
@@ -439,6 +443,14 @@ void SessionImpl::handleOut(AMQFrame& frame) // user thread
sendFrame(frame, true);
}
+void SessionImpl::handleContentOut(AMQFrame& frame) // user thread
+{
+ if (sendMsgCredit) {
+ sendMsgCredit->acquire();
+ }
+ sendFrame(frame, true);
+}
+
void SessionImpl::proxyOut(AMQFrame& frame) // network thread
{
//Note: this case is treated slightly differently that command
@@ -631,6 +643,69 @@ void SessionImpl::exception(uint16_t errorCode,
setTimeout(0);
}
+// Message methods:
+void SessionImpl::accept(const qpid::framing::SequenceSet&)
+{
+}
+
+void SessionImpl::reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&)
+{
+}
+
+void SessionImpl::release(const qpid::framing::SequenceSet&, bool)
+{
+}
+
+MessageResumeResult SessionImpl::resume(const std::string&, const std::string&)
+{
+ throw NotImplementedException("resuming transfers not yet supported");
+}
+
+namespace {
+ const std::string QPID_SESSION_DEST = "";
+ const uint8_t FLOW_MODE_CREDIT = 0;
+ const uint8_t CREDIT_MODE_MSG = 0;
+}
+
+void SessionImpl::setFlowMode(const std::string& dest, uint8_t flowMode)
+{
+ if ( dest != QPID_SESSION_DEST ) {
+ QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest);
+ return;
+ }
+
+ if ( flowMode != FLOW_MODE_CREDIT ) {
+ throw NotImplementedException("window flow control mode not supported by producer");
+ }
+ Lock l(state);
+ sendMsgCredit = new sys::Semaphore(0);
+}
+
+void SessionImpl::flow(const std::string& dest, uint8_t mode, uint32_t credit)
+{
+ if ( dest != QPID_SESSION_DEST ) {
+ QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest);
+ return;
+ }
+
+ if ( mode != CREDIT_MODE_MSG ) {
+ return;
+ }
+ if (sendMsgCredit) {
+ sendMsgCredit->release(credit);
+ }
+}
+
+void SessionImpl::stop(const std::string& dest)
+{
+ if ( dest != QPID_SESSION_DEST ) {
+ QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest);
+ return;
+ }
+ if (sendMsgCredit) {
+ sendMsgCredit->forceLock();
+ }
+}
//private utility methods: