summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp52
1 files changed, 7 insertions, 45 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index 9712b3903f..615a26beab 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -32,13 +32,12 @@
#include "BrokerAdapter.h"
#include "BrokerChannel.h"
-#include "BrokerMessage.h"
#include "BrokerQueue.h"
#include "Connection.h"
#include "DeliverableMessage.h"
#include "DtxAck.h"
#include "DtxTimeout.h"
-#include "MessageStore.h"
+#include "Message.h"
#include "TxAck.h"
#include "TxPublish.h"
@@ -49,7 +48,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageStore* const _store) :
+Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id) :
id(_id),
connection(con),
out(_out),
@@ -58,8 +57,6 @@ Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageS
tagGenerator("sgen"),
dtxSelected(false),
accumulatedAck(0),
- store(_store),
- messageBuilder(this, _store, connection.getStagingThreshold()),
opened(id == 0),//channel 0 is automatically open, other must be explicitly opened
flowActive(true)
{
@@ -108,7 +105,7 @@ void Channel::startTx()
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void Channel::commit()
+void Channel::commit(MessageStore* const store)
{
if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions");
@@ -296,34 +293,7 @@ void Channel::ConsumerImpl::requestDispatch()
queue->requestDispatch();
}
-void Channel::handleInlineTransfer(Message::shared_ptr msg)
-{
- complete(msg);
-}
-
-void Channel::handlePublish(Message* _message)
-{
- Message::shared_ptr message(_message);
- messageBuilder.initialise(message);
-}
-
-void Channel::handleHeader(AMQHeaderBody* header)
-{
- messageBuilder.setHeader(header);
- //at this point, decide based on the size of the message whether we want
- //to stage it by saving content directly to disk as it arrives
-}
-
-void Channel::handleContent(AMQContentBody* content)
-{
- messageBuilder.addContent(content);
-}
-
-void Channel::handleHeartbeat(AMQHeartbeatBody*) {
- // TODO aconway 2007-01-17: Implement heartbeating.
-}
-
-void Channel::complete(Message::shared_ptr msg) {
+void Channel::handle(Message::shared_ptr msg) {
if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
TxOp::shared_ptr op(deliverable);
@@ -335,20 +305,12 @@ void Channel::complete(Message::shared_ptr msg) {
}
}
-
-
void Channel::route(Message::shared_ptr msg, Deliverable& strategy) {
-
- std::string routeToExchangeName = msg->getExchange();
- // cache the exchange lookup
- if (!cacheExchange.get() || cacheExchangeName != routeToExchangeName){
- cacheExchangeName = routeToExchangeName;
- cacheExchange = connection.broker.getExchanges().get(routeToExchangeName);
+ std::string exchangeName = msg->getExchangeName();
+ if (!cacheExchange || cacheExchange->getName() != exchangeName){
+ cacheExchange = connection.broker.getExchanges().get(exchangeName);
}
- if (!cacheExchange.get() )
- throw ChannelException(404, "Exchange not found '" + routeToExchangeName + "'");
-
cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
if (!strategy.delivered) {