summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerMessageMessage.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-08-16 20:12:33 +0000
committerAlan Conway <aconway@apache.org>2007-08-16 20:12:33 +0000
commit49c7a491c98c26fe7d4f017a7ba655dfc029278c (patch)
tree304d51ba039a5391b4ebde08caab3da978b465fb /cpp/src/qpid/broker/BrokerMessageMessage.cpp
parentdc13ca80ff893f74ab57fee6543de6543aa366bc (diff)
downloadqpid-python-49c7a491c98c26fe7d4f017a7ba655dfc029278c.tar.gz
AMQBodies are no longer allocated on the heap and passed with shared_ptr.
AMQFrame contains a boost::variant of AMQHeaderBody,AMQContentBody, AMQHeatbeatBody, and MethodHolder. A variant is basically a type-safe union, it can allocate any of the types in-place. MethodHolder contains a Blob, a less sophisticated kind of variant, which can contain any of the concrete method body types. Using variants for all the method types causes outrageous compile times and bloated library symbol names. Blob lacks some of the finer features of variant and needs help from generated code. For now both are hidden to the rest of the code base behind AMQFrame and MethodBody classes so if/when we decide to settle on just one "variant" type solution we can do so. This commit touches nearly 100 files, mostly converting method signatures with shared_ptr<FooBody> to FooBody* or FooBody&, and converting stored shared_ptr<AMQBody> to AMQFrame and share_ptr<AMQMethodBody> to MethodHolder. There is one outstanding client memory leak, which I will fix in my next commit. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566822 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerMessageMessage.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.cpp232
1 files changed, 113 insertions, 119 deletions
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
index 0da5f3d8f5..1184885aeb 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
@@ -48,35 +48,34 @@ struct MessageDeliveryToken : public DeliveryToken
};
MessageMessage::MessageMessage(
- ConnectionToken* publisher, TransferPtr transfer_
+ ConnectionToken* publisher, const MessageTransferBody* transfer_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
transfer_->getRejectUnroutable(),
- transfer_->getImmediate(),
- transfer_),
- transfer(transfer_)
+ transfer_->getImmediate()),
+ transfer(*transfer_)
{
- assert(transfer->getBody().isInline());
+ assert(transfer.getBody().isInline());
}
MessageMessage::MessageMessage(
- ConnectionToken* publisher, TransferPtr transfer_, ReferencePtr reference_
+ ConnectionToken* publisher, const MessageTransferBody* transfer_,
+ ReferencePtr reference_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
transfer_->getRejectUnroutable(),
- transfer_->getImmediate(),
- transfer_),
- transfer(transfer_),
+ transfer_->getImmediate()),
+ transfer(*transfer_),
reference(reference_)
{
- assert(!transfer->getBody().isInline());
+ assert(!transfer.getBody().isInline());
assert(reference_);
}
/**
* Currently used by message store impls to recover messages
*/
-MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {}
+MessageMessage::MessageMessage() {}
// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring
void MessageMessage::transferMessage(
@@ -84,27 +83,28 @@ void MessageMessage::transferMessage(
const std::string& consumerTag,
uint32_t framesize)
{
- const framing::Content& body = transfer->getBody();
+ const framing::Content& body = transfer.getBody();
// Send any reference data
ReferencePtr ref= getReference();
if (ref){
// Open
- channel.send(make_shared_ptr(new MessageOpenBody(channel.getVersion(), ref->getId())));
+ channel.send(MessageOpenBody(channel.getVersion(), ref->getId()));
// Appends
for(Reference::Appends::const_iterator a = ref->getAppends().begin();
a != ref->getAppends().end();
++a) {
- uint32_t sizeleft = (*a)->size();
- const string& content = (*a)->getBytes();
+ uint32_t sizeleft = a->size();
+ const string& content = a->getBytes();
// Calculate overhead bytes
// Assume that the overhead is constant as the reference name doesn't change
uint32_t overhead = sizeleft - content.size();
string::size_type contentStart = 0;
while (sizeleft) {
string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
- channel.send(make_shared_ptr(new MessageAppendBody(channel.getVersion(), ref->getId(),
- string(content, contentStart, contentSize))));
+
+ channel.send(MessageAppendBody(channel.getVersion(), ref->getId(),
+ string(content, contentStart, contentSize)));
sizeleft -= contentSize;
contentStart += contentSize;
}
@@ -112,76 +112,73 @@ void MessageMessage::transferMessage(
}
// The transfer
- if ( transfer->size()<=framesize ) {
- channel.send(make_shared_ptr(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- body)));
+ if ( transfer.size()<=framesize ) {
+ channel.send(MessageTransferBody(ProtocolVersion(),
+ transfer.getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ body));
} else {
// Thing to do here is to construct a simple reference message then deliver that instead
// fragmentation will be taken care of in the delivery if necessary;
string content = body.getValue();
string refname = "dummy";
- TransferPtr newTransfer(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- framing::Content(REFERENCE, refname)));
+ MessageTransferBody newTransfer(channel.getVersion(),
+ transfer.getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ framing::Content(REFERENCE, refname));
ReferencePtr newRef(new Reference(refname));
- Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
- newRef->append(newAppend);
- MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), newTransfer, newRef);
+ newRef->append(MessageAppendBody(channel.getVersion(), refname, content));
+ MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), &newTransfer, newRef);
newMsg.transferMessage(channel, consumerTag, framesize);
return;
}
// Close any reference data
if (ref)
- channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId())));
+ channel.send(MessageCloseBody(ProtocolVersion(), ref->getId()));
}
@@ -202,8 +199,8 @@ bool MessageMessage::isComplete()
uint64_t MessageMessage::contentSize() const
{
- if (transfer->getBody().isInline())
- return transfer->getBody().getValue().size();
+ if (transfer.getBody().isInline())
+ return transfer.getBody().getValue().size();
else {
assert(getReference());
return getReference()->getSize();
@@ -217,11 +214,11 @@ qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
const FieldTable& MessageMessage::getApplicationHeaders()
{
- return transfer->getApplicationHeaders();
+ return transfer.getApplicationHeaders();
}
bool MessageMessage::isPersistent()
{
- return transfer->getDeliveryMode() == PERSISTENT;
+ return transfer.getDeliveryMode() == PERSISTENT;
}
uint32_t MessageMessage::encodedSize() const
@@ -231,7 +228,7 @@ uint32_t MessageMessage::encodedSize() const
uint32_t MessageMessage::encodedHeaderSize() const
{
- return RecoveryManagerImpl::encodedMessageTypeSize() + transfer->size() - transfer->baseSize();
+ return RecoveryManagerImpl::encodedMessageTypeSize() + transfer.size();
}
uint32_t MessageMessage::encodedContentSize() const
@@ -252,18 +249,17 @@ void MessageMessage::encode(Buffer& buffer) const
void MessageMessage::encodeHeader(Buffer& buffer) const
{
RecoveryManagerImpl::encodeMessageType(*this, buffer);
- if (transfer->getBody().isInline()) {
- transfer->encodeContent(buffer);
+ if (transfer.getBody().isInline()) {
+ transfer.encode(buffer);
} else {
assert(getReference());
string data;
const Reference::Appends& appends = getReference()->getAppends();
for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) {
- data += (*a)->getBytes();
+ data += a->getBytes();
}
framing::Content body(INLINE, data);
- std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body));
- copy->encodeContent(buffer);
+ copyTransfer(ProtocolVersion(), transfer.getDestination(), body).encode(buffer);
}
}
@@ -271,8 +267,7 @@ void MessageMessage::decodeHeader(Buffer& buffer)
{
//don't care about the type here, but want encode/decode to be symmetric
RecoveryManagerImpl::decodeMessageType(buffer);
-
- transfer->decodeContent(buffer);
+ transfer.decode(buffer);
}
void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
@@ -280,37 +275,36 @@ void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
}
-MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version,
- const string& destination,
- const framing::Content& body) const
+MessageTransferBody MessageMessage::copyTransfer(const ProtocolVersion& version,
+ const string& destination,
+ const framing::Content& body) const
{
- return new MessageTransferBody(version,
- transfer->getTicket(),
- destination,
- getRedelivered(),
- transfer->getRejectUnroutable(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- body);
-
+ return MessageTransferBody(version,
+ transfer.getTicket(),
+ destination,
+ getRedelivered(),
+ transfer.getRejectUnroutable(),
+ transfer.getImmediate(),
+ transfer.getTtl(),
+ transfer.getPriority(),
+ transfer.getTimestamp(),
+ transfer.getDeliveryMode(),
+ transfer.getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer.getMessageId(),
+ transfer.getCorrelationId(),
+ transfer.getReplyTo(),
+ transfer.getContentType(),
+ transfer.getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
+ transfer.getUserId(),
+ transfer.getAppId(),
+ transfer.getTransactionId(),
+ transfer.getSecurityToken(),
+ transfer.getApplicationHeaders(),
+ body);
}
MessageMessage::ReferencePtr MessageMessage::getReference() const {
@@ -321,7 +315,7 @@ uint32_t MessageMessage::getRequiredCredit() const
{
//TODO: change when encoding changes. Should be the payload of any
//header & body frames.
- return transfer->size();
+ return transfer.size();
}