diff options
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 101 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.h | 20 |
2 files changed, 107 insertions, 14 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 4fadf236f8..7cf68956ea 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -62,7 +62,8 @@ SessionImpl::SessionImpl(const std::string& name, shared_ptr<ConnectionImpl> con ioHandler(*this), proxy(ioHandler), nextIn(0), - nextOut(0) + nextOut(0), + sendMsgCredit(0) { channel.next = connectionShared.get(); } @@ -76,6 +77,7 @@ SessionImpl::~SessionImpl() { handleClosed(); state.waitWaiters(); } + delete sendMsgCredit; } boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock(); if (c) c->erase(channel); @@ -359,7 +361,7 @@ void SessionImpl::sendContent(const MethodContent& content) uint64_t data_length = content.getData().length(); if(data_length > 0){ header.setLastSegment(false); - handleOut(header); + handleContentOut(header); /*Note: end of frame marker included in overhead but not in size*/ const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); @@ -388,7 +390,7 @@ void SessionImpl::sendContent(const MethodContent& content) } } } else { - handleOut(header); + handleContentOut(header); } } @@ -414,16 +416,18 @@ bool isContentFrame(AMQFrame& frame) void SessionImpl::handleIn(AMQFrame& frame) // network thread { try { - if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { - if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) { - //make sure the command id sequence and completion - //tracking takes account of execution commands - Lock l(state); - completedIn.add(nextIn++); - } else { - //if not handled by this class, its for the application: - deliver(frame); - } + if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { + ; + } else if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) { + //make sure the command id sequence and completion + //tracking takes account of execution commands + Lock l(state); + completedIn.add(nextIn++); + } else if (invoke(static_cast<MessageHandler&>(*this), *frame.getBody())) { + ; + } else { + //if not handled by this class, its for the application: + deliver(frame); } } catch (const SessionException& e) { @@ -439,6 +443,14 @@ void SessionImpl::handleOut(AMQFrame& frame) // user thread sendFrame(frame, true); } +void SessionImpl::handleContentOut(AMQFrame& frame) // user thread +{ + if (sendMsgCredit) { + sendMsgCredit->acquire(); + } + sendFrame(frame, true); +} + void SessionImpl::proxyOut(AMQFrame& frame) // network thread { //Note: this case is treated slightly differently that command @@ -631,6 +643,69 @@ void SessionImpl::exception(uint16_t errorCode, setTimeout(0); } +// Message methods: +void SessionImpl::accept(const qpid::framing::SequenceSet&) +{ +} + +void SessionImpl::reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&) +{ +} + +void SessionImpl::release(const qpid::framing::SequenceSet&, bool) +{ +} + +MessageResumeResult SessionImpl::resume(const std::string&, const std::string&) +{ + throw NotImplementedException("resuming transfers not yet supported"); +} + +namespace { + const std::string QPID_SESSION_DEST = ""; + const uint8_t FLOW_MODE_CREDIT = 0; + const uint8_t CREDIT_MODE_MSG = 0; +} + +void SessionImpl::setFlowMode(const std::string& dest, uint8_t flowMode) +{ + if ( dest != QPID_SESSION_DEST ) { + QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest); + return; + } + + if ( flowMode != FLOW_MODE_CREDIT ) { + throw NotImplementedException("window flow control mode not supported by producer"); + } + Lock l(state); + sendMsgCredit = new sys::Semaphore(0); +} + +void SessionImpl::flow(const std::string& dest, uint8_t mode, uint32_t credit) +{ + if ( dest != QPID_SESSION_DEST ) { + QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest); + return; + } + + if ( mode != CREDIT_MODE_MSG ) { + return; + } + if (sendMsgCredit) { + sendMsgCredit->release(credit); + } +} + +void SessionImpl::stop(const std::string& dest) +{ + if ( dest != QPID_SESSION_DEST ) { + QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest); + return; + } + if (sendMsgCredit) { + sendMsgCredit->forceLock(); + } +} //private utility methods: diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index ea7776634a..9d0c4ff796 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -61,7 +61,8 @@ class SessionHandler; class SessionImpl : public framing::FrameHandler::InOutHandler, public Execution, private framing::AMQP_ClientOperations::SessionHandler, - private framing::AMQP_ClientOperations::ExecutionHandler + private framing::AMQP_ClientOperations::ExecutionHandler, + private framing::AMQP_ClientOperations::MessageHandler { public: SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>); @@ -123,6 +124,7 @@ private: }; typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler; typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler; + typedef framing::AMQP_ClientOperations::MessageHandler MessageHandler; typedef sys::StateMonitor<State, DETACHED> StateMonitor; typedef StateMonitor::Set States; @@ -138,6 +140,7 @@ private: void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); + void handleContentOut(framing::AMQFrame& frame); /** * Sends session controls. This case is treated slightly * differently than command frames sent by the application via @@ -181,6 +184,18 @@ private: uint8_t fieldIndex, const std::string& description, const framing::FieldTable& errorInfo); + + // Note: Following methods are called by network thread in + // response to message commands from the broker + // EXCEPT Message.Transfer + void accept(const qpid::framing::SequenceSet&); + void reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&); + void release(const qpid::framing::SequenceSet&, bool); + qpid::framing::MessageResumeResult resume(const std::string&, const std::string&); + void setFlowMode(const std::string&, uint8_t); + void flow(const std::string&, uint8_t, uint32_t); + void stop(const std::string&); + sys::ExceptionHolder exceptionHolder; mutable StateMonitor state; @@ -211,6 +226,9 @@ private: SessionState sessionState; + // Only keep track of message credit + sys::Semaphore* sendMsgCredit; + friend class client::SessionHandler; }; |
