summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SessionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/SessionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp101
1 files changed, 88 insertions, 13 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 4fadf236f8..7cf68956ea 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -62,7 +62,8 @@ SessionImpl::SessionImpl(const std::string& name, shared_ptr<ConnectionImpl> con
ioHandler(*this),
proxy(ioHandler),
nextIn(0),
- nextOut(0)
+ nextOut(0),
+ sendMsgCredit(0)
{
channel.next = connectionShared.get();
}
@@ -76,6 +77,7 @@ SessionImpl::~SessionImpl() {
handleClosed();
state.waitWaiters();
}
+ delete sendMsgCredit;
}
boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock();
if (c) c->erase(channel);
@@ -359,7 +361,7 @@ void SessionImpl::sendContent(const MethodContent& content)
uint64_t data_length = content.getData().length();
if(data_length > 0){
header.setLastSegment(false);
- handleOut(header);
+ handleContentOut(header);
/*Note: end of frame marker included in overhead but not in size*/
const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead();
@@ -388,7 +390,7 @@ void SessionImpl::sendContent(const MethodContent& content)
}
}
} else {
- handleOut(header);
+ handleContentOut(header);
}
}
@@ -414,16 +416,18 @@ bool isContentFrame(AMQFrame& frame)
void SessionImpl::handleIn(AMQFrame& frame) // network thread
{
try {
- if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
- if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) {
- //make sure the command id sequence and completion
- //tracking takes account of execution commands
- Lock l(state);
- completedIn.add(nextIn++);
- } else {
- //if not handled by this class, its for the application:
- deliver(frame);
- }
+ if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
+ ;
+ } else if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) {
+ //make sure the command id sequence and completion
+ //tracking takes account of execution commands
+ Lock l(state);
+ completedIn.add(nextIn++);
+ } else if (invoke(static_cast<MessageHandler&>(*this), *frame.getBody())) {
+ ;
+ } else {
+ //if not handled by this class, its for the application:
+ deliver(frame);
}
}
catch (const SessionException& e) {
@@ -439,6 +443,14 @@ void SessionImpl::handleOut(AMQFrame& frame) // user thread
sendFrame(frame, true);
}
+void SessionImpl::handleContentOut(AMQFrame& frame) // user thread
+{
+ if (sendMsgCredit) {
+ sendMsgCredit->acquire();
+ }
+ sendFrame(frame, true);
+}
+
void SessionImpl::proxyOut(AMQFrame& frame) // network thread
{
//Note: this case is treated slightly differently that command
@@ -631,6 +643,69 @@ void SessionImpl::exception(uint16_t errorCode,
setTimeout(0);
}
+// Message methods:
+void SessionImpl::accept(const qpid::framing::SequenceSet&)
+{
+}
+
+void SessionImpl::reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&)
+{
+}
+
+void SessionImpl::release(const qpid::framing::SequenceSet&, bool)
+{
+}
+
+MessageResumeResult SessionImpl::resume(const std::string&, const std::string&)
+{
+ throw NotImplementedException("resuming transfers not yet supported");
+}
+
+namespace {
+ const std::string QPID_SESSION_DEST = "";
+ const uint8_t FLOW_MODE_CREDIT = 0;
+ const uint8_t CREDIT_MODE_MSG = 0;
+}
+
+void SessionImpl::setFlowMode(const std::string& dest, uint8_t flowMode)
+{
+ if ( dest != QPID_SESSION_DEST ) {
+ QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest);
+ return;
+ }
+
+ if ( flowMode != FLOW_MODE_CREDIT ) {
+ throw NotImplementedException("window flow control mode not supported by producer");
+ }
+ Lock l(state);
+ sendMsgCredit = new sys::Semaphore(0);
+}
+
+void SessionImpl::flow(const std::string& dest, uint8_t mode, uint32_t credit)
+{
+ if ( dest != QPID_SESSION_DEST ) {
+ QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest);
+ return;
+ }
+
+ if ( mode != CREDIT_MODE_MSG ) {
+ return;
+ }
+ if (sendMsgCredit) {
+ sendMsgCredit->release(credit);
+ }
+}
+
+void SessionImpl::stop(const std::string& dest)
+{
+ if ( dest != QPID_SESSION_DEST ) {
+ QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest);
+ return;
+ }
+ if (sendMsgCredit) {
+ sendMsgCredit->forceLock();
+ }
+}
//private utility methods: