diff options
| author | Gordon Sim <gsim@apache.org> | 2007-08-10 14:51:08 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-08-10 14:51:08 +0000 |
| commit | 6577b14632d81c15482cb0793e01166cdb28eaff (patch) | |
| tree | 8b8dc5e4db5690e9c024b862a1d725764687d6fc /cpp/src/qpid/broker/MessageHandlerImpl.cpp | |
| parent | c00a668cbf27d90edf18cc935cc982cab6581cae (diff) | |
| download | qpid-python-6577b14632d81c15482cb0793e01166cdb28eaff.tar.gz | |
Broker management of message acknowledgements now runs entirely off execution layer.
Flow control support.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@564611 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 47 |
1 files changed, 45 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index da57439e21..c728a800ab 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -27,6 +27,8 @@ #include "qpid/framing/MessageTransferBody.h" #include "BrokerAdapter.h" +#include <boost/format.hpp> + namespace qpid { namespace broker { @@ -96,7 +98,7 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/, if(!destination.empty() && channel.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; - channel.consume(MessageMessage::getToken(destination), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); + channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, !noAck, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } @@ -130,7 +132,7 @@ MessageHandlerImpl::empty() void MessageHandlerImpl::ok() { - channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest()); + throw ConnectionException(540, "Message.Ok no longer supported"); } void @@ -171,4 +173,45 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context) } + +void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) +{ + + if (unit == 0) { + //message + channel.addMessageCredit(destination, value); + } else if (unit == 1) { + //bytes + channel.addByteCredit(destination, value); + } else { + //unknown + throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit); + } + +} + +void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) +{ + if (mode == 0) { + //credit + channel.setCreditMode(destination); + } else if (mode == 1) { + //window + channel.setWindowMode(destination); + } else{ + throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode); + } +} + +void MessageHandlerImpl::flush(const std::string& destination) +{ + channel.flush(destination); +} + +void MessageHandlerImpl::stop(const std::string& destination) +{ + channel.stop(destination); +} + + }} // namespace qpid::broker |
