diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 52 |
1 files changed, 7 insertions, 45 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index 9712b3903f..615a26beab 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -32,13 +32,12 @@ #include "BrokerAdapter.h" #include "BrokerChannel.h" -#include "BrokerMessage.h" #include "BrokerQueue.h" #include "Connection.h" #include "DeliverableMessage.h" #include "DtxAck.h" #include "DtxTimeout.h" -#include "MessageStore.h" +#include "Message.h" #include "TxAck.h" #include "TxPublish.h" @@ -49,7 +48,7 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageStore* const _store) : +Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id) : id(_id), connection(con), out(_out), @@ -58,8 +57,6 @@ Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageS tagGenerator("sgen"), dtxSelected(false), accumulatedAck(0), - store(_store), - messageBuilder(this, _store, connection.getStagingThreshold()), opened(id == 0),//channel 0 is automatically open, other must be explicitly opened flowActive(true) { @@ -108,7 +105,7 @@ void Channel::startTx() txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } -void Channel::commit() +void Channel::commit(MessageStore* const store) { if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions"); @@ -296,34 +293,7 @@ void Channel::ConsumerImpl::requestDispatch() queue->requestDispatch(); } -void Channel::handleInlineTransfer(Message::shared_ptr msg) -{ - complete(msg); -} - -void Channel::handlePublish(Message* _message) -{ - Message::shared_ptr message(_message); - messageBuilder.initialise(message); -} - -void Channel::handleHeader(AMQHeaderBody* header) -{ - messageBuilder.setHeader(header); - //at this point, decide based on the size of the message whether we want - //to stage it by saving content directly to disk as it arrives -} - -void Channel::handleContent(AMQContentBody* content) -{ - messageBuilder.addContent(content); -} - -void Channel::handleHeartbeat(AMQHeartbeatBody*) { - // TODO aconway 2007-01-17: Implement heartbeating. -} - -void Channel::complete(Message::shared_ptr msg) { +void Channel::handle(Message::shared_ptr msg) { if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); TxOp::shared_ptr op(deliverable); @@ -335,20 +305,12 @@ void Channel::complete(Message::shared_ptr msg) { } } - - void Channel::route(Message::shared_ptr msg, Deliverable& strategy) { - - std::string routeToExchangeName = msg->getExchange(); - // cache the exchange lookup - if (!cacheExchange.get() || cacheExchangeName != routeToExchangeName){ - cacheExchangeName = routeToExchangeName; - cacheExchange = connection.broker.getExchanges().get(routeToExchangeName); + std::string exchangeName = msg->getExchangeName(); + if (!cacheExchange || cacheExchange->getName() != exchangeName){ + cacheExchange = connection.broker.getExchanges().get(exchangeName); } - if (!cacheExchange.get() ) - throw ChannelException(404, "Exchange not found '" + routeToExchangeName + "'"); - cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); if (!strategy.delivered) { |