summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-09-18 19:43:29 +0000
committerAlan Conway <aconway@apache.org>2007-09-18 19:43:29 +0000
commit6aeb03f0f5ac7ede957995fc784367a30920c683 (patch)
tree7fe35f0ce9fe6bf17dbd6416deb6069ef9c7b07c /cpp/src/qpid/broker/MessageHandlerImpl.cpp
parent8b039e1ed4e4340917d7fd3d8202049e691ca6ec (diff)
downloadqpid-python-6aeb03f0f5ac7ede957995fc784367a30920c683.tar.gz
Refactor HandlerImpl to use Session rather than CoreRefs.
Remove most uses of ChannelAdapter in broker code. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@577027 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp42
1 files changed, 21 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index d9b91c1617..a31ac78aa4 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -36,8 +36,8 @@ namespace broker {
using namespace framing;
-MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent)
- : HandlerImplType(parent) {}
+MessageHandlerImpl::MessageHandlerImpl(Session& session)
+ : HandlerImpl(session) {}
//
// Message class method handlers
@@ -46,7 +46,7 @@ MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent)
void
MessageHandlerImpl::cancel(const string& destination )
{
- session.cancel(destination);
+ getSession().cancel(destination);
}
void
@@ -97,14 +97,14 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
bool exclusive,
const framing::FieldTable& filter )
{
- Queue::shared_ptr queue = session.getQueue(queueName);
- if(!destination.empty() && session.exists(destination))
+ Queue::shared_ptr queue = getSession().getQueue(queueName);
+ if(!destination.empty() && getSession().exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
//NB: am assuming pre-acquired = 0 as discussed on SIG list as is
//the previously expected behaviour
- session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
+ getSession().consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
@@ -117,9 +117,9 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
const string& destination,
bool noAck )
{
- Queue::shared_ptr queue = session.getQueue(queueName);
+ Queue::shared_ptr queue = getSession().getQueue(queueName);
- if (session.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
+ if (getSession().get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
//don't send any response... rely on execution completion
} else {
//temporarily disabled:
@@ -148,14 +148,14 @@ MessageHandlerImpl::qos(uint32_t prefetchSize,
bool /*global*/ )
{
//TODO: handle global
- session.setPrefetchSize(prefetchSize);
- session.setPrefetchCount(prefetchCount);
+ getSession().setPrefetchSize(prefetchSize);
+ getSession().setPrefetchCount(prefetchCount);
}
void
MessageHandlerImpl::recover(bool requeue)
{
- session.recover(requeue);
+ getSession().recover(requeue);
}
void
@@ -166,7 +166,7 @@ MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- session.reject(i->getValue(), (++i)->getValue());
+ getSession().reject(i->getValue(), (++i)->getValue());
}
}
@@ -175,10 +175,10 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i
if (unit == 0) {
//message
- session.addMessageCredit(destination, value);
+ getSession().addMessageCredit(destination, value);
} else if (unit == 1) {
//bytes
- session.addByteCredit(destination, value);
+ getSession().addByteCredit(destination, value);
} else {
//unknown
throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
@@ -190,10 +190,10 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
{
if (mode == 0) {
//credit
- session.setCreditMode(destination);
+ getSession().setCreditMode(destination);
} else if (mode == 1) {
//window
- session.setWindowMode(destination);
+ getSession().setWindowMode(destination);
} else{
throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);
}
@@ -201,12 +201,12 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
void MessageHandlerImpl::flush(const std::string& destination)
{
- session.flush(destination);
+ getSession().flush(destination);
}
void MessageHandlerImpl::stop(const std::string& destination)
{
- session.stop(destination);
+ getSession().stop(destination);
}
void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
@@ -218,11 +218,11 @@ void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- session.acquire(i->getValue(), (++i)->getValue(), results);
+ getSession().acquire(i->getValue(), (++i)->getValue(), results);
}
results = results.condense();
- client.acquired(results);
+ getProxy().getMessage().acquired(results);
}
void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
@@ -232,7 +232,7 @@ void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- session.release(i->getValue(), (++i)->getValue());
+ getSession().release(i->getValue(), (++i)->getValue());
}
}