summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-08-16 20:12:33 +0000
committerAlan Conway <aconway@apache.org>2007-08-16 20:12:33 +0000
commit49c7a491c98c26fe7d4f017a7ba655dfc029278c (patch)
tree304d51ba039a5391b4ebde08caab3da978b465fb /cpp/src/qpid/broker
parentdc13ca80ff893f74ab57fee6543de6543aa366bc (diff)
downloadqpid-python-49c7a491c98c26fe7d4f017a7ba655dfc029278c.tar.gz
AMQBodies are no longer allocated on the heap and passed with shared_ptr.
AMQFrame contains a boost::variant of AMQHeaderBody,AMQContentBody, AMQHeatbeatBody, and MethodHolder. A variant is basically a type-safe union, it can allocate any of the types in-place. MethodHolder contains a Blob, a less sophisticated kind of variant, which can contain any of the concrete method body types. Using variants for all the method types causes outrageous compile times and bloated library symbol names. Blob lacks some of the finer features of variant and needs help from generated code. For now both are hidden to the rest of the code base behind AMQFrame and MethodBody classes so if/when we decide to settle on just one "variant" type solution we can do so. This commit touches nearly 100 files, mostly converting method signatures with shared_ptr<FooBody> to FooBody* or FooBody&, and converting stored shared_ptr<AMQBody> to AMQFrame and share_ptr<AMQMethodBody> to MethodHolder. There is one outstanding client memory leak, which I will fix in my next commit. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566822 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp6
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h6
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.cpp59
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.h8
-rw-r--r--cpp/src/qpid/broker/BrokerMessageBase.h13
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.cpp232
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.h15
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.cpp4
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.h8
-rw-r--r--cpp/src/qpid/broker/Content.h2
-rw-r--r--cpp/src/qpid/broker/InMemoryContent.cpp26
-rw-r--r--cpp/src/qpid/broker/InMemoryContent.h7
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.cpp6
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.h2
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp4
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.h4
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp5
-rw-r--r--cpp/src/qpid/broker/Reference.cpp6
-rw-r--r--cpp/src/qpid/broker/Reference.h7
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp24
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h20
21 files changed, 228 insertions, 236 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index 6e577ab354..e135e960c4 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -307,19 +307,19 @@ void Channel::handlePublish(Message* _message)
messageBuilder.initialise(message);
}
-void Channel::handleHeader(AMQHeaderBody::shared_ptr header)
+void Channel::handleHeader(AMQHeaderBody* header)
{
messageBuilder.setHeader(header);
//at this point, decide based on the size of the message whether we want
//to stage it by saving content directly to disk as it arrives
}
-void Channel::handleContent(AMQContentBody::shared_ptr content)
+void Channel::handleContent(AMQContentBody* content)
{
messageBuilder.addContent(content);
}
-void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {
+void Channel::handleHeartbeat(AMQHeartbeatBody*) {
// TODO aconway 2007-01-17: Implement heartbeating.
}
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index 1f4f6f35e7..021110cf8c 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -170,9 +170,9 @@ class Channel : public CompletionHandler
void flow(bool active);
void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);
void handlePublish(Message* msg);
- void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
- void handleContent(boost::shared_ptr<framing::AMQContentBody>);
- void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
+ void handleHeader(framing::AMQHeaderBody*);
+ void handleContent(framing::AMQContentBody*);
+ void handleHeartbeat(framing::AMQHeartbeatBody*);
void handleInlineTransfer(Message::shared_ptr msg);
};
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp
index 244bee4a92..bddd5802cf 100644
--- a/cpp/src/qpid/broker/BrokerMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessage.cpp
@@ -72,22 +72,25 @@ BasicMessage::BasicMessage(
const string& _exchange, const string& _routingKey,
bool _mandatory, bool _immediate
) :
- Message(_publisher, _exchange, _routingKey, _mandatory,
- _immediate, framing::AMQMethodBody::shared_ptr(new BasicPublishBody(ProtocolVersion(0,9)))),
+ Message(_publisher, _exchange, _routingKey, _mandatory, _immediate),
size(0)
{}
// For tests only.
-BasicMessage::BasicMessage() : size(0)
-{}
+BasicMessage::BasicMessage() : isHeaderSet(false), size(0) {}
BasicMessage::~BasicMessage(){}
-void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){
- this->header = _header;
+void BasicMessage::setHeader(AMQHeaderBody* _header){
+ if (_header) {
+ this->header = *_header;
+ isHeaderSet = true;
+ }
+ else
+ isHeaderSet = false;
}
-void BasicMessage::addContent(AMQContentBody::shared_ptr data){
+void BasicMessage::addContent(AMQContentBody* data){
if (!content.get()) {
content = std::auto_ptr<Content>(new InMemoryContent());
}
@@ -96,7 +99,7 @@ void BasicMessage::addContent(AMQContentBody::shared_ptr data){
}
bool BasicMessage::isComplete(){
- return header.get() && (header->getContentSize() == contentSize());
+ return isHeaderSet && (header.getContentSize() == contentSize());
}
DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue)
@@ -113,10 +116,9 @@ void BasicMessage::deliver(ChannelAdapter& channel,
const string& consumerTag, DeliveryId id,
uint32_t framesize)
{
- channel.send(make_shared_ptr(
- new BasicDeliverBody(
+ channel.send(BasicDeliverBody(
channel.getVersion(), consumerTag, id.getValue(),
- getRedelivered(), getExchange(), getRoutingKey())));
+ getRedelivered(), getExchange(), getRoutingKey()));
sendContent(channel, framesize);
}
@@ -125,11 +127,11 @@ void BasicMessage::sendGetOk(ChannelAdapter& channel,
DeliveryId id,
uint32_t framesize)
{
- channel.send(make_shared_ptr(
- new BasicGetOkBody(
+ channel.send(
+ BasicGetOkBody(
channel.getVersion(),
id.getValue(), getRedelivered(), getExchange(),
- getRoutingKey(), messageCount)));
+ getRoutingKey(), messageCount));
sendContent(channel, framesize);
}
@@ -156,12 +158,11 @@ void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize)
channel.send(header);
Mutex::ScopedLock locker(contentLock);
if (content.get())
- content->send(channel, framesize);
+ content->send(channel, framesize);
}
BasicHeaderProperties* BasicMessage::getHeaderProperties(){
- return boost::polymorphic_downcast<BasicHeaderProperties*>(
- header->getProperties());
+ return isHeaderSet ? dynamic_cast<BasicHeaderProperties*>(header.getProperties()) : 0;
}
const FieldTable& BasicMessage::getApplicationHeaders(){
@@ -170,7 +171,7 @@ const FieldTable& BasicMessage::getApplicationHeaders(){
bool BasicMessage::isPersistent()
{
- if(!header) return false;
+ if(!isHeaderSet) return false;
BasicHeaderProperties* props = getHeaderProperties();
return props && props->getDeliveryMode() == PERSISTENT;
}
@@ -194,9 +195,9 @@ void BasicMessage::decodeHeader(Buffer& buffer)
setRouting(exchange, routingKey);
uint32_t headerSize = buffer.getLong();
- AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody());
- headerBody->decode(buffer, headerSize);
- setHeader(headerBody);
+ AMQHeaderBody headerBody;
+ headerBody.decode(buffer, headerSize);
+ setHeader(&headerBody);
}
void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize)
@@ -214,9 +215,9 @@ void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize)
uint64_t total = 0;
while (total < expectedContentSize()) {
uint64_t remaining = expected - total;
- AMQContentBody::shared_ptr contentBody(new AMQContentBody());
- contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize);
- addContent(contentBody);
+ AMQContentBody contentBody;
+ contentBody.decode(buffer, remaining < chunkSize ? remaining : chunkSize);
+ addContent(&contentBody);
total += chunkSize;
}
}
@@ -232,8 +233,8 @@ void BasicMessage::encodeHeader(Buffer& buffer) const
RecoveryManagerImpl::encodeMessageType(*this, buffer);
buffer.putShortString(getExchange());
buffer.putShortString(getRoutingKey());
- buffer.putLong(header->size());
- header->encode(buffer);
+ buffer.putLong(header.size());
+ header.encode(buffer);
}
void BasicMessage::encodeContent(Buffer& buffer) const
@@ -258,12 +259,12 @@ uint32_t BasicMessage::encodedHeaderSize() const
return RecoveryManagerImpl::encodedMessageTypeSize()
+getExchange().size() + 1
+ getRoutingKey().size() + 1
- + header->size() + 4;//4 extra bytes for size
+ + header.size() + 4;//4 extra bytes for size
}
uint64_t BasicMessage::expectedContentSize()
{
- return header.get() ? header->getContentSize() : 0;
+ return isHeaderSet ? header.getContentSize() : 0;
}
void BasicMessage::releaseContent(MessageStore* store)
@@ -294,5 +295,5 @@ void BasicMessage::setContent(std::auto_ptr<Content>& _content)
uint32_t BasicMessage::getRequiredCredit() const
{
- return header->size() + contentSize();
+ return header.size() + contentSize();
}
diff --git a/cpp/src/qpid/broker/BrokerMessage.h b/cpp/src/qpid/broker/BrokerMessage.h
index af8e4e62e9..0f46ff2e83 100644
--- a/cpp/src/qpid/broker/BrokerMessage.h
+++ b/cpp/src/qpid/broker/BrokerMessage.h
@@ -27,6 +27,7 @@
#include "BrokerMessageBase.h"
#include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/AMQHeaderBody.h"
#include "ConnectionToken.h"
#include "Content.h"
#include "qpid/sys/Mutex.h"
@@ -51,7 +52,8 @@ using framing::string;
* request.
*/
class BasicMessage : public Message {
- boost::shared_ptr<framing::AMQHeaderBody> header;
+ framing::AMQHeaderBody header;
+ bool isHeaderSet;
std::auto_ptr<Content> content;
mutable sys::Mutex contentLock;
uint64_t size;
@@ -66,8 +68,8 @@ class BasicMessage : public Message {
bool mandatory, bool immediate);
BasicMessage();
~BasicMessage();
- void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header);
- void addContent(framing::AMQContentBody::shared_ptr data);
+ void setHeader(framing::AMQHeaderBody* header);
+ void addContent(framing::AMQContentBody* data);
bool isComplete();
static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue);
diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h
index 94035905ce..bac5dc6386 100644
--- a/cpp/src/qpid/broker/BrokerMessageBase.h
+++ b/cpp/src/qpid/broker/BrokerMessageBase.h
@@ -54,22 +54,18 @@ class MessageStore;
class Message : public PersistableMessage{
public:
typedef boost::shared_ptr<Message> shared_ptr;
- typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr;
-
Message(const ConnectionToken* publisher_,
const std::string& _exchange,
const std::string& _routingKey,
- bool _mandatory, bool _immediate,
- AMQMethodBodyPtr respondTo_) :
+ bool _mandatory, bool _immediate) :
publisher(publisher_),
exchange(_exchange),
routingKey(_routingKey),
mandatory(_mandatory),
immediate(_immediate),
persistenceId(0),
- redelivered(false),
- respondTo(respondTo_)
+ redelivered(false)
{}
Message() :
@@ -145,8 +141,8 @@ class Message : public PersistableMessage{
* it uses).
*/
virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
- virtual void setHeader(boost::shared_ptr<framing::AMQHeaderBody>) {};
- virtual void addContent(boost::shared_ptr<framing::AMQContentBody>) {};
+ virtual void setHeader(framing::AMQHeaderBody*) {};
+ virtual void addContent(framing::AMQContentBody*) {};
/**
* Releases the in-memory content data held by this
* message. Must pass in a store from which the data can
@@ -164,7 +160,6 @@ class Message : public PersistableMessage{
const bool immediate;
mutable uint64_t persistenceId;
bool redelivered;
- AMQMethodBodyPtr respondTo;
};
}}
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
index 0da5f3d8f5..1184885aeb 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
@@ -48,35 +48,34 @@ struct MessageDeliveryToken : public DeliveryToken
};
MessageMessage::MessageMessage(
- ConnectionToken* publisher, TransferPtr transfer_
+ ConnectionToken* publisher, const MessageTransferBody* transfer_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
transfer_->getRejectUnroutable(),
- transfer_->getImmediate(),
- transfer_),
- transfer(transfer_)
+ transfer_->getImmediate()),
+ transfer(*transfer_)
{
- assert(transfer->getBody().isInline());
+ assert(transfer.getBody().isInline());
}
MessageMessage::MessageMessage(
- ConnectionToken* publisher, TransferPtr transfer_, ReferencePtr reference_
+ ConnectionToken* publisher, const MessageTransferBody* transfer_,
+ ReferencePtr reference_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
transfer_->getRejectUnroutable(),
- transfer_->getImmediate(),
- transfer_),
- transfer(transfer_),
+ transfer_->getImmediate()),
+ transfer(*transfer_),
reference(reference_)
{
- assert(!transfer->getBody().isInline());
+ assert(!transfer.getBody().isInline());
assert(reference_);
}
/**
* Currently used by message store impls to recover messages
*/
-MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {}
+MessageMessage::MessageMessage() {}
// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring
void MessageMessage::transferMessage(
@@ -84,27 +83,28 @@ void MessageMessage::transferMessage(
const std::string& consumerTag,
uint32_t framesize)
{
- const framing::Content& body = transfer->getBody();
+ const framing::Content& body = transfer.getBody();
// Send any reference data
ReferencePtr ref= getReference();
if (ref){
// Open
- channel.send(make_shared_ptr(new MessageOpenBody(channel.getVersion(), ref->getId())));
+ channel.send(MessageOpenBody(channel.getVersion(), ref->getId()));
// Appends
for(Reference::Appends::const_iterator a = ref->getAppends().begin();
a != ref->getAppends().end();
++a) {
- uint32_t sizeleft = (*a)->size();
- const string& content = (*a)->getBytes();
+ uint32_t sizeleft = a->size();
+ const string& content = a->getBytes();
// Calculate overhead bytes
// Assume that the overhead is constant as the reference name doesn't change
uint32_t overhead = sizeleft - content.size();
string::size_type contentStart = 0;
while (sizeleft) {
string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
- channel.send(make_shared_ptr(new MessageAppendBody(channel.getVersion(), ref->getId(),
- string(content, contentStart, contentSize))));
+
+ channel.send(MessageAppendBody(channel.getVersion(), ref->getId(),
+ string(content, contentStart, contentSize)));
sizeleft -= contentSize;
contentStart += contentSize;
}
@@ -112,76 +112,73 @@ void MessageMessage::transferMessage(
}
// The transfer
- if ( transfer->size()<=framesize ) {
- channel.send(make_shared_ptr(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- body)));
+ if ( transfer.size()<=framesize ) {
+ channel.send(MessageTransferBody(ProtocolVersion(),
+ transfer.getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ body));
} else {
// Thing to do here is to construct a simple reference message then deliver that instead
// fragmentation will be taken care of in the delivery if necessary;
string content = body.getValue();
string refname = "dummy";
- TransferPtr newTransfer(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- framing::Content(REFERENCE, refname)));
+ MessageTransferBody newTransfer(channel.getVersion(),
+ transfer.getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ framing::Content(REFERENCE, refname));
ReferencePtr newRef(new Reference(refname));
- Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
- newRef->append(newAppend);
- MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), newTransfer, newRef);
+ newRef->append(MessageAppendBody(channel.getVersion(), refname, content));
+ MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), &newTransfer, newRef);
newMsg.transferMessage(channel, consumerTag, framesize);
return;
}
// Close any reference data
if (ref)
- channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId())));
+ channel.send(MessageCloseBody(ProtocolVersion(), ref->getId()));
}
@@ -202,8 +199,8 @@ bool MessageMessage::isComplete()
uint64_t MessageMessage::contentSize() const
{
- if (transfer->getBody().isInline())
- return transfer->getBody().getValue().size();
+ if (transfer.getBody().isInline())
+ return transfer.getBody().getValue().size();
else {
assert(getReference());
return getReference()->getSize();
@@ -217,11 +214,11 @@ qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
const FieldTable& MessageMessage::getApplicationHeaders()
{
- return transfer->getApplicationHeaders();
+ return transfer.getApplicationHeaders();
}
bool MessageMessage::isPersistent()
{
- return transfer->getDeliveryMode() == PERSISTENT;
+ return transfer.getDeliveryMode() == PERSISTENT;
}
uint32_t MessageMessage::encodedSize() const
@@ -231,7 +228,7 @@ uint32_t MessageMessage::encodedSize() const
uint32_t MessageMessage::encodedHeaderSize() const
{
- return RecoveryManagerImpl::encodedMessageTypeSize() + transfer->size() - transfer->baseSize();
+ return RecoveryManagerImpl::encodedMessageTypeSize() + transfer.size();
}
uint32_t MessageMessage::encodedContentSize() const
@@ -252,18 +249,17 @@ void MessageMessage::encode(Buffer& buffer) const
void MessageMessage::encodeHeader(Buffer& buffer) const
{
RecoveryManagerImpl::encodeMessageType(*this, buffer);
- if (transfer->getBody().isInline()) {
- transfer->encodeContent(buffer);
+ if (transfer.getBody().isInline()) {
+ transfer.encode(buffer);
} else {
assert(getReference());
string data;
const Reference::Appends& appends = getReference()->getAppends();
for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) {
- data += (*a)->getBytes();
+ data += a->getBytes();
}
framing::Content body(INLINE, data);
- std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body));
- copy->encodeContent(buffer);
+ copyTransfer(ProtocolVersion(), transfer.getDestination(), body).encode(buffer);
}
}
@@ -271,8 +267,7 @@ void MessageMessage::decodeHeader(Buffer& buffer)
{
//don't care about the type here, but want encode/decode to be symmetric
RecoveryManagerImpl::decodeMessageType(buffer);
-
- transfer->decodeContent(buffer);
+ transfer.decode(buffer);
}
void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
@@ -280,37 +275,36 @@ void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
}
-MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version,
- const string& destination,
- const framing::Content& body) const
+MessageTransferBody MessageMessage::copyTransfer(const ProtocolVersion& version,
+ const string& destination,
+ const framing::Content& body) const
{
- return new MessageTransferBody(version,
- transfer->getTicket(),
- destination,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- body);
-
+ return MessageTransferBody(version,
+ transfer.getTicket(),
+ destination,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ body);
}
MessageMessage::ReferencePtr MessageMessage::getReference() const {
@@ -321,7 +315,7 @@ uint32_t MessageMessage::getRequiredCredit() const
{
//TODO: change when encoding changes. Should be the payload of any
//header & body frames.
- return transfer->size();
+ return transfer.size();
}
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.h b/cpp/src/qpid/broker/BrokerMessageMessage.h
index 6b1bd9ab5d..6bfd0e045d 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.h
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.h
@@ -29,10 +29,6 @@
namespace qpid {
-namespace framing {
-class MessageTransferBody;
-}
-
namespace broker {
class ConnectionToken;
class Reference;
@@ -40,16 +36,15 @@ class Reference;
class MessageMessage: public Message{
public:
typedef boost::shared_ptr<MessageMessage> shared_ptr;
- typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
typedef boost::shared_ptr<Reference> ReferencePtr;
- MessageMessage(ConnectionToken* publisher, TransferPtr transfer);
- MessageMessage(ConnectionToken* publisher, TransferPtr transfer, ReferencePtr reference);
+ MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer);
+ MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer, ReferencePtr reference);
MessageMessage();
// Default destructor okay
- TransferPtr getTransfer() const { return transfer; }
+ framing::MessageTransferBody* getTransfer() const { return const_cast<framing::MessageTransferBody*>(&transfer); }
ReferencePtr getReference() const ;
void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
@@ -80,12 +75,12 @@ class MessageMessage: public Message{
const std::string& consumerTag,
uint32_t framesize);
- framing::MessageTransferBody* copyTransfer(
+ framing::MessageTransferBody copyTransfer(
const framing::ProtocolVersion& version,
const std::string& destination,
const framing::Content& body) const;
- const TransferPtr transfer;
+ framing::MessageTransferBody transfer;
const boost::shared_ptr<Reference> reference;
};
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp
index a67a5557c6..175f57df7d 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.cpp
+++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp
@@ -1,3 +1,4 @@
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -42,8 +43,7 @@ void ConnectionAdapter::close(ReplyCode code, const string& text, ClassId classI
handler->client.close(code, text, classId, methodId);
}
-void ConnectionAdapter::handleMethod(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method)
+void ConnectionAdapter::handleMethod(framing::AMQMethodBody* method)
{
try{
method->invoke(*this);
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h
index 1ce850a659..9aa3d130e8 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.h
+++ b/cpp/src/qpid/broker/ConnectionAdapter.h
@@ -47,12 +47,12 @@ public:
void handle(framing::AMQFrame& frame);
//ChannelAdapter virtual methods:
- void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method);
+ void handleMethod(framing::AMQMethodBody* method);
bool isOpen() const { return true; } //channel 0 is always open
//never needed:
- void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>) {}
- void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>) {}
- void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>) {}
+ void handleHeader(framing::AMQHeaderBody*) {}
+ void handleContent(framing::AMQContentBody*) {}
+ void handleHeartbeat(framing::AMQHeartbeatBody*) {}
//AMQP_ServerOperations:
ConnectionHandler* getConnectionHandler();
diff --git a/cpp/src/qpid/broker/Content.h b/cpp/src/qpid/broker/Content.h
index df9e7a1132..97dce0d3f7 100644
--- a/cpp/src/qpid/broker/Content.h
+++ b/cpp/src/qpid/broker/Content.h
@@ -42,7 +42,7 @@ class Content{
virtual ~Content(){}
/** Add a block of data to the content */
- virtual void add(framing::AMQContentBody::shared_ptr data) = 0;
+ virtual void add(framing::AMQContentBody* data) = 0;
/** Total size of content in bytes */
virtual uint32_t size() = 0;
diff --git a/cpp/src/qpid/broker/InMemoryContent.cpp b/cpp/src/qpid/broker/InMemoryContent.cpp
index a6ce820f7e..d69dcfafe7 100644
--- a/cpp/src/qpid/broker/InMemoryContent.cpp
+++ b/cpp/src/qpid/broker/InMemoryContent.cpp
@@ -26,16 +26,16 @@ using namespace qpid::broker;
using namespace qpid::framing;
using boost::static_pointer_cast;
-void InMemoryContent::add(AMQContentBody::shared_ptr data)
+void InMemoryContent::add(AMQContentBody* data)
{
- content.push_back(data);
+ content.push_back(*data);
}
uint32_t InMemoryContent::size()
{
int sum(0);
for (content_iterator i = content.begin(); i != content.end(); i++) {
- sum += (*i)->size();
+ sum += i->size();
}
return sum;
}
@@ -43,22 +43,20 @@ uint32_t InMemoryContent::size()
void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize)
{
for (content_iterator i = content.begin(); i != content.end(); i++) {
- if ((*i)->size() > framesize) {
+ if (i->size() > framesize) {
uint32_t offset = 0;
- for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
- string data = (*i)->getData().substr(offset, framesize);
- channel.send(make_shared_ptr(new AMQContentBody(data)));
+ for (int chunk = i->size() / framesize; chunk > 0; chunk--) {
+ string data = i->getData().substr(offset, framesize);
+ channel.send(AMQContentBody(data));
offset += framesize;
}
- uint32_t remainder = (*i)->size() % framesize;
+ uint32_t remainder = i->size() % framesize;
if (remainder) {
- string data = (*i)->getData().substr(offset, remainder);
- channel.send(make_shared_ptr(new AMQContentBody(data)));
+ string data = i->getData().substr(offset, remainder);
+ channel.send(AMQContentBody(data));
}
} else {
- AMQBody::shared_ptr contentBody =
- static_pointer_cast<AMQBody, AMQContentBody>(*i);
- channel.send(contentBody);
+ channel.send(*i);
}
}
}
@@ -66,7 +64,7 @@ void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize)
void InMemoryContent::encode(Buffer& buffer)
{
for (content_iterator i = content.begin(); i != content.end(); i++) {
- (*i)->encode(buffer);
+ i->encode(buffer);
}
}
diff --git a/cpp/src/qpid/broker/InMemoryContent.h b/cpp/src/qpid/broker/InMemoryContent.h
index 425f0e4e26..a6fca7ca98 100644
--- a/cpp/src/qpid/broker/InMemoryContent.h
+++ b/cpp/src/qpid/broker/InMemoryContent.h
@@ -22,21 +22,22 @@
#define _InMemoryContent_
#include "Content.h"
+#include "qpid/framing/AMQContentBody.h"
#include <vector>
namespace qpid {
namespace broker {
class InMemoryContent : public Content{
- typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
+ typedef std::vector<framing::AMQContentBody> content_list;
typedef content_list::iterator content_iterator;
content_list content;
public:
- void add(qpid::framing::AMQContentBody::shared_ptr data);
+ void add(framing::AMQContentBody* data);
uint32_t size();
void send(framing::ChannelAdapter&, uint32_t framesize);
- void encode(qpid::framing::Buffer& buffer);
+ void encode(framing::Buffer& buffer);
};
}
}
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp
index 80d06ebf2b..b8b5b37f45 100644
--- a/cpp/src/qpid/broker/LazyLoadedContent.cpp
+++ b/cpp/src/qpid/broker/LazyLoadedContent.cpp
@@ -33,7 +33,7 @@ LazyLoadedContent::~LazyLoadedContent()
LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, uint64_t _expectedSize) :
store(_store), msg(_msg), expectedSize(_expectedSize) {}
-void LazyLoadedContent::add(AMQContentBody::shared_ptr data)
+void LazyLoadedContent::add(AMQContentBody* data)
{
store->appendContent(*msg, data->getData());
}
@@ -52,12 +52,12 @@ void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize)
string data;
store->loadContent(*msg, data, offset,
remaining > framesize ? framesize : remaining);
- channel.send(make_shared_ptr(new AMQContentBody(data)));
+ channel.send(AMQContentBody(data));
}
} else {
string data;
store->loadContent(*msg, data, 0, expectedSize);
- channel.send(make_shared_ptr(new AMQContentBody(data)));
+ channel.send(AMQContentBody(data));
}
}
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.h b/cpp/src/qpid/broker/LazyLoadedContent.h
index 9dff6158a5..79a33ed7a9 100644
--- a/cpp/src/qpid/broker/LazyLoadedContent.h
+++ b/cpp/src/qpid/broker/LazyLoadedContent.h
@@ -36,7 +36,7 @@ namespace qpid {
MessageStore* const store, Message* const msg,
uint64_t expectedSize);
~LazyLoadedContent();
- void add(qpid::framing::AMQContentBody::shared_ptr data);
+ void add(qpid::framing::AMQContentBody* data);
uint32_t size();
void send(
framing::ChannelAdapter&,
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index 6c33b38e72..f19927b708 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -50,7 +50,7 @@ void MessageBuilder::initialise(Message::shared_ptr& msg){
message = msg;
}
-void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
+void MessageBuilder::setHeader(AMQHeaderBody* header){
if(!message.get()){
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
}
@@ -65,7 +65,7 @@ void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
route();
}
-void MessageBuilder::addContent(AMQContentBody::shared_ptr& content){
+void MessageBuilder::addContent(AMQContentBody* content){
if(!message.get()){
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
}
diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h
index 7c4a529a64..18e85d7383 100644
--- a/cpp/src/qpid/broker/MessageBuilder.h
+++ b/cpp/src/qpid/broker/MessageBuilder.h
@@ -39,8 +39,8 @@ namespace qpid {
MessageStore* const store = 0,
uint64_t stagingThreshold = 0);
void initialise(Message::shared_ptr& msg);
- void setHeader(framing::AMQHeaderBody::shared_ptr& header);
- void addContent(framing::AMQContentBody::shared_ptr& content);
+ void setHeader(framing::AMQHeaderBody* header);
+ void addContent(framing::AMQContentBody* content);
Message::shared_ptr getMessage() { return message; }
private:
Message::shared_ptr message;
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index b5bea05eac..3f407c11f7 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -28,6 +28,7 @@
#include "BrokerAdapter.h"
#include <boost/format.hpp>
+#include <boost/cast.hpp>
namespace qpid {
namespace broker {
@@ -159,9 +160,7 @@ MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ )
void
MessageHandlerImpl::transfer(const framing::AMQMethodBody& context)
{
- MessageTransferBody::shared_ptr transfer(
- make_shared_ptr(new MessageTransferBody(static_cast<const MessageTransferBody&>(context))));
-
+ const MessageTransferBody* transfer = boost::polymorphic_downcast<const MessageTransferBody*>(&context);
if (transfer->getBody().isInline()) {
MessageMessage::shared_ptr message(new MessageMessage(&connection, transfer));
channel.handleInlineTransfer(message);
diff --git a/cpp/src/qpid/broker/Reference.cpp b/cpp/src/qpid/broker/Reference.cpp
index c2060f8fee..283b231b60 100644
--- a/cpp/src/qpid/broker/Reference.cpp
+++ b/cpp/src/qpid/broker/Reference.cpp
@@ -40,9 +40,9 @@ Reference::shared_ptr ReferenceRegistry::get(const Reference::Id& id) {
return i->second;
}
-void Reference::append(AppendPtr ptr) {
- appends.push_back(ptr);
- size += ptr->getBytes().length();
+void Reference::append(const framing::MessageAppendBody& app) {
+ appends.push_back(app);
+ size += app.getBytes().length();
}
void Reference::close() {
diff --git a/cpp/src/qpid/broker/Reference.h b/cpp/src/qpid/broker/Reference.h
index 277eb7b917..5a373fbeba 100644
--- a/cpp/src/qpid/broker/Reference.h
+++ b/cpp/src/qpid/broker/Reference.h
@@ -19,6 +19,8 @@
*
*/
+#include "qpid/framing/MessageAppendBody.h"
+
#include <string>
#include <vector>
#include <map>
@@ -56,8 +58,7 @@ class Reference
typedef boost::shared_ptr<Reference> shared_ptr;
typedef boost::shared_ptr<MessageMessage> MessagePtr;
typedef std::vector<MessagePtr> Messages;
- typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr;
- typedef std::vector<AppendPtr> Appends;
+ typedef std::vector<framing::MessageAppendBody> Appends;
Reference(const Id& id_=Id(), ReferenceRegistry* reg=0)
: id(id_), size(0), registry(reg) {}
@@ -69,7 +70,7 @@ class Reference
void addMessage(MessagePtr message) { messages.push_back(message); }
/** Append more data to the reference */
- void append(AppendPtr ptr);
+ void append(const framing::MessageAppendBody&);
/** Close the reference, complete each associated message */
void close();
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index 6ef2162a4a..b7aa2aad25 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -22,6 +22,8 @@
#include "SemanticHandler.h"
#include "BrokerAdapter.h"
#include "qpid/framing/ChannelAdapter.h"
+#include "qpid/framing/ExecutionCompleteBody.h"
+#include "qpid/framing/ChannelCloseOkBody.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -60,7 +62,7 @@ void SemanticHandler::handle(framing::AMQFrame& frame)
}
//ChannelAdapter virtual methods:
-void SemanticHandler::handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method)
+void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
{
try {
if (!method->invoke(this)) {
@@ -108,11 +110,11 @@ void SemanticHandler::flush()
incoming.lwm = incoming.hwm;
if (isOpen()) {
Mutex::ScopedLock l(outLock);
- ChannelAdapter::send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
+ ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
}
}
-void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method)
+void SemanticHandler::handleL4(framing::AMQMethodBody* method)
{
try{
if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
@@ -139,17 +141,17 @@ bool SemanticHandler::isOpen() const
return channel.isOpen();
}
-void SemanticHandler::handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody> body)
+void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody* body)
{
channel.handleHeader(body);
}
-void SemanticHandler::handleContent(boost::shared_ptr<qpid::framing::AMQContentBody> body)
+void SemanticHandler::handleContent(qpid::framing::AMQContentBody* body)
{
channel.handleContent(body);
}
-void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody> body)
+void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody* body)
{
channel.handleHeartbeat(body);
}
@@ -169,16 +171,12 @@ void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_
msg->deliver(*this, tag, token, connection.getFrameMax());
}
-void SemanticHandler::send(shared_ptr<AMQBody> body)
+void SemanticHandler::send(const AMQBody& body)
{
Mutex::ScopedLock l(outLock);
- uint8_t type(body->type());
- if (type == METHOD_BODY) {
+ if (body.getMethod() && body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID) {
//temporary hack until channel management is moved to its own handler:
- if (dynamic_pointer_cast<AMQMethodBody>(body)->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- ++outgoing.hwm;
- //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl;
- }
+ ++outgoing.hwm;
}
ChannelAdapter::send(body);
}
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index 016c94738d..6748da8500 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -31,6 +31,14 @@
#include "qpid/framing/SequenceNumber.h"
namespace qpid {
+
+namespace framing {
+class AMQMethodBody;
+class AMQHeaderBody;
+class AMQContentBody;
+class AMQHeaderBody;
+}
+
namespace broker {
class BrokerAdapter;
@@ -48,16 +56,16 @@ class SemanticHandler : private framing::ChannelAdapter,
framing::Window outgoing;
sys::Mutex outLock;
- void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method);
+ void handleL4(framing::AMQMethodBody* method);
//ChannelAdapter virtual methods:
- void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method);
+ void handleMethod(framing::AMQMethodBody* method);
bool isOpen() const;
- void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
- void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
- void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
+ void handleHeader(framing::AMQHeaderBody*);
+ void handleContent(framing::AMQContentBody*);
+ void handleHeartbeat(framing::AMQHeartbeatBody*);
- void send(shared_ptr<framing::AMQBody> body);
+ void send(const framing::AMQBody& body);
//delivery adapter methods: