summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-08-28 19:38:17 +0000
committerGordon Sim <gsim@apache.org>2007-08-28 19:38:17 +0000
commit9e10f4ea3b2f8ab6650f635cada48e4735ca20d7 (patch)
tree26ad3b8dffa17fa665fe7a033a7c8092839df011 /cpp/src/qpid/broker
parent6b09696b216c090b512c6af92bf7976ae3407add (diff)
downloadqpid-python-9e10f4ea3b2f8ab6650f635cada48e4735ca20d7.tar.gz
Updated message.transfer encoding to use header and content segments (including new structs).
Unified more between the basic and message classes messages. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/AccumulatedAck.h7
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp19
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h3
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp52
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h25
-rw-r--r--cpp/src/qpid/broker/BrokerExchange.h2
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.cpp299
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.h146
-rw-r--r--cpp/src/qpid/broker/BrokerMessageBase.h168
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.cpp328
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.h90
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp2
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.h3
-rw-r--r--cpp/src/qpid/broker/CompletionHandler.h39
-rw-r--r--cpp/src/qpid/broker/Consumer.h2
-rw-r--r--cpp/src/qpid/broker/Content.h64
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.h2
-rw-r--r--cpp/src/qpid/broker/DeliveryAdapter.h2
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h4
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h1
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h1
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h1
-rw-r--r--cpp/src/qpid/broker/InMemoryContent.cpp70
-rw-r--r--cpp/src/qpid/broker/InMemoryContent.h46
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.cpp68
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.h50
-rw-r--r--cpp/src/qpid/broker/Message.cpp195
-rw-r--r--cpp/src/qpid/broker/Message.h139
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.h108
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp85
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.h38
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.cpp140
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.h60
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp25
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.h7
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h1
-rw-r--r--cpp/src/qpid/broker/NameGenerator.h2
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h1
-rw-r--r--cpp/src/qpid/broker/PersistableExchange.h2
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h9
-rw-r--r--cpp/src/qpid/broker/RecoveredDequeue.h2
-rw-r--r--cpp/src/qpid/broker/RecoveredEnqueue.h2
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp23
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.h4
-rw-r--r--cpp/src/qpid/broker/Reference.cpp53
-rw-r--r--cpp/src/qpid/broker/Reference.h115
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp198
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h12
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h1
-rw-r--r--cpp/src/qpid/broker/TxPublish.h4
50 files changed, 892 insertions, 1828 deletions
diff --git a/cpp/src/qpid/broker/AccumulatedAck.h b/cpp/src/qpid/broker/AccumulatedAck.h
index be01c5e02c..b53f4a8ba5 100644
--- a/cpp/src/qpid/broker/AccumulatedAck.h
+++ b/cpp/src/qpid/broker/AccumulatedAck.h
@@ -48,13 +48,12 @@ namespace qpid {
class AccumulatedAck {
public:
/**
- * If not zero, then everything up to this value has been
- * acked.
+ * Everything up to this value has been acked.
*/
DeliveryId mark;
/**
- * List of individually acked messages that are not
- * included in the range marked by 'range'.
+ * List of individually acked messages greater than the
+ * 'mark'.
*/
std::list<Range> ranges;
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index b733f77390..07b7b4f638 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -21,6 +21,7 @@
#include "BrokerChannel.h"
#include "Connection.h"
#include "DeliveryToken.h"
+#include "MessageDelivery.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/Exception.h"
@@ -327,7 +328,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
//need to generate name here, so we have it for the adapter (it is
//also version specific behaviour now)
if (newTag.empty()) newTag = tagGenerator.generate();
- DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken(newTag));
+ DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
channel.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields);
if(!nowait) client.consumeOk(newTag);
@@ -340,21 +341,9 @@ void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
channel.cancel(consumerTag);
}
-void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/,
- const string& exchangeName, const string& routingKey,
- bool rejectUnroutable, bool immediate)
-{
-
- // exeption moved to ChannelAdaptor -- TODO this code should be removed once basic is removed
-
- BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, rejectUnroutable, immediate);
- channel.handlePublish(msg);
-
-}
-
void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = getQueue(queueName);
- DeliveryToken::shared_ptr token(BasicMessage::createGetToken(queue));
+ DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue));
if(!channel.get(token, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
@@ -384,7 +373,7 @@ void BrokerAdapter::TxHandlerImpl::select()
void BrokerAdapter::TxHandlerImpl::commit()
{
- channel.commit();
+ channel.commit(&broker.getStore());
}
void BrokerAdapter::TxHandlerImpl::rollback()
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 99b7f14525..9e0cf64b7f 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -183,9 +183,6 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
bool noLocal, bool noAck, bool exclusive, bool nowait,
const qpid::framing::FieldTable& fields);
void cancel(const std::string& consumerTag);
- void publish(uint16_t ticket,
- const std::string& exchange, const std::string& routingKey,
- bool rejectUnroutable, bool immediate);
void get(uint16_t ticket, const std::string& queue, bool noAck);
void ack(uint64_t deliveryTag, bool multiple);
void reject(uint64_t deliveryTag, bool requeue);
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index 9712b3903f..615a26beab 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -32,13 +32,12 @@
#include "BrokerAdapter.h"
#include "BrokerChannel.h"
-#include "BrokerMessage.h"
#include "BrokerQueue.h"
#include "Connection.h"
#include "DeliverableMessage.h"
#include "DtxAck.h"
#include "DtxTimeout.h"
-#include "MessageStore.h"
+#include "Message.h"
#include "TxAck.h"
#include "TxPublish.h"
@@ -49,7 +48,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageStore* const _store) :
+Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id) :
id(_id),
connection(con),
out(_out),
@@ -58,8 +57,6 @@ Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageS
tagGenerator("sgen"),
dtxSelected(false),
accumulatedAck(0),
- store(_store),
- messageBuilder(this, _store, connection.getStagingThreshold()),
opened(id == 0),//channel 0 is automatically open, other must be explicitly opened
flowActive(true)
{
@@ -108,7 +105,7 @@ void Channel::startTx()
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void Channel::commit()
+void Channel::commit(MessageStore* const store)
{
if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions");
@@ -296,34 +293,7 @@ void Channel::ConsumerImpl::requestDispatch()
queue->requestDispatch();
}
-void Channel::handleInlineTransfer(Message::shared_ptr msg)
-{
- complete(msg);
-}
-
-void Channel::handlePublish(Message* _message)
-{
- Message::shared_ptr message(_message);
- messageBuilder.initialise(message);
-}
-
-void Channel::handleHeader(AMQHeaderBody* header)
-{
- messageBuilder.setHeader(header);
- //at this point, decide based on the size of the message whether we want
- //to stage it by saving content directly to disk as it arrives
-}
-
-void Channel::handleContent(AMQContentBody* content)
-{
- messageBuilder.addContent(content);
-}
-
-void Channel::handleHeartbeat(AMQHeartbeatBody*) {
- // TODO aconway 2007-01-17: Implement heartbeating.
-}
-
-void Channel::complete(Message::shared_ptr msg) {
+void Channel::handle(Message::shared_ptr msg) {
if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
TxOp::shared_ptr op(deliverable);
@@ -335,20 +305,12 @@ void Channel::complete(Message::shared_ptr msg) {
}
}
-
-
void Channel::route(Message::shared_ptr msg, Deliverable& strategy) {
-
- std::string routeToExchangeName = msg->getExchange();
- // cache the exchange lookup
- if (!cacheExchange.get() || cacheExchangeName != routeToExchangeName){
- cacheExchangeName = routeToExchangeName;
- cacheExchange = connection.broker.getExchanges().get(routeToExchangeName);
+ std::string exchangeName = msg->getExchangeName();
+ if (!cacheExchange || cacheExchange->getName() != exchangeName){
+ cacheExchange = connection.broker.getExchanges().get(exchangeName);
}
- if (!cacheExchange.get() )
- throw ChannelException(404, "Exchange not found '" + routeToExchangeName + "'");
-
cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
if (!strategy.delivered) {
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index fcfcd73679..cdbab37ebc 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -37,14 +37,12 @@
#include "Deliverable.h"
#include "DtxBuffer.h"
#include "DtxManager.h"
-#include "MessageBuilder.h"
#include "NameGenerator.h"
#include "Prefetch.h"
#include "TxBuffer.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/ChannelAdapter.h"
#include "qpid/framing/ChannelOpenBody.h"
-#include "CompletionHandler.h"
namespace qpid {
namespace broker {
@@ -60,7 +58,7 @@ using framing::string;
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
-class Channel : public CompletionHandler
+class Channel
{
class ConsumerImpl : public Consumer
{
@@ -113,25 +111,22 @@ class Channel : public CompletionHandler
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
AccumulatedAck accumulatedAck;
- MessageStore* const store;
- MessageBuilder messageBuilder;//builder for in-progress message
bool opened;
bool flowActive;
- std::string cacheExchangeName; // pair holds last exchange used for routing
- Exchange::shared_ptr cacheExchange;
+ boost::shared_ptr<Exchange> cacheExchange;
void route(Message::shared_ptr msg, Deliverable& strategy);
- void complete(Message::shared_ptr msg);// completion handler for MessageBuilder
void record(const DeliveryRecord& delivery);
bool checkPrefetch(Message::shared_ptr& msg);
void checkDtxTimeout();
ConsumerImpl& find(const std::string& destination);
void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
void acknowledged(const DeliveryRecord&);
-
+
+
public:
- Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id, MessageStore* const store = 0);
+ Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id);
~Channel();
bool isOpen() const { return opened; }
@@ -162,7 +157,7 @@ class Channel : public CompletionHandler
bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
void close();
void startTx();
- void commit();
+ void commit(MessageStore* const store);
void rollback();
void selectDtx();
void startDtx(const std::string& xid, DtxManager& mgr, bool join);
@@ -174,12 +169,8 @@ class Channel : public CompletionHandler
void recover(bool requeue);
void flow(bool active);
void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);
- void handlePublish(Message* msg);
- void handleHeader(framing::AMQHeaderBody*);
- void handleContent(framing::AMQContentBody*);
- void handleHeartbeat(framing::AMQHeartbeatBody*);
-
- void handleInlineTransfer(Message::shared_ptr msg);
+
+ void handle(Message::shared_ptr msg);
};
}} // namespace broker
diff --git a/cpp/src/qpid/broker/BrokerExchange.h b/cpp/src/qpid/broker/BrokerExchange.h
index 91c295e1b7..c3dd7b998d 100644
--- a/cpp/src/qpid/broker/BrokerExchange.h
+++ b/cpp/src/qpid/broker/BrokerExchange.h
@@ -51,7 +51,7 @@ namespace qpid {
: name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){}
virtual ~Exchange(){}
- string getName() const { return name; }
+ const string& getName() const { return name; }
bool isDurable() { return durable; }
qpid::framing::FieldTable& getArgs() { return args; }
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp
deleted file mode 100644
index bddd5802cf..0000000000
--- a/cpp/src/qpid/broker/BrokerMessage.cpp
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <boost/cast.hpp>
-
-#include "BrokerMessage.h"
-#include <iostream>
-
-#include "InMemoryContent.h"
-#include "LazyLoadedContent.h"
-#include "MessageStore.h"
-#include "BrokerQueue.h"
-#include "qpid/log/Statement.h"
-#include "qpid/framing/BasicDeliverBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include "qpid/framing/BasicPublishBody.h"
-#include "qpid/framing/AMQContentBody.h"
-#include "qpid/framing/AMQHeaderBody.h"
-#include "qpid/framing/AMQMethodBody.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ChannelAdapter.h"
-#include "RecoveryManagerImpl.h"
-
-namespace qpid{
-namespace broker{
-
-struct BasicGetToken : DeliveryToken
-{
- typedef boost::shared_ptr<BasicGetToken> shared_ptr;
-
- Queue::shared_ptr queue;
-
- BasicGetToken(Queue::shared_ptr q) : queue(q) {}
-};
-
-struct BasicConsumeToken : DeliveryToken
-{
- typedef boost::shared_ptr<BasicConsumeToken> shared_ptr;
-
- const string consumer;
-
- BasicConsumeToken(const string c) : consumer(c) {}
-};
-
-}
-}
-
-using namespace boost;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-BasicMessage::BasicMessage(
- const ConnectionToken* const _publisher,
- const string& _exchange, const string& _routingKey,
- bool _mandatory, bool _immediate
-) :
- Message(_publisher, _exchange, _routingKey, _mandatory, _immediate),
- size(0)
-{}
-
-// For tests only.
-BasicMessage::BasicMessage() : isHeaderSet(false), size(0) {}
-
-BasicMessage::~BasicMessage(){}
-
-void BasicMessage::setHeader(AMQHeaderBody* _header){
- if (_header) {
- this->header = *_header;
- isHeaderSet = true;
- }
- else
- isHeaderSet = false;
-}
-
-void BasicMessage::addContent(AMQContentBody* data){
- if (!content.get()) {
- content = std::auto_ptr<Content>(new InMemoryContent());
- }
- content->add(data);
- size += data->size();
-}
-
-bool BasicMessage::isComplete(){
- return isHeaderSet && (header.getContentSize() == contentSize());
-}
-
-DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue)
-{
- return DeliveryToken::shared_ptr(new BasicGetToken(queue));
-}
-
-DeliveryToken::shared_ptr BasicMessage::createConsumeToken(const string& consumer)
-{
- return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer));
-}
-
-void BasicMessage::deliver(ChannelAdapter& channel,
- const string& consumerTag, DeliveryId id,
- uint32_t framesize)
-{
- channel.send(BasicDeliverBody(
- channel.getVersion(), consumerTag, id.getValue(),
- getRedelivered(), getExchange(), getRoutingKey()));
- sendContent(channel, framesize);
-}
-
-void BasicMessage::sendGetOk(ChannelAdapter& channel,
- uint32_t messageCount,
- DeliveryId id,
- uint32_t framesize)
-{
- channel.send(
- BasicGetOkBody(
- channel.getVersion(),
- id.getValue(), getRedelivered(), getExchange(),
- getRoutingKey(), messageCount));
- sendContent(channel, framesize);
-}
-
-void BasicMessage::deliver(framing::ChannelAdapter& channel, DeliveryId id, DeliveryToken::shared_ptr token, uint32_t framesize)
-{
- BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token);
- if (consume) {
- deliver(channel, consume->consumer, id, framesize);
- } else {
- BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token);
- if (get) {
- sendGetOk(channel, get->queue->getMessageCount(), id.getValue(), framesize);
- } else {
- //TODO:
- //either need to be able to convert to a message transfer or
- //throw error of some kind to allow this to be handled higher up
- throw Exception("Conversion to BasicMessage not defined!");
- }
- }
-}
-
-void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize)
-{
- channel.send(header);
- Mutex::ScopedLock locker(contentLock);
- if (content.get())
- content->send(channel, framesize);
-}
-
-BasicHeaderProperties* BasicMessage::getHeaderProperties(){
- return isHeaderSet ? dynamic_cast<BasicHeaderProperties*>(header.getProperties()) : 0;
-}
-
-const FieldTable& BasicMessage::getApplicationHeaders(){
- return getHeaderProperties()->getHeaders();
-}
-
-bool BasicMessage::isPersistent()
-{
- if(!isHeaderSet) return false;
- BasicHeaderProperties* props = getHeaderProperties();
- return props && props->getDeliveryMode() == PERSISTENT;
-}
-
-void BasicMessage::decode(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize)
-{
- decodeHeader(buffer);
- if (!headersOnly) decodeContent(buffer, contentChunkSize);
-}
-
-void BasicMessage::decodeHeader(Buffer& buffer)
-{
- //don't care about the type here, but want encode/decode to be symmetric
- RecoveryManagerImpl::decodeMessageType(buffer);
-
- string exchange;
- string routingKey;
-
- buffer.getShortString(exchange);
- buffer.getShortString(routingKey);
- setRouting(exchange, routingKey);
-
- uint32_t headerSize = buffer.getLong();
- AMQHeaderBody headerBody;
- headerBody.decode(buffer, headerSize);
- setHeader(&headerBody);
-}
-
-void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize)
-{
- uint64_t expected = expectedContentSize();
- if (expected != buffer.available()) {
- QPID_LOG(error, "Expected " << expectedContentSize() << " bytes, got " << buffer.available());
- throw Exception("Cannot decode content, buffer not large enough.");
- }
-
- if (!chunkSize || chunkSize > expected) {
- chunkSize = expected;
- }
-
- uint64_t total = 0;
- while (total < expectedContentSize()) {
- uint64_t remaining = expected - total;
- AMQContentBody contentBody;
- contentBody.decode(buffer, remaining < chunkSize ? remaining : chunkSize);
- addContent(&contentBody);
- total += chunkSize;
- }
-}
-
-void BasicMessage::encode(Buffer& buffer) const
-{
- encodeHeader(buffer);
- encodeContent(buffer);
-}
-
-void BasicMessage::encodeHeader(Buffer& buffer) const
-{
- RecoveryManagerImpl::encodeMessageType(*this, buffer);
- buffer.putShortString(getExchange());
- buffer.putShortString(getRoutingKey());
- buffer.putLong(header.size());
- header.encode(buffer);
-}
-
-void BasicMessage::encodeContent(Buffer& buffer) const
-{
- Mutex::ScopedLock locker(contentLock);
- if (content.get()) content->encode(buffer);
-}
-
-uint32_t BasicMessage::encodedSize() const
-{
- return encodedHeaderSize() + encodedContentSize();
-}
-
-uint32_t BasicMessage::encodedContentSize() const
-{
- Mutex::ScopedLock locker(contentLock);
- return content.get() ? content->size() : 0;
-}
-
-uint32_t BasicMessage::encodedHeaderSize() const
-{
- return RecoveryManagerImpl::encodedMessageTypeSize()
- +getExchange().size() + 1
- + getRoutingKey().size() + 1
- + header.size() + 4;//4 extra bytes for size
-}
-
-uint64_t BasicMessage::expectedContentSize()
-{
- return isHeaderSet ? header.getContentSize() : 0;
-}
-
-void BasicMessage::releaseContent(MessageStore* store)
-{
- Mutex::ScopedLock locker(contentLock);
- if (!isPersistent() && getPersistenceId() == 0) {
- store->stage(*this);
- }
- if (!content.get() || content->size() > 0) {
- //set content to lazy loading mode (but only if there is
- //stored content):
-
- //Note: the LazyLoadedContent instance contains a raw pointer
- //to the message, however it is then set as a member of that
- //message so its lifetime is guaranteed to be no longer than
- //that of the message itself
- content = std::auto_ptr<Content>(
- new LazyLoadedContent(store, this, expectedContentSize()));
- }
-}
-
-void BasicMessage::setContent(std::auto_ptr<Content>& _content)
-{
- Mutex::ScopedLock locker(contentLock);
- content = _content;
-}
-
-
-uint32_t BasicMessage::getRequiredCredit() const
-{
- return header.size() + contentSize();
-}
diff --git a/cpp/src/qpid/broker/BrokerMessage.h b/cpp/src/qpid/broker/BrokerMessage.h
deleted file mode 100644
index 0f46ff2e83..0000000000
--- a/cpp/src/qpid/broker/BrokerMessage.h
+++ /dev/null
@@ -1,146 +0,0 @@
-#ifndef _broker_BrokerMessage_h
-#define _broker_BrokerMessage_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <memory>
-#include <boost/shared_ptr.hpp>
-
-#include "BrokerMessageBase.h"
-#include "qpid/framing/BasicHeaderProperties.h"
-#include "qpid/framing/AMQHeaderBody.h"
-#include "ConnectionToken.h"
-#include "Content.h"
-#include "qpid/sys/Mutex.h"
-#include "TxBuffer.h"
-
-namespace qpid {
-
-namespace framing {
-class ChannelAdapter;
-class AMQHeaderBody;
-}
-
-namespace broker {
-
-class MessageStore;
-class Queue;
-using framing::string;
-
-/**
- * Represents an AMQP message, i.e. a header body, a list of
- * content bodies and some details about the publication
- * request.
- */
-class BasicMessage : public Message {
- framing::AMQHeaderBody header;
- bool isHeaderSet;
- std::auto_ptr<Content> content;
- mutable sys::Mutex contentLock;
- uint64_t size;
-
- void sendContent(framing::ChannelAdapter&, uint32_t framesize);
-
- public:
- typedef boost::shared_ptr<BasicMessage> shared_ptr;
-
- BasicMessage(const ConnectionToken* const publisher,
- const string& exchange, const string& routingKey,
- bool mandatory, bool immediate);
- BasicMessage();
- ~BasicMessage();
- void setHeader(framing::AMQHeaderBody* header);
- void addContent(framing::AMQContentBody* data);
- bool isComplete();
-
- static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue);
- static DeliveryToken::shared_ptr createConsumeToken(const string& consumer);
- void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
-
- void deliver(framing::ChannelAdapter&,
- const string& consumerTag,
- DeliveryId deliveryTag,
- uint32_t framesize);
-
- void sendGetOk(framing::ChannelAdapter& channel,
- uint32_t messageCount,
- DeliveryId deliveryTag,
- uint32_t framesize);
-
- framing::BasicHeaderProperties* getHeaderProperties();
- const framing::FieldTable& getApplicationHeaders();
- bool isPersistent();
- uint64_t contentSize() const { return size; }
-
- void decode(framing::Buffer& buffer, bool headersOnly = false,
- uint32_t contentChunkSize = 0);
- void decodeHeader(framing::Buffer& buffer);
- void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
-
- void encode(framing::Buffer& buffer) const;
- void encodeHeader(framing::Buffer& buffer) const;
- void encodeContent(framing::Buffer& buffer) const;
- /**
- * @returns the size of the buffer needed to encode this
- * message in its entirety
- */
- uint32_t encodedSize() const;
- /**
- * @returns the size of the buffer needed to encode the
- * 'header' of this message (not just the header frame,
- * but other meta data e.g.routing key and exchange)
- */
- uint32_t encodedHeaderSize() const;
- /**
- * @returns the size of the buffer needed to encode the
- * (possibly partial) content held by this message
- */
- uint32_t encodedContentSize() const;
- /**
- * Releases the in-memory content data held by this
- * message. Must pass in a store from which the data can
- * be reloaded.
- */
- void releaseContent(MessageStore* store);
- /**
- * If headers have been received, returns the expected
- * content size else returns 0.
- */
- uint64_t expectedContentSize();
- /**
- * Sets the 'content' implementation of this message (the
- * message controls the lifecycle of the content instance
- * it uses).
- */
- void setContent(std::auto_ptr<Content>& content);
-
- /**
- * Returns the byte credits required to transfer this message.
- */
- uint32_t getRequiredCredit() const;
-};
-
-}
-}
-
-
-#endif /*!_broker_BrokerMessage_h*/
diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h
deleted file mode 100644
index bac5dc6386..0000000000
--- a/cpp/src/qpid/broker/BrokerMessageBase.h
+++ /dev/null
@@ -1,168 +0,0 @@
-#ifndef _broker_BrokerMessageBase_h
-#define _broker_BrokerMessageBase_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <string>
-#include <boost/shared_ptr.hpp>
-#include "Content.h"
-#include "DeliveryId.h"
-#include "DeliveryToken.h"
-#include "PersistableMessage.h"
-#include "qpid/framing/amqp_types.h"
-
-namespace qpid {
-
-namespace framing {
-class ChannelAdapter;
-class BasicHeaderProperties;
-class FieldTable;
-class AMQMethodBody;
-class AMQContentBody;
-class AMQHeaderBody;
-}
-
-
-namespace broker {
-class ConnectionToken;
-class MessageStore;
-
-/**
- * Base class for all types of internal broker messages
- * abstracting away the operations
- * TODO; AMS: for the moment this is mostly a placeholder
- */
-class Message : public PersistableMessage{
- public:
- typedef boost::shared_ptr<Message> shared_ptr;
-
- Message(const ConnectionToken* publisher_,
- const std::string& _exchange,
- const std::string& _routingKey,
- bool _mandatory, bool _immediate) :
- publisher(publisher_),
- exchange(_exchange),
- routingKey(_routingKey),
- mandatory(_mandatory),
- immediate(_immediate),
- persistenceId(0),
- redelivered(false)
- {}
-
- Message() :
- mandatory(false),
- immediate(false),
- persistenceId(0),
- redelivered(false)
- {}
-
- virtual ~Message() {};
-
- // Accessors
- const std::string& getRoutingKey() const { return routingKey; }
- const std::string& getExchange() const { return exchange; }
- uint64_t getPersistenceId() const { return persistenceId; }
- bool getRedelivered() const { return redelivered; }
-
- void setRouting(const std::string& _exchange, const std::string& _routingKey)
- { exchange = _exchange; routingKey = _routingKey; }
- void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
- void redeliver() { redelivered = true; }
-
- virtual void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag/*only needed for basic class*/,
- DeliveryToken::shared_ptr token, uint32_t framesize) = 0;
-
- virtual bool isComplete() = 0;
-
- virtual uint64_t contentSize() const = 0;
- virtual framing::BasicHeaderProperties* getHeaderProperties() = 0;
- virtual const framing::FieldTable& getApplicationHeaders() = 0;
- virtual bool isPersistent() = 0;
- virtual const ConnectionToken* getPublisher() const {
- return publisher;
- }
-
- virtual uint32_t getRequiredCredit() const = 0;
-
- virtual void encode(framing::Buffer& buffer) const = 0;
- virtual void encodeHeader(framing::Buffer& buffer) const = 0;
-
- /**
- * @returns the size of the buffer needed to encode this
- * message in its entirety
- */
- virtual uint32_t encodedSize() const = 0;
- /**
- * @returns the size of the buffer needed to encode the
- * 'header' of this message (not just the header frame,
- * but other meta data e.g.routing key and exchange)
- */
- virtual uint32_t encodedHeaderSize() const = 0;
- /**
- * @returns the size of the buffer needed to encode the
- * (possibly partial) content held by this message
- */
- virtual uint32_t encodedContentSize() const = 0;
- /**
- * If headers have been received, returns the expected
- * content size else returns 0.
- */
- virtual uint64_t expectedContentSize() = 0;
-
- virtual void decodeHeader(framing::Buffer& buffer) = 0;
- virtual void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0) = 0;
-
- static shared_ptr decode(framing::Buffer& buffer);
-
- // TODO: AMS 29/1/2007 Don't think these are really part of base class
-
- /**
- * Sets the 'content' implementation of this message (the
- * message controls the lifecycle of the content instance
- * it uses).
- */
- virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
- virtual void setHeader(framing::AMQHeaderBody*) {};
- virtual void addContent(framing::AMQContentBody*) {};
- /**
- * Releases the in-memory content data held by this
- * message. Must pass in a store from which the data can
- * be reloaded.
- */
- virtual void releaseContent(MessageStore* /*store*/) {};
-
- bool isImmediate() const { return immediate; }
-
- private:
- const ConnectionToken* publisher;
- std::string exchange;
- std::string routingKey;
- const bool mandatory;
- const bool immediate;
- mutable uint64_t persistenceId;
- bool redelivered;
-};
-
-}}
-
-
-#endif /*!_broker_BrokerMessage_h*/
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
deleted file mode 100644
index 1184885aeb..0000000000
--- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/QpidError.h"
-#include "BrokerMessageMessage.h"
-#include "qpid/framing/ChannelAdapter.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/MessageOpenBody.h"
-#include "qpid/framing/MessageCloseBody.h"
-#include "qpid/framing/MessageAppendBody.h"
-#include "Reference.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/framing/BasicHeaderProperties.h"
-#include "RecoveryManagerImpl.h"
-
-#include <algorithm>
-
-using namespace std;
-using namespace boost;
-using namespace qpid::framing;
-
-namespace qpid {
-namespace broker {
-
-struct MessageDeliveryToken : public DeliveryToken
-{
- const std::string destination;
-
- MessageDeliveryToken(const std::string& d) : destination(d) {}
-};
-
-MessageMessage::MessageMessage(
- ConnectionToken* publisher, const MessageTransferBody* transfer_
-) : Message(publisher, transfer_->getDestination(),
- transfer_->getRoutingKey(),
- transfer_->getRejectUnroutable(),
- transfer_->getImmediate()),
- transfer(*transfer_)
-{
- assert(transfer.getBody().isInline());
-}
-
-MessageMessage::MessageMessage(
- ConnectionToken* publisher, const MessageTransferBody* transfer_,
- ReferencePtr reference_
-) : Message(publisher, transfer_->getDestination(),
- transfer_->getRoutingKey(),
- transfer_->getRejectUnroutable(),
- transfer_->getImmediate()),
- transfer(*transfer_),
- reference(reference_)
-{
- assert(!transfer.getBody().isInline());
- assert(reference_);
-}
-
-/**
- * Currently used by message store impls to recover messages
- */
-MessageMessage::MessageMessage() {}
-
-// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring
-void MessageMessage::transferMessage(
- framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint32_t framesize)
-{
- const framing::Content& body = transfer.getBody();
- // Send any reference data
- ReferencePtr ref= getReference();
- if (ref){
-
- // Open
- channel.send(MessageOpenBody(channel.getVersion(), ref->getId()));
- // Appends
- for(Reference::Appends::const_iterator a = ref->getAppends().begin();
- a != ref->getAppends().end();
- ++a) {
- uint32_t sizeleft = a->size();
- const string& content = a->getBytes();
- // Calculate overhead bytes
- // Assume that the overhead is constant as the reference name doesn't change
- uint32_t overhead = sizeleft - content.size();
- string::size_type contentStart = 0;
- while (sizeleft) {
- string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
-
- channel.send(MessageAppendBody(channel.getVersion(), ref->getId(),
- string(content, contentStart, contentSize)));
- sizeleft -= contentSize;
- contentStart += contentSize;
- }
- }
- }
-
- // The transfer
- if ( transfer.size()<=framesize ) {
- channel.send(MessageTransferBody(ProtocolVersion(),
- transfer.getTicket(),
- consumerTag,
- getRedelivered(),
- transfer.getRejectUnroutable(),
- transfer.getImmediate(),
- transfer.getTtl(),
- transfer.getPriority(),
- transfer.getTimestamp(),
- transfer.getDeliveryMode(),
- transfer.getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer.getMessageId(),
- transfer.getCorrelationId(),
- transfer.getReplyTo(),
- transfer.getContentType(),
- transfer.getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer.getUserId(),
- transfer.getAppId(),
- transfer.getTransactionId(),
- transfer.getSecurityToken(),
- transfer.getApplicationHeaders(),
- body));
- } else {
- // Thing to do here is to construct a simple reference message then deliver that instead
- // fragmentation will be taken care of in the delivery if necessary;
- string content = body.getValue();
- string refname = "dummy";
- MessageTransferBody newTransfer(channel.getVersion(),
- transfer.getTicket(),
- consumerTag,
- getRedelivered(),
- transfer.getRejectUnroutable(),
- transfer.getImmediate(),
- transfer.getTtl(),
- transfer.getPriority(),
- transfer.getTimestamp(),
- transfer.getDeliveryMode(),
- transfer.getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer.getMessageId(),
- transfer.getCorrelationId(),
- transfer.getReplyTo(),
- transfer.getContentType(),
- transfer.getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer.getUserId(),
- transfer.getAppId(),
- transfer.getTransactionId(),
- transfer.getSecurityToken(),
- transfer.getApplicationHeaders(),
- framing::Content(REFERENCE, refname));
- ReferencePtr newRef(new Reference(refname));
- newRef->append(MessageAppendBody(channel.getVersion(), refname, content));
- MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), &newTransfer, newRef);
- newMsg.transferMessage(channel, consumerTag, framesize);
- return;
- }
- // Close any reference data
- if (ref)
- channel.send(MessageCloseBody(ProtocolVersion(), ref->getId()));
-}
-
-
-void MessageMessage::deliver(ChannelAdapter& channel, DeliveryId, DeliveryToken::shared_ptr token, uint32_t framesize)
-{
- transferMessage(channel, shared_polymorphic_cast<MessageDeliveryToken>(token)->destination, framesize);
-}
-
-void MessageMessage::deliver(ChannelAdapter& channel, const std::string& destination, uint32_t framesize)
-{
- transferMessage(channel, destination, framesize);
-}
-
-bool MessageMessage::isComplete()
-{
- return true;
-}
-
-uint64_t MessageMessage::contentSize() const
-{
- if (transfer.getBody().isInline())
- return transfer.getBody().getValue().size();
- else {
- assert(getReference());
- return getReference()->getSize();
- }
-}
-
-qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
-{
- return 0; // FIXME aconway 2007-02-05:
-}
-
-const FieldTable& MessageMessage::getApplicationHeaders()
-{
- return transfer.getApplicationHeaders();
-}
-bool MessageMessage::isPersistent()
-{
- return transfer.getDeliveryMode() == PERSISTENT;
-}
-
-uint32_t MessageMessage::encodedSize() const
-{
- return encodedHeaderSize() + encodedContentSize();
-}
-
-uint32_t MessageMessage::encodedHeaderSize() const
-{
- return RecoveryManagerImpl::encodedMessageTypeSize() + transfer.size();
-}
-
-uint32_t MessageMessage::encodedContentSize() const
-{
- return 0;
-}
-
-uint64_t MessageMessage::expectedContentSize()
-{
- return 0;
-}
-
-void MessageMessage::encode(Buffer& buffer) const
-{
- encodeHeader(buffer);
-}
-
-void MessageMessage::encodeHeader(Buffer& buffer) const
-{
- RecoveryManagerImpl::encodeMessageType(*this, buffer);
- if (transfer.getBody().isInline()) {
- transfer.encode(buffer);
- } else {
- assert(getReference());
- string data;
- const Reference::Appends& appends = getReference()->getAppends();
- for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) {
- data += a->getBytes();
- }
- framing::Content body(INLINE, data);
- copyTransfer(ProtocolVersion(), transfer.getDestination(), body).encode(buffer);
- }
-}
-
-void MessageMessage::decodeHeader(Buffer& buffer)
-{
- //don't care about the type here, but want encode/decode to be symmetric
- RecoveryManagerImpl::decodeMessageType(buffer);
- transfer.decode(buffer);
-}
-
-void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
-{
-}
-
-
-MessageTransferBody MessageMessage::copyTransfer(const ProtocolVersion& version,
- const string& destination,
- const framing::Content& body) const
-{
- return MessageTransferBody(version,
- transfer.getTicket(),
- destination,
- getRedelivered(),
- transfer.getRejectUnroutable(),
- transfer.getImmediate(),
- transfer.getTtl(),
- transfer.getPriority(),
- transfer.getTimestamp(),
- transfer.getDeliveryMode(),
- transfer.getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer.getMessageId(),
- transfer.getCorrelationId(),
- transfer.getReplyTo(),
- transfer.getContentType(),
- transfer.getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer.getUserId(),
- transfer.getAppId(),
- transfer.getTransactionId(),
- transfer.getSecurityToken(),
- transfer.getApplicationHeaders(),
- body);
-}
-
-MessageMessage::ReferencePtr MessageMessage::getReference() const {
- return reference;
-}
-
-uint32_t MessageMessage::getRequiredCredit() const
-{
- //TODO: change when encoding changes. Should be the payload of any
- //header & body frames.
- return transfer.size();
-}
-
-
-DeliveryToken::shared_ptr MessageMessage::getToken(const std::string& destination)
-{
- return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination));
-}
-
-}} // namespace qpid::broker
-
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.h b/cpp/src/qpid/broker/BrokerMessageMessage.h
deleted file mode 100644
index 6bfd0e045d..0000000000
--- a/cpp/src/qpid/broker/BrokerMessageMessage.h
+++ /dev/null
@@ -1,90 +0,0 @@
-#ifndef _broker_BrokerMessageMessage_h
-#define _broker_BrokerMessageMessage_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "BrokerMessageBase.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/amqp_types.h"
-#include <boost/weak_ptr.hpp>
-#include <vector>
-
-namespace qpid {
-
-namespace broker {
-class ConnectionToken;
-class Reference;
-
-class MessageMessage: public Message{
- public:
- typedef boost::shared_ptr<MessageMessage> shared_ptr;
- typedef boost::shared_ptr<Reference> ReferencePtr;
-
- MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer);
- MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer, ReferencePtr reference);
- MessageMessage();
-
- // Default destructor okay
-
- framing::MessageTransferBody* getTransfer() const { return const_cast<framing::MessageTransferBody*>(&transfer); }
- ReferencePtr getReference() const ;
-
- void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
- void deliver(framing::ChannelAdapter&, const std::string& destination, uint32_t framesize);
-
- bool isComplete();
-
- uint64_t contentSize() const;
- framing::BasicHeaderProperties* getHeaderProperties();
- const framing::FieldTable& getApplicationHeaders();
- bool isPersistent();
-
- void encode(framing::Buffer& buffer) const;
- void encodeHeader(framing::Buffer& buffer) const;
- uint32_t encodedSize() const;
- uint32_t encodedHeaderSize() const;
- uint32_t encodedContentSize() const;
- uint64_t expectedContentSize();
- void decodeHeader(framing::Buffer& buffer);
- void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
- uint32_t getRequiredCredit() const;
-
- static DeliveryToken::shared_ptr getToken(const std::string& destination);
-
- private:
- void transferMessage(
- framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint32_t framesize);
-
- framing::MessageTransferBody copyTransfer(
- const framing::ProtocolVersion& version,
- const std::string& destination,
- const framing::Content& body) const;
-
- framing::MessageTransferBody transfer;
- const boost::shared_ptr<Reference> reference;
-};
-
-}}
-
-
-#endif /*!_broker_BrokerMessage_h*/
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index 5ff9f950eb..7311d043d0 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -88,7 +88,7 @@ void Queue::deliver(Message::shared_ptr& msg){
void Queue::recover(Message::shared_ptr& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
- if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
+ if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
msg->releaseContent(store);
diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h
index 962c11d8ee..5ba103d3ed 100644
--- a/cpp/src/qpid/broker/BrokerQueue.h
+++ b/cpp/src/qpid/broker/BrokerQueue.h
@@ -28,7 +28,7 @@
#include "qpid/framing/amqp_types.h"
#include "ConnectionToken.h"
#include "Consumer.h"
-#include "BrokerMessage.h"
+#include "Message.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Serializer.h"
#include "qpid/sys/Monitor.h"
@@ -43,6 +43,7 @@ namespace qpid {
namespace broker {
class MessageStore;
class QueueRegistry;
+ class TransactionContext;
class Exchange;
/**
diff --git a/cpp/src/qpid/broker/CompletionHandler.h b/cpp/src/qpid/broker/CompletionHandler.h
deleted file mode 100644
index 9d51656282..0000000000
--- a/cpp/src/qpid/broker/CompletionHandler.h
+++ /dev/null
@@ -1,39 +0,0 @@
-#ifndef _broker_CompletionHandler_h
-#define _broker_CompletionHandler_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-namespace qpid {
-namespace broker {
-
-/**
- * Callback interface to handle completion of a message.
- */
-class CompletionHandler
-{
- public:
- virtual ~CompletionHandler(){}
- virtual void complete(Message::shared_ptr) = 0;
-};
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_CompletionHandler_h*/
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index d0c397d184..dc229947b9 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -21,7 +21,7 @@
#ifndef _Consumer_
#define _Consumer_
-#include "BrokerMessage.h"
+#include "Message.h"
namespace qpid {
namespace broker {
diff --git a/cpp/src/qpid/broker/Content.h b/cpp/src/qpid/broker/Content.h
deleted file mode 100644
index 97dce0d3f7..0000000000
--- a/cpp/src/qpid/broker/Content.h
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _Content_
-#define _Content_
-
-#include <boost/function.hpp>
-
-#include "qpid/framing/AMQContentBody.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/OutputHandler.h"
-
-namespace qpid {
-
-namespace framing {
-class ChannelAdapter;
-}
-
-namespace broker {
-class Content{
- public:
- typedef std::string DataBlock;
- typedef boost::function1<void, const DataBlock&> SendFn;
-
- virtual ~Content(){}
-
- /** Add a block of data to the content */
- virtual void add(framing::AMQContentBody* data) = 0;
-
- /** Total size of content in bytes */
- virtual uint32_t size() = 0;
-
- /**
- * Iterate over the content calling SendFn for each block.
- * Subdivide blocks if necessary to ensure each block is
- * <= framesize bytes long.
- */
- virtual void send(framing::ChannelAdapter& channel, uint32_t framesize) = 0;
-
- //FIXME aconway 2007-02-07: This is inconsistently implemented
- //find out what is needed.
- virtual void encode(qpid::framing::Buffer& buffer) = 0;
-};
-}}
-
-
-#endif
diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h
index e8c4f5ba19..9719d972fc 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.h
+++ b/cpp/src/qpid/broker/DeliverableMessage.h
@@ -22,8 +22,8 @@
#define _DeliverableMessage_
#include "Deliverable.h"
-#include "BrokerMessage.h"
#include "BrokerQueue.h"
+#include "Message.h"
namespace qpid {
namespace broker {
diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h
index d59c4769d7..f645b37c23 100644
--- a/cpp/src/qpid/broker/DeliveryAdapter.h
+++ b/cpp/src/qpid/broker/DeliveryAdapter.h
@@ -21,9 +21,9 @@
#ifndef _DeliveryAdapter_
#define _DeliveryAdapter_
-#include "BrokerMessageBase.h"
#include "DeliveryId.h"
#include "DeliveryToken.h"
+#include "Message.h"
#include "qpid/framing/amqp_types.h"
namespace qpid {
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index 745a246c78..a1f82cb757 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -25,10 +25,10 @@
#include <list>
#include <ostream>
#include "AccumulatedAck.h"
-#include "BrokerMessage.h"
-#include "Prefetch.h"
#include "BrokerQueue.h"
#include "DeliveryId.h"
+#include "Message.h"
+#include "Prefetch.h"
namespace qpid {
namespace broker {
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 554be295bf..7b20bd610c 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -25,7 +25,6 @@
#include <vector>
#include "BrokerExchange.h"
#include "qpid/framing/FieldTable.h"
-#include "BrokerMessage.h"
#include "qpid/sys/Monitor.h"
#include "BrokerQueue.h"
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index 3cbffc6f2f..070e438bcc 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -25,7 +25,6 @@
#include <vector>
#include "BrokerExchange.h"
#include "qpid/framing/FieldTable.h"
-#include "BrokerMessage.h"
#include "qpid/sys/Monitor.h"
#include "BrokerQueue.h"
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index a99cc1c92c..48d115c1ec 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -24,7 +24,6 @@
#include <vector>
#include "BrokerExchange.h"
#include "qpid/framing/FieldTable.h"
-#include "BrokerMessage.h"
#include "qpid/sys/Monitor.h"
#include "BrokerQueue.h"
diff --git a/cpp/src/qpid/broker/InMemoryContent.cpp b/cpp/src/qpid/broker/InMemoryContent.cpp
deleted file mode 100644
index d69dcfafe7..0000000000
--- a/cpp/src/qpid/broker/InMemoryContent.cpp
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "InMemoryContent.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ChannelAdapter.h"
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using boost::static_pointer_cast;
-
-void InMemoryContent::add(AMQContentBody* data)
-{
- content.push_back(*data);
-}
-
-uint32_t InMemoryContent::size()
-{
- int sum(0);
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- sum += i->size();
- }
- return sum;
-}
-
-void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize)
-{
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- if (i->size() > framesize) {
- uint32_t offset = 0;
- for (int chunk = i->size() / framesize; chunk > 0; chunk--) {
- string data = i->getData().substr(offset, framesize);
- channel.send(AMQContentBody(data));
- offset += framesize;
- }
- uint32_t remainder = i->size() % framesize;
- if (remainder) {
- string data = i->getData().substr(offset, remainder);
- channel.send(AMQContentBody(data));
- }
- } else {
- channel.send(*i);
- }
- }
-}
-
-void InMemoryContent::encode(Buffer& buffer)
-{
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- i->encode(buffer);
- }
-}
-
diff --git a/cpp/src/qpid/broker/InMemoryContent.h b/cpp/src/qpid/broker/InMemoryContent.h
deleted file mode 100644
index a6fca7ca98..0000000000
--- a/cpp/src/qpid/broker/InMemoryContent.h
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _InMemoryContent_
-#define _InMemoryContent_
-
-#include "Content.h"
-#include "qpid/framing/AMQContentBody.h"
-#include <vector>
-
-
-namespace qpid {
- namespace broker {
- class InMemoryContent : public Content{
- typedef std::vector<framing::AMQContentBody> content_list;
- typedef content_list::iterator content_iterator;
-
- content_list content;
- public:
- void add(framing::AMQContentBody* data);
- uint32_t size();
- void send(framing::ChannelAdapter&, uint32_t framesize);
- void encode(framing::Buffer& buffer);
- };
- }
-}
-
-
-#endif
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp
deleted file mode 100644
index b8b5b37f45..0000000000
--- a/cpp/src/qpid/broker/LazyLoadedContent.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "LazyLoadedContent.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ChannelAdapter.h"
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-LazyLoadedContent::~LazyLoadedContent()
-{
- store->destroy(*msg);
-}
-
-LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, uint64_t _expectedSize) :
- store(_store), msg(_msg), expectedSize(_expectedSize) {}
-
-void LazyLoadedContent::add(AMQContentBody* data)
-{
- store->appendContent(*msg, data->getData());
-}
-
-uint32_t LazyLoadedContent::size()
-{
- return 0;//all content is written as soon as it is added
-}
-
-void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize)
-{
- if (expectedSize > framesize) {
- for (uint64_t offset = 0; offset < expectedSize; offset += framesize)
- {
- uint64_t remaining = expectedSize - offset;
- string data;
- store->loadContent(*msg, data, offset,
- remaining > framesize ? framesize : remaining);
- channel.send(AMQContentBody(data));
- }
- } else {
- string data;
- store->loadContent(*msg, data, 0, expectedSize);
- channel.send(AMQContentBody(data));
- }
-}
-
-void LazyLoadedContent::encode(Buffer&)
-{
- //do nothing as all content is written as soon as it is added
-}
-
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.h b/cpp/src/qpid/broker/LazyLoadedContent.h
deleted file mode 100644
index 79a33ed7a9..0000000000
--- a/cpp/src/qpid/broker/LazyLoadedContent.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _LazyLoadedContent_
-#define _LazyLoadedContent_
-
-#include "Content.h"
-#include "MessageStore.h"
-#include "BrokerMessageBase.h"
-
-namespace qpid {
- namespace broker {
- class LazyLoadedContent : public Content{
- MessageStore* const store;
- Message* const msg;
- const uint64_t expectedSize;
- public:
- LazyLoadedContent(
- MessageStore* const store, Message* const msg,
- uint64_t expectedSize);
- ~LazyLoadedContent();
- void add(qpid::framing::AMQContentBody* data);
- uint32_t size();
- void send(
- framing::ChannelAdapter&,
- uint32_t framesize);
- void encode(qpid::framing::Buffer& buffer);
- };
- }
-}
-
-
-#endif
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
new file mode 100644
index 0000000000..e5f92297b7
--- /dev/null
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Message.h"
+#include "ExchangeRegistry.h"
+#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/BasicPublishBody.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/SendContent.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/TypeFilter.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using std::string;
+
+TransferAdapter Message::TRANSFER;
+PublishAdapter Message::PUBLISH;
+
+Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), store(0), adapter(0) {}
+
+const std::string& Message::getRoutingKey() const
+{
+ return getAdapter().getRoutingKey(frames);
+}
+
+const std::string& Message::getExchangeName() const
+{
+ return getAdapter().getExchange(frames);
+}
+
+const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registry) const
+{
+ if (!exchange) {
+ exchange = registry.get(getExchangeName());
+ }
+ return exchange;
+}
+
+bool Message::isImmediate() const
+{
+ return getAdapter().isImmediate(frames);
+}
+
+const FieldTable& Message::getApplicationHeaders() const
+{
+ return getAdapter().getApplicationHeaders(frames);
+}
+
+bool Message::isPersistent()
+{
+ return getAdapter().isPersistent(frames);
+}
+
+uint32_t Message::getRequiredCredit() const
+{
+ //add up payload for all header and content frames in the frameset
+ SumBodySize sum;
+ frames.map_if(sum, TypeFilter(METHOD_BODY, HEADER_BODY));
+ return sum.getSize();
+}
+
+void Message::encode(framing::Buffer& buffer) const
+{
+ //encode method and header frames
+ EncodeFrame f1(buffer);
+ frames.map_if(f1, TypeFilter(METHOD_BODY, HEADER_BODY));
+
+ //then encode the payload of each content frame
+ EncodeBody f2(buffer);
+ frames.map_if(f2, TypeFilter(CONTENT_BODY));
+}
+
+uint32_t Message::encodedSize() const
+{
+ return encodedHeaderSize() + encodedContentSize();
+}
+
+uint32_t Message::encodedContentSize() const
+{
+ return frames.getContentSize();
+}
+
+uint32_t Message::encodedHeaderSize() const
+{
+ //add up the size for all method and header frames in the frameset
+ SumFrameSize sum;
+ frames.map_if(sum, TypeFilter(METHOD_BODY, HEADER_BODY));
+ return sum.getSize();
+}
+
+void Message::decodeHeader(framing::Buffer& buffer)
+{
+ AMQFrame method;
+ method.decode(buffer);
+ frames.append(method);
+
+ AMQFrame header;
+ header.decode(buffer);
+ frames.append(header);
+}
+
+void Message::decodeContent(framing::Buffer& buffer)
+{
+ //get the data as a string and set that as the content
+ //body on a frame then add that frame to the frameset
+ AMQFrame frame;
+ frame.setBody(AMQContentBody());
+ frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
+ frames.append(frame);
+}
+
+void Message::releaseContent(MessageStore* _store)
+{
+ store = _store;
+ if (!getPersistenceId()) {
+ store->stage(*this);
+ }
+ //remove any content frames from the frameset
+ frames.remove(TypeFilter(CONTENT_BODY));
+}
+
+void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize)
+{
+ if (isContentReleased()) {
+ //load content from store in chunks of maxContentSize
+ uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
+ uint64_t expectedSize(frames.getHeaders()->getContentLength());//TODO: how do we know how much data to load?
+ for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize)
+ {
+ uint64_t remaining = expectedSize - offset;
+ AMQFrame frame(channel, AMQContentBody());
+ string& data = frame.castBody<AMQContentBody>()->getData();
+
+ store->loadContent(*this, data, offset,
+ remaining > maxContentSize ? maxContentSize : remaining);
+ out.handle(frame);
+ }
+
+ } else {
+ SendContent f(out, channel, maxFrameSize);
+ frames.map_if(f, TypeFilter(CONTENT_BODY));
+ }
+}
+
+void Message::sendHeader(framing::FrameHandler& out, uint16_t channel, uint16_t /*maxFrameSize*/)
+{
+ Relay f(out, channel);
+ frames.map_if(f, TypeFilter(HEADER_BODY));
+}
+
+MessageAdapter& Message::getAdapter() const
+{
+ if (!adapter) {
+ if (frames.isA<BasicPublishBody>()) {
+ adapter = &PUBLISH;
+ } else if(frames.isA<MessageTransferBody>()) {
+ adapter = &TRANSFER;
+ } else {
+ const AMQMethodBody* method = frames.getMethod();
+ if (!method) throw Exception("Can't adapt message with no method");
+ else throw Exception(QPID_MSG("Can't adapt message based on " << *method));
+ }
+ }
+ return *adapter;
+}
+
+uint64_t Message::contentSize() const
+{
+ return frames.getContentSize();
+}
+
+bool Message::isContentLoaded() const
+{
+ return contentSize() > 0;
+}
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
new file mode 100644
index 0000000000..95b3f38b55
--- /dev/null
+++ b/cpp/src/qpid/broker/Message.h
@@ -0,0 +1,139 @@
+#ifndef _broker_Message_h
+#define _broker_Message_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include <boost/variant.hpp>
+#include "PersistableMessage.h"
+#include "MessageAdapter.h"
+#include "qpid/framing/amqp_types.h"
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+class SequenceNumber;
+}
+
+namespace broker {
+class ConnectionToken;
+class Exchange;
+class ExchangeRegistry;
+class MessageStore;
+
+class Message : public PersistableMessage {
+public:
+ typedef boost::shared_ptr<Message> shared_ptr;
+
+ Message(const framing::SequenceNumber& id = framing::SequenceNumber());
+
+ uint64_t getPersistenceId() const { return persistenceId; }
+ void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
+
+ bool getRedelivered() const { return redelivered; }
+ void redeliver() { redelivered = true; }
+
+ const ConnectionToken* getPublisher() const { return publisher; }
+ void setPublisher(ConnectionToken* p) { publisher = p; }
+
+ uint64_t contentSize() const;
+
+ const std::string& getRoutingKey() const;
+ const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const;
+ const std::string& getExchangeName() const;
+ bool isImmediate() const;
+ const framing::FieldTable& getApplicationHeaders() const;
+ bool isPersistent();
+
+ framing::FrameSet& getFrames() { return frames; }
+ const framing::FrameSet& getFrames() const { return frames; }
+
+ template <class T> T* getProperties() {
+ return frames.getHeaders()->get<T>(true);
+ }
+
+ template <class T> const T* getProperties() const {
+ return frames.getHeaders()->get<T>();
+ }
+
+ template <class T> const T* getMethod() const {
+ return frames.as<T>();
+ }
+
+ template <class T> bool isA() const {
+ return frames.isA<T>();
+ }
+
+ uint32_t getRequiredCredit() const;
+
+ void encode(framing::Buffer& buffer) const;
+
+ /**
+ * @returns the size of the buffer needed to encode this
+ * message in its entirety
+ */
+ uint32_t encodedSize() const;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * 'header' of this message (not just the header frame,
+ * but other meta data e.g.routing key and exchange)
+ */
+ uint32_t encodedHeaderSize() const;
+ uint32_t encodedContentSize() const;
+
+ void decodeHeader(framing::Buffer& buffer);
+ void decodeContent(framing::Buffer& buffer);
+
+ /**
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
+ */
+ void releaseContent(MessageStore* store);
+
+ void sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize);
+ void sendHeader(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize);
+
+ bool isContentLoaded() const;
+
+ private:
+ framing::FrameSet frames;
+ mutable boost::shared_ptr<Exchange> exchange;
+ mutable uint64_t persistenceId;
+ bool redelivered;
+ ConnectionToken* publisher;
+ MessageStore* store;
+ mutable MessageAdapter* adapter;
+
+ static TransferAdapter TRANSFER;
+ static PublishAdapter PUBLISH;
+
+ MessageAdapter& getAdapter() const;
+ bool isContentReleased() { return store; }
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h
new file mode 100644
index 0000000000..0b2dc6307a
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageAdapter.h
@@ -0,0 +1,108 @@
+#ifndef _broker_MessageAdapter_h
+#define _broker_MessageAdapter_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "qpid/framing/BasicPublishBody.h"
+#include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/MessageTransferBody.h"
+
+namespace qpid {
+namespace broker {
+
+struct MessageAdapter
+{
+ virtual ~MessageAdapter() {}
+
+ virtual const std::string& getRoutingKey(const framing::FrameSet& f) = 0;
+ virtual const std::string& getExchange(const framing::FrameSet& f) = 0;
+ virtual bool isImmediate(const framing::FrameSet& f) = 0;
+ virtual const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) = 0;
+ virtual bool isPersistent(const framing::FrameSet& f) = 0;
+};
+
+struct PublishAdapter : MessageAdapter
+{
+ const std::string& getRoutingKey(const framing::FrameSet& f)
+ {
+ return f.as<framing::BasicPublishBody>()->getRoutingKey();
+ }
+
+ const std::string& getExchange(const framing::FrameSet& f)
+ {
+ return f.as<framing::BasicPublishBody>()->getExchange();
+ }
+
+ bool isImmediate(const framing::FrameSet& f)
+ {
+ return f.as<framing::BasicPublishBody>()->getImmediate();
+ }
+
+ const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f)
+ {
+ return f.getHeaders()->get<framing::BasicHeaderProperties>()->getHeaders();
+ }
+
+ bool isPersistent(const framing::FrameSet& f)
+ {
+ return f.getHeaders()->get<framing::BasicHeaderProperties>()->getDeliveryMode() == 2;
+ }
+};
+
+struct TransferAdapter : MessageAdapter
+{
+ const std::string& getRoutingKey(const framing::FrameSet& f)
+ {
+ return f.getHeaders()->get<framing::DeliveryProperties>()->getRoutingKey();
+ }
+
+ const std::string& getExchange(const framing::FrameSet& f)
+ {
+ return f.as<framing::MessageTransferBody>()->getDestination();
+ }
+
+ bool isImmediate(const framing::FrameSet&)
+ {
+ //TODO: we seem to have lost the immediate flag
+ return false;
+ }
+
+ const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f)
+ {
+ return f.getHeaders()->get<framing::MessageProperties>()->getApplicationHeaders();
+ }
+
+ bool isPersistent(const framing::FrameSet& f)
+ {
+ return f.getHeaders()->get<framing::DeliveryProperties>()->getDeliveryMode() == 2;
+ }
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index f19927b708..1a84aa9b65 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -20,55 +20,64 @@
*/
#include "MessageBuilder.h"
-#include "InMemoryContent.h"
-#include "LazyLoadedContent.h"
+#include "Message.h"
+#include "MessageStore.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/AMQFrame.h"
using namespace qpid::broker;
using namespace qpid::framing;
-using std::auto_ptr;
-MessageBuilder::MessageBuilder(CompletionHandler* _handler,
- MessageStore* const _store,
- uint64_t _stagingThreshold
-) :
- handler(_handler),
- store(_store),
- stagingThreshold(_stagingThreshold)
-{}
+MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
+ state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
-void MessageBuilder::route(){
- if (message->isComplete()) {
- if (handler) handler->complete(message);
- message.reset();
+void MessageBuilder::handle(AMQFrame& frame)
+{
+ switch(state) {
+ case METHOD:
+ checkType(METHOD_BODY, frame.getBody()->type());
+ state = HEADER;
+ break;
+ case HEADER:
+ checkType(HEADER_BODY, frame.getBody()->type());
+ state = CONTENT;
+ break;
+ case CONTENT:
+ checkType(CONTENT_BODY, frame.getBody()->type());
+ break;
+ default:
+ throw ConnectionException(504, "Invalid frame sequence for message.");
+ }
+ if (staging) {
+ store->appendContent(*message, frame.castBody<AMQContentBody>()->getData());
+ } else {
+ message->getFrames().append(frame);
+ //have we reached the staging limit? if so stage message and release content
+ if (state == CONTENT && stagingThreshold && message->getFrames().getContentSize() >= stagingThreshold) {
+ store->stage(*message);
+ message->releaseContent(store);
+ staging = true;
+ }
}
}
-void MessageBuilder::initialise(Message::shared_ptr& msg){
- if(message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
+{
+ if (expected != actual) {
+ throw ConnectionException(504, "Invalid frame sequence for message.");
}
- message = msg;
}
-void MessageBuilder::setHeader(AMQHeaderBody* header){
- if(!message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
- }
- message->setHeader(header);
- if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
- store->stage(*message);
- message->releaseContent(store);
- } else {
- auto_ptr<Content> content(new InMemoryContent());
- message->setContent(content);
- }
- route();
+void MessageBuilder::end()
+{
+ message.reset();
+ state = DORMANT;
+ staging = false;
}
-void MessageBuilder::addContent(AMQContentBody* content){
- if(!message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
- }
- message->addContent(content);
- route();
+void MessageBuilder::start(const SequenceNumber& id)
+{
+ message = Message::shared_ptr(new Message(id));
+ state = METHOD;
+ staging = false;
}
diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h
index 18e85d7383..134f93b68f 100644
--- a/cpp/src/qpid/broker/MessageBuilder.h
+++ b/cpp/src/qpid/broker/MessageBuilder.h
@@ -21,37 +21,35 @@
#ifndef _MessageBuilder_
#define _MessageBuilder_
-#include <memory>
-#include "qpid/QpidError.h"
-#include "BrokerExchange.h"
-#include "BrokerMessage.h"
-#include "MessageStore.h"
-#include "qpid/framing/AMQContentBody.h"
-#include "qpid/framing/AMQHeaderBody.h"
-#include "qpid/framing/BasicPublishBody.h"
-#include "CompletionHandler.h"
+#include "boost/shared_ptr.hpp"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/SequenceNumber.h"
namespace qpid {
namespace broker {
- class MessageBuilder{
+ class Message;
+ class MessageStore;
+
+ class MessageBuilder : public framing::FrameHandler{
public:
- MessageBuilder(CompletionHandler* _handler,
- MessageStore* const store = 0,
- uint64_t stagingThreshold = 0);
- void initialise(Message::shared_ptr& msg);
- void setHeader(framing::AMQHeaderBody* header);
- void addContent(framing::AMQContentBody* content);
- Message::shared_ptr getMessage() { return message; }
+ MessageBuilder(MessageStore* const store = 0, uint64_t stagingThreshold = 0);
+ void handle(framing::AMQFrame& frame);
+ boost::shared_ptr<Message> getMessage() { return message; }
+ void start(const framing::SequenceNumber& id);
+ void end();
private:
- Message::shared_ptr message;
- CompletionHandler* handler;
+ enum State {DORMANT, METHOD, HEADER, CONTENT};
+ State state;
+ boost::shared_ptr<Message> message;
MessageStore* const store;
const uint64_t stagingThreshold;
+ bool staging;
- void route();
+ void checkType(uint8_t expected, uint8_t actual);
};
}
}
#endif
+
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp
new file mode 100644
index 0000000000..09ab8ec465
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageDelivery.cpp
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessageDelivery.h"
+
+#include "DeliveryToken.h"
+#include "Message.h"
+#include "BrokerQueue.h"
+#include "qpid/framing/ChannelAdapter.h"
+#include "qpid/framing/BasicDeliverBody.h"
+#include "qpid/framing/BasicGetOkBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+
+
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+namespace qpid{
+namespace broker{
+
+struct BaseToken : DeliveryToken
+{
+ virtual ~BaseToken() {}
+ virtual void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) = 0;
+};
+
+struct BasicGetToken : BaseToken
+{
+ typedef boost::shared_ptr<BasicGetToken> shared_ptr;
+
+ Queue::shared_ptr queue;
+
+ BasicGetToken(Queue::shared_ptr q) : queue(q) {}
+
+ void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id)
+ {
+ channel.send(BasicGetOkBody(
+ channel.getVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(),
+ msg->getRoutingKey(), queue->getMessageCount()));
+
+ }
+};
+
+struct BasicConsumeToken : BaseToken
+{
+ typedef boost::shared_ptr<BasicConsumeToken> shared_ptr;
+
+ const string consumer;
+
+ BasicConsumeToken(const string c) : consumer(c) {}
+
+ void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id)
+ {
+ channel.send(BasicDeliverBody(
+ channel.getVersion(), consumer, id.getValue(),
+ msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey()));
+ }
+
+};
+
+struct MessageDeliveryToken : BaseToken
+{
+ const std::string destination;
+ const u_int8_t confirmMode;
+ const u_int8_t acquireMode;
+
+ MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) :
+ destination(d), confirmMode(c), acquireMode(a) {}
+
+ void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId /*id*/)
+ {
+ //TODO; need to figure out how the acquire mode gets
+ //communicated (this is just a temporary solution)
+ channel.send(MessageTransferBody(channel.getVersion(), 0, destination, confirmMode, acquireMode));
+
+ //may need to set the redelivered flag:
+ if (msg->getRedelivered()){
+ msg->getProperties<DeliveryProperties>()->setRedelivered(true);
+ }
+ }
+};
+
+}
+}
+
+DeliveryToken::shared_ptr MessageDelivery::getBasicGetToken(Queue::shared_ptr queue)
+{
+ return DeliveryToken::shared_ptr(new BasicGetToken(queue));
+}
+
+DeliveryToken::shared_ptr MessageDelivery::getBasicConsumeToken(const string& consumer)
+{
+ return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer));
+}
+
+DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::string& destination,
+ u_int8_t confirmMode, u_int8_t acquireMode)
+{
+ return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode));
+}
+
+void MessageDelivery::deliver(Message::shared_ptr msg,
+ framing::ChannelAdapter& channel,
+ DeliveryId id,
+ DeliveryToken::shared_ptr token,
+ uint16_t framesize)
+{
+ //currently a message published from one class and delivered to
+ //another may well have the wrong headers; however we will only
+ //have one content class for 0-10 proper
+
+ //send method
+ boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token);
+ t->sendMethod(msg, channel, id);
+
+ boost::shared_ptr<FrameHandler> handler = channel.getHandlers().out;
+ //send header
+ msg->sendHeader(*handler, channel.getId(), framesize);
+
+ //send content
+ msg->sendContent(*handler, channel.getId(), framesize);
+}
diff --git a/cpp/src/qpid/broker/MessageDelivery.h b/cpp/src/qpid/broker/MessageDelivery.h
new file mode 100644
index 0000000000..b87ef2a5ce
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageDelivery.h
@@ -0,0 +1,60 @@
+#ifndef _broker_MessageDelivery_h
+#define _broker_MessageDelivery_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/shared_ptr.hpp>
+#include "DeliveryId.h"
+
+namespace qpid {
+
+namespace framing {
+
+class ChannelAdapter;
+
+}
+
+namespace broker {
+
+class DeliveryToken;
+class Message;
+class Queue;
+
+/**
+ * Encapsulates the different options for message delivery currently supported.
+ */
+class MessageDelivery {
+public:
+ static boost::shared_ptr<DeliveryToken> getBasicGetToken(boost::shared_ptr<Queue> queue);
+ static boost::shared_ptr<DeliveryToken> getBasicConsumeToken(const std::string& consumer);
+ static boost::shared_ptr<DeliveryToken> getMessageDeliveryToken(const std::string& destination,
+ u_int8_t confirmMode,
+ u_int8_t acquireMode);
+
+ static void deliver(boost::shared_ptr<Message> msg, framing::ChannelAdapter& channel,
+ DeliveryId deliveryTag, boost::shared_ptr<DeliveryToken> token, uint16_t framesize);
+};
+
+}
+}
+
+
+#endif /*!_broker_MessageDelivery_h*/
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index ce1fa1e028..a4ceb77c12 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -22,7 +22,7 @@
#include "qpid/framing/FramingContent.h"
#include "Connection.h"
#include "Broker.h"
-#include "BrokerMessageMessage.h"
+#include "MessageDelivery.h"
#include "qpid/framing/MessageAppendBody.h"
#include "qpid/framing/MessageTransferBody.h"
#include "BrokerAdapter.h"
@@ -55,7 +55,7 @@ MessageHandlerImpl::open(const string& /*reference*/)
}
void
-MessageHandlerImpl::append(const framing::AMQMethodBody& )
+MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/)
{
throw ConnectionException(540, "References no longer supported");
}
@@ -92,7 +92,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
const string& destination,
bool noLocal,
u_int8_t confirmMode,
- u_int8_t /*acquireMode*/,//TODO: implement acquire modes
+ u_int8_t acquireMode,//TODO: implement acquire modes
bool exclusive,
const framing::FieldTable& filter )
{
@@ -101,7 +101,8 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
- channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, confirmMode == 1, exclusive, &filter);
+ channel.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
+ tag, queue, noLocal, confirmMode == 1, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
@@ -115,7 +116,7 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
{
Queue::shared_ptr queue = getQueue(queueName);
- if (channel.get(MessageMessage::getToken(destination), queue, !noAck)){
+ if (channel.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
//don't send any response... rely on execution completion
} else {
//temporarily disabled:
@@ -160,20 +161,6 @@ MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*co
//TODO: implement
}
-void
-MessageHandlerImpl::transfer(const framing::AMQMethodBody& context)
-{
- const MessageTransferBody* transfer = boost::polymorphic_downcast<const MessageTransferBody*>(&context);
- if (transfer->getBody().isInline()) {
- MessageMessage::shared_ptr message(new MessageMessage(&connection, transfer));
- channel.handleInlineTransfer(message);
- } else {
- throw ConnectionException(540, "References no longer supported");
- }
-}
-
-
-
void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
{
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h
index f4d9fa0c76..35d34bf94e 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.h
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.h
@@ -23,7 +23,6 @@
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
-#include "Reference.h"
#include "HandlerImpl.h"
namespace qpid {
@@ -40,7 +39,7 @@ class MessageHandlerImpl :
public:
MessageHandlerImpl(CoreRefs& parent);
- void append(const framing::AMQMethodBody& context);
+ void append(const std::string& reference, const std::string& bytes);
void cancel(const std::string& destination );
@@ -75,8 +74,6 @@ class MessageHandlerImpl :
void resume(const std::string& reference,
const std::string& identifier );
- void transfer(const framing::AMQMethodBody& context);
-
void flow(const std::string& destination, u_int8_t unit, u_int32_t value);
void flowMode(const std::string& destination, u_int8_t mode);
@@ -98,8 +95,6 @@ class MessageHandlerImpl :
bool exclusive,
const framing::FieldTable& filter);
- private:
- ReferenceRegistry references;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index 0da12a1a75..1254c3890b 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -21,7 +21,6 @@
#ifndef _MessageStoreModule_
#define _MessageStoreModule_
-#include "BrokerMessage.h"
#include "MessageStore.h"
#include "BrokerQueue.h"
#include "RecoveryManager.h"
diff --git a/cpp/src/qpid/broker/NameGenerator.h b/cpp/src/qpid/broker/NameGenerator.h
index affcedba41..6ea25c9797 100644
--- a/cpp/src/qpid/broker/NameGenerator.h
+++ b/cpp/src/qpid/broker/NameGenerator.h
@@ -21,7 +21,7 @@
#ifndef _NameGenerator_
#define _NameGenerator_
-#include "BrokerMessage.h"
+#include <string>
namespace qpid {
namespace broker {
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index 0d5a5b55f9..95f55f21b9 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -22,7 +22,6 @@
#define _NullMessageStore_
#include <set>
-#include "BrokerMessage.h"
#include "MessageStore.h"
#include "BrokerQueue.h"
diff --git a/cpp/src/qpid/broker/PersistableExchange.h b/cpp/src/qpid/broker/PersistableExchange.h
index 9ba883cec0..683b740ddc 100644
--- a/cpp/src/qpid/broker/PersistableExchange.h
+++ b/cpp/src/qpid/broker/PersistableExchange.h
@@ -35,7 +35,7 @@ namespace broker {
class PersistableExchange : public Persistable
{
public:
- virtual std::string getName() const = 0;
+ virtual const std::string& getName() const = 0;
virtual ~PersistableExchange() {};
};
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index e47ca0ae48..06fc59107e 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -34,7 +34,7 @@ namespace broker {
* The interface messages must expose to the MessageStore in order to
* be persistable.
*/
- class PersistableMessage : public Persistable
+class PersistableMessage : public Persistable
{
@@ -72,10 +72,11 @@ public:
virtual uint32_t encodedHeaderSize() const = 0;
virtual ~PersistableMessage() {};
+
PersistableMessage():
- enqueueCompleted(false),
- asyncCounter(0),
- dequeueCompleted(false){};
+ enqueueCompleted(false),
+ asyncCounter(0),
+ dequeueCompleted(false){};
inline bool isEnqueueComplete() {return enqueueCompleted;};
inline void enqueueComplete() {
diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h
index 9e0c334dc3..9dcc9d4233 100644
--- a/cpp/src/qpid/broker/RecoveredDequeue.h
+++ b/cpp/src/qpid/broker/RecoveredDequeue.h
@@ -25,7 +25,7 @@
#include <functional>
#include <list>
#include "Deliverable.h"
-#include "BrokerMessage.h"
+#include "Message.h"
#include "MessageStore.h"
#include "BrokerQueue.h"
#include "TxOp.h"
diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h
index 25c5baf364..a571343e93 100644
--- a/cpp/src/qpid/broker/RecoveredEnqueue.h
+++ b/cpp/src/qpid/broker/RecoveredEnqueue.h
@@ -25,7 +25,7 @@
#include <functional>
#include <list>
#include "Deliverable.h"
-#include "BrokerMessage.h"
+#include "Message.h"
#include "MessageStore.h"
#include "BrokerQueue.h"
#include "TxOp.h"
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 954c50faee..29390a6452 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -20,8 +20,7 @@
*/
#include "RecoveryManagerImpl.h"
-#include "BrokerMessage.h"
-#include "BrokerMessageMessage.h"
+#include "Message.h"
#include "BrokerQueue.h"
#include "RecoveredEnqueue.h"
#include "RecoveredDequeue.h"
@@ -110,10 +109,7 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff
{
buffer.record();
//peek at type:
- Message::shared_ptr message(decodeMessageType(buffer) == MESSAGE ?
- ((Message*) new MessageMessage()) :
- ((Message*) new BasicMessage()));
- buffer.restore();
+ Message::shared_ptr message(new Message());
message->decodeHeader(buffer);
return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));
}
@@ -131,21 +127,6 @@ void RecoveryManagerImpl::recoveryComplete()
//TODO (finalise binding setup etc)
}
-uint8_t RecoveryManagerImpl::decodeMessageType(framing::Buffer& buffer)
-{
- return buffer.getOctet();
-}
-
-void RecoveryManagerImpl::encodeMessageType(const Message& msg, framing::Buffer& buffer)
-{
- buffer.putOctet(dynamic_cast<const MessageMessage*>(&msg) ? MESSAGE : BASIC);
-}
-
-uint32_t RecoveryManagerImpl::encodedMessageTypeSize()
-{
- return 1;
-}
-
bool RecoverableMessageImpl::loadContent(uint64_t available)
{
return !stagingThreshold || available < stagingThreshold;
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.h b/cpp/src/qpid/broker/RecoveryManagerImpl.h
index bcd71defb1..58ec63926c 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.h
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.h
@@ -45,10 +45,6 @@ namespace broker {
RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn);
void recoveryComplete();
-
- static uint8_t decodeMessageType(framing::Buffer& buffer);
- static void encodeMessageType(const Message& msg, framing::Buffer& buffer);
- static uint32_t encodedMessageTypeSize();
};
diff --git a/cpp/src/qpid/broker/Reference.cpp b/cpp/src/qpid/broker/Reference.cpp
deleted file mode 100644
index 283b231b60..0000000000
--- a/cpp/src/qpid/broker/Reference.cpp
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <boost/bind.hpp>
-#include "Reference.h"
-#include "BrokerMessageMessage.h"
-#include "qpid/QpidError.h"
-#include "qpid/framing/MessageAppendBody.h"
-#include "CompletionHandler.h"
-
-namespace qpid {
-namespace broker {
-
-Reference::shared_ptr ReferenceRegistry::open(const Reference::Id& id) {
- ReferenceMap::iterator i = references.find(id);
- if (i != references.end())
- throw ConnectionException(503, "Attempt to re-open reference " +id);
- return references[id] = Reference::shared_ptr(new Reference(id, this));
-}
-
-Reference::shared_ptr ReferenceRegistry::get(const Reference::Id& id) {
- ReferenceMap::iterator i = references.find(id);
- if (i == references.end())
- throw ConnectionException(503, "Attempt to use non-existent reference "+id);
- return i->second;
-}
-
-void Reference::append(const framing::MessageAppendBody& app) {
- appends.push_back(app);
- size += app.getBytes().length();
-}
-
-void Reference::close() {
- messages.clear();
- registry->references.erase(getId());
-}
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Reference.h b/cpp/src/qpid/broker/Reference.h
deleted file mode 100644
index 5a373fbeba..0000000000
--- a/cpp/src/qpid/broker/Reference.h
+++ /dev/null
@@ -1,115 +0,0 @@
-#ifndef _broker_Reference_h
-#define _broker_Reference_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/framing/MessageAppendBody.h"
-
-#include <string>
-#include <vector>
-#include <map>
-#include <boost/shared_ptr.hpp>
-#include <boost/range.hpp>
-
-namespace qpid {
-
-namespace framing {
-class MessageAppendBody;
-}
-
-namespace broker {
-
-class MessageMessage;
-class ReferenceRegistry;
-
-// FIXME aconway 2007-03-27: Merge with client::IncomingMessage
-// to common reference handling code.
-
-/**
- * A reference is an accumulation point for data in a multi-frame
- * message. A reference can be used by multiple transfer commands to
- * create multiple messages, so the reference tracks which commands
- * are using it. When the reference is closed, all the associated
- * transfers are completed.
- *
- * THREAD UNSAFE: per-channel resource, access to channels is
- * serialized.
- */
-class Reference
-{
- public:
- typedef std::string Id;
- typedef boost::shared_ptr<Reference> shared_ptr;
- typedef boost::shared_ptr<MessageMessage> MessagePtr;
- typedef std::vector<MessagePtr> Messages;
- typedef std::vector<framing::MessageAppendBody> Appends;
-
- Reference(const Id& id_=Id(), ReferenceRegistry* reg=0)
- : id(id_), size(0), registry(reg) {}
-
- const std::string& getId() const { return id; }
- uint64_t getSize() const { return size; }
-
- /** Add a message to be completed with this reference */
- void addMessage(MessagePtr message) { messages.push_back(message); }
-
- /** Append more data to the reference */
- void append(const framing::MessageAppendBody&);
-
- /** Close the reference, complete each associated message */
- void close();
-
- const Appends& getAppends() const { return appends; }
- const Messages& getMessages() const { return messages; }
-
- private:
- Id id;
- uint64_t size;
- ReferenceRegistry* registry;
- Messages messages;
- Appends appends;
-};
-
-
-/**
- * A registry/factory for references.
- *
- * THREAD UNSAFE: per-channel resource, access to channels is
- * serialized.
- */
-class ReferenceRegistry {
- public:
- ReferenceRegistry() {};
- Reference::shared_ptr open(const Reference::Id& id);
- Reference::shared_ptr get(const Reference::Id& id);
-
- private:
- typedef std::map<Reference::Id, Reference::shared_ptr> ReferenceMap;
- ReferenceMap references;
-
- // Reference calls references.erase().
- friend class Reference;
-};
-
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_Reference_h*/
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index f65e450e82..5e9106c1dd 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -20,7 +20,10 @@
*/
#include "SemanticHandler.h"
+
+#include "boost/format.hpp"
#include "BrokerAdapter.h"
+#include "MessageDelivery.h"
#include "qpid/framing/ChannelAdapter.h"
#include "qpid/framing/ChannelCloseOkBody.h"
#include "qpid/framing/ExecutionCompleteBody.h"
@@ -32,18 +35,16 @@ using namespace qpid::framing;
using namespace qpid::sys;
SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
- connection(c),
- channel(c, *this, id, &c.broker.getStore())
+ connection(c), channel(c, *this, id)
{
init(id, connection.getOutput(), connection.getVersion());
adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this));
}
-
void SemanticHandler::handle(framing::AMQFrame& frame)
{
- //TODO: assembly etc when move to 0-10 framing
- //
+ //TODO: assembly for method and headers
+
//have potentially three separate tracks at this point:
//
// (1) execution controls
@@ -51,46 +52,43 @@ void SemanticHandler::handle(framing::AMQFrame& frame)
// (3) data i.e. content-bearing commands
//
//framesets on each can be interleaved. framesets on the latter
- //two share a command-id sequence.
+ //two share a command-id sequence. controls on the first track are
+ //used to communicate details about that command-id sequence.
//
//need to decide what to do if a frame on the command track
//arrives while a frameset on the data track is still
//open. execute it (i.e. out-of order execution with respect to
- //the command id sequence) or queue it up.
+ //the command id sequence) or queue it up?
- //if ready to execute (i.e. if segment is complete or frame is
- //message content):
- handleBody(frame.getBody());
-}
-
-//ChannelAdapter virtual methods:
-void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
-{
- try {
- if (!method->invoke(this)) {
- //temporary hack until channel management is moved to its own handler:
- if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- ++(incoming.lwm);
- }
+ try{
- //else do the usual:
- handleL4(method);
- //(if the frameset is complete) we can move the execution-mark
- //forward
-
- //temporary hack until channel management is moved to its own handler:
- if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- //TODO: need to account for async store opreations
- //when this command is a message publication
- ++(incoming.hwm);
+ TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header
+
+ switch(track) {
+ case SESSION_CONTROL_TRACK://TODO: L2 should be handled by separate handler
+ handleL2(frame.castBody<AMQMethodBody>());
+ break;
+ case EXECUTION_CONTROL_TRACK:
+ handleL3(frame.castBody<AMQMethodBody>());
+ break;
+ case MODEL_COMMAND_TRACK:
+ if (!isOpen()) {
+ throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
}
-
- //note: need to be more sophisticated than this if we execute
- //commands that arrive within an active message frameset (that
- //can't happen until 0-10 framing is implemented)
+ handleCommand(frame.castBody<AMQMethodBody>());
+ break;
+ case MODEL_CONTENT_TRACK:
+ handleContent(frame);
+ break;
}
+
+ }catch(const ChannelException& e){
+ adapter->getProxy().getChannel().close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
+ connection.closeChannel(getId());
+ }catch(const ConnectionException& e){
+ connection.close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
}catch(const std::exception& e){
- connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+ connection.close(541/*internal error*/, e.what(), getClassId(frame), getMethodId(frame));
}
}
@@ -102,7 +100,6 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran
outgoing.lwm = mark;
//ack messages:
channel.ackCumulative(mark.getValue());
- //std::cout << "[" << this << "] acknowledged: " << mark << std::endl;
}
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
@@ -116,7 +113,6 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran
void SemanticHandler::flush()
{
//flush doubles as a sync to begin with - send an execution.complete
- incoming.lwm = incoming.hwm;
if (isOpen()) {
Mutex::ScopedLock l(outLock);
ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
@@ -142,52 +138,59 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
//never actually sent by client at present
}
-void SemanticHandler::handleL4(framing::AMQMethodBody* method)
+void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
{
- try{
- if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
- if (!method->isA<ChannelCloseOkBody>()) {
- std::stringstream out;
- out << "Attempt to use unopened channel: " << getId();
- throw ConnectionException(504, out.str());
- }
- } else {
- InvocationVisitor v(adapter.get());
- method->accept(v);
- if (!v.wasHandled()) {
- throw ConnectionException(540, "Not implemented");
- } else if (v.hasResult()) {
- ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
- }
- }
- }catch(const ChannelException& e){
- adapter->getProxy().getChannel().close(
- e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- connection.closeChannel(getId());
- }catch(const ConnectionException& e){
- connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+ ++(incoming.lwm);
+ InvocationVisitor v(adapter.get());
+ method->accept(v);
+ //TODO: need to account for async store operations and interleaving
+ ++(incoming.hwm);
+
+ if (!v.wasHandled()) {
+ throw ConnectionException(540, "Not implemented");
+ } else if (v.hasResult()) {
+ ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
}
}
-bool SemanticHandler::isOpen() const
-{
- return channel.isOpen();
+void SemanticHandler::handleL2(framing::AMQMethodBody* method)
+{
+ if(!method->isA<ChannelOpenBody>() && !isOpen()) {
+ if (!method->isA<ChannelCloseOkBody>()) {
+ throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
+ }
+ } else {
+ method->invoke(adapter->getChannelHandler());
+ }
}
-void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody* body)
+void SemanticHandler::handleL3(framing::AMQMethodBody* method)
{
- channel.handleHeader(body);
+ if (!method->invoke(this)) {
+ throw ConnectionException(540, "Not implemented");
+ }
}
-void SemanticHandler::handleContent(qpid::framing::AMQContentBody* body)
+void SemanticHandler::handleContent(AMQFrame& frame)
{
- channel.handleContent(body);
+ Message::shared_ptr msg(msgBuilder.getMessage());
+ if (!msg) {//start of frameset will be indicated by frame flags
+ msgBuilder.start(++(incoming.lwm));
+ msg = msgBuilder.getMessage();
+ }
+ msgBuilder.handle(frame);
+ if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
+ msg->setPublisher(&connection);
+ channel.handle(msg);
+ msgBuilder.end();
+ //TODO: need to account for async store operations and interleaving
+ ++(incoming.hwm);
+ }
}
-void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody* body)
-{
- channel.handleHeartbeat(body);
+bool SemanticHandler::isOpen() const
+{
+ return channel.isOpen();
}
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
@@ -195,14 +198,13 @@ DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::sha
Mutex::ScopedLock l(outLock);
SequenceNumber copy(outgoing.hwm);
++copy;
- msg->deliver(*this, copy.getValue(), token, connection.getFrameMax());
- //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl;
+ MessageDelivery::deliver(msg, *this, copy.getValue(), token, connection.getFrameMax());
return outgoing.hwm.getValue();
}
void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
{
- msg->deliver(*this, tag, token, connection.getFrameMax());
+ MessageDelivery::deliver(msg, *this, tag, token, connection.getFrameMax());
}
void SemanticHandler::send(const AMQBody& body)
@@ -214,3 +216,49 @@ void SemanticHandler::send(const AMQBody& body)
}
ChannelAdapter::send(body);
}
+
+uint16_t SemanticHandler::getClassId(const AMQFrame& frame)
+{
+ return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpClassId() : 0;
+}
+
+uint16_t SemanticHandler::getMethodId(const AMQFrame& frame)
+{
+ return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpMethodId() : 0;
+}
+
+SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
+{
+ //will be replaced by field in 0-10 frame header
+ uint8_t type = frame.getBody()->type();
+ uint16_t classId;
+ switch(type) {
+ case METHOD_BODY:
+ if (frame.castBody<AMQMethodBody>()->isContentBearing()) {
+ return MODEL_CONTENT_TRACK;
+ }
+
+ classId = frame.castBody<AMQMethodBody>()->amqpClassId();
+ switch (classId) {
+ case ChannelOpenBody::CLASS_ID:
+ return SESSION_CONTROL_TRACK;
+ case ExecutionCompleteBody::CLASS_ID:
+ return EXECUTION_CONTROL_TRACK;
+ }
+
+ return MODEL_COMMAND_TRACK;
+ case HEADER_BODY:
+ case CONTENT_BODY:
+ return MODEL_CONTENT_TRACK;
+ }
+ throw Exception("Could not determine track");
+}
+
+//ChannelAdapter virtual methods, no longer used:
+void SemanticHandler::handleMethod(framing::AMQMethodBody*){}
+
+void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody*) {}
+
+void SemanticHandler::handleContent(qpid::framing::AMQContentBody*) {}
+
+void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody*) {}
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index 672c6ad929..611cd3a99b 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -25,6 +25,7 @@
#include "BrokerChannel.h"
#include "Connection.h"
#include "DeliveryAdapter.h"
+#include "MessageBuilder.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/FrameHandler.h"
@@ -55,8 +56,17 @@ class SemanticHandler : private framing::ChannelAdapter,
framing::Window incoming;
framing::Window outgoing;
sys::Mutex outLock;
+ MessageBuilder msgBuilder;
- void handleL4(framing::AMQMethodBody* method);
+ enum TrackId {SESSION_CONTROL_TRACK, EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK};
+ TrackId getTrack(const framing::AMQFrame& frame);
+ uint16_t getClassId(const framing::AMQFrame& frame);
+ uint16_t getMethodId(const framing::AMQFrame& frame);
+
+ void handleL3(framing::AMQMethodBody* method);
+ void handleL2(framing::AMQMethodBody* method);
+ void handleCommand(framing::AMQMethodBody* method);
+ void handleContent(framing::AMQFrame& frame);
//ChannelAdapter virtual methods:
void handleMethod(framing::AMQMethodBody* method);
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index 6536a7c4ce..c411fb1965 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -25,7 +25,6 @@
#include <vector>
#include "BrokerExchange.h"
#include "qpid/framing/FieldTable.h"
-#include "BrokerMessage.h"
#include "qpid/sys/Monitor.h"
#include "BrokerQueue.h"
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
index 29b1dc38af..564e021c5a 100644
--- a/cpp/src/qpid/broker/TxPublish.h
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -24,10 +24,10 @@
#include <algorithm>
#include <functional>
#include <list>
+#include "BrokerQueue.h"
#include "Deliverable.h"
-#include "BrokerMessage.h"
+#include "Message.h"
#include "MessageStore.h"
-#include "BrokerQueue.h"
#include "TxOp.h"
namespace qpid {