diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 42 |
1 files changed, 21 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index d9b91c1617..a31ac78aa4 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -36,8 +36,8 @@ namespace broker { using namespace framing; -MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent) - : HandlerImplType(parent) {} +MessageHandlerImpl::MessageHandlerImpl(Session& session) + : HandlerImpl(session) {} // // Message class method handlers @@ -46,7 +46,7 @@ MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent) void MessageHandlerImpl::cancel(const string& destination ) { - session.cancel(destination); + getSession().cancel(destination); } void @@ -97,14 +97,14 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, bool exclusive, const framing::FieldTable& filter ) { - Queue::shared_ptr queue = session.getQueue(queueName); - if(!destination.empty() && session.exists(destination)) + Queue::shared_ptr queue = getSession().getQueue(queueName); + if(!destination.empty() && getSession().exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; //NB: am assuming pre-acquired = 0 as discussed on SIG list as is //the previously expected behaviour - session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), + getSession().consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); @@ -117,9 +117,9 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, const string& destination, bool noAck ) { - Queue::shared_ptr queue = session.getQueue(queueName); + Queue::shared_ptr queue = getSession().getQueue(queueName); - if (session.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ + if (getSession().get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ //don't send any response... rely on execution completion } else { //temporarily disabled: @@ -148,14 +148,14 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, bool /*global*/ ) { //TODO: handle global - session.setPrefetchSize(prefetchSize); - session.setPrefetchCount(prefetchCount); + getSession().setPrefetchSize(prefetchSize); + getSession().setPrefetchCount(prefetchCount); } void MessageHandlerImpl::recover(bool requeue) { - session.recover(requeue); + getSession().recover(requeue); } void @@ -166,7 +166,7 @@ MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/ } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - session.reject(i->getValue(), (++i)->getValue()); + getSession().reject(i->getValue(), (++i)->getValue()); } } @@ -175,10 +175,10 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i if (unit == 0) { //message - session.addMessageCredit(destination, value); + getSession().addMessageCredit(destination, value); } else if (unit == 1) { //bytes - session.addByteCredit(destination, value); + getSession().addByteCredit(destination, value); } else { //unknown throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit); @@ -190,10 +190,10 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) { if (mode == 0) { //credit - session.setCreditMode(destination); + getSession().setCreditMode(destination); } else if (mode == 1) { //window - session.setWindowMode(destination); + getSession().setWindowMode(destination); } else{ throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode); } @@ -201,12 +201,12 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) void MessageHandlerImpl::flush(const std::string& destination) { - session.flush(destination); + getSession().flush(destination); } void MessageHandlerImpl::stop(const std::string& destination) { - session.stop(destination); + getSession().stop(destination); } void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/) @@ -218,11 +218,11 @@ void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /* } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - session.acquire(i->getValue(), (++i)->getValue(), results); + getSession().acquire(i->getValue(), (++i)->getValue(), results); } results = results.condense(); - client.acquired(results); + getProxy().getMessage().acquired(results); } void MessageHandlerImpl::release(const SequenceNumberSet& transfers) @@ -232,7 +232,7 @@ void MessageHandlerImpl::release(const SequenceNumberSet& transfers) } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - session.release(i->getValue(), (++i)->getValue()); + getSession().release(i->getValue(), (++i)->getValue()); } } |
