From 49c7a491c98c26fe7d4f017a7ba655dfc029278c Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 16 Aug 2007 20:12:33 +0000 Subject: AMQBodies are no longer allocated on the heap and passed with shared_ptr. AMQFrame contains a boost::variant of AMQHeaderBody,AMQContentBody, AMQHeatbeatBody, and MethodHolder. A variant is basically a type-safe union, it can allocate any of the types in-place. MethodHolder contains a Blob, a less sophisticated kind of variant, which can contain any of the concrete method body types. Using variants for all the method types causes outrageous compile times and bloated library symbol names. Blob lacks some of the finer features of variant and needs help from generated code. For now both are hidden to the rest of the code base behind AMQFrame and MethodBody classes so if/when we decide to settle on just one "variant" type solution we can do so. This commit touches nearly 100 files, mostly converting method signatures with shared_ptr to FooBody* or FooBody&, and converting stored shared_ptr to AMQFrame and share_ptr to MethodHolder. There is one outstanding client memory leak, which I will fix in my next commit. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566822 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/BrokerMessageMessage.cpp | 232 +++++++++++++-------------- 1 file changed, 113 insertions(+), 119 deletions(-) (limited to 'cpp/src/qpid/broker/BrokerMessageMessage.cpp') diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp index 0da5f3d8f5..1184885aeb 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp @@ -48,35 +48,34 @@ struct MessageDeliveryToken : public DeliveryToken }; MessageMessage::MessageMessage( - ConnectionToken* publisher, TransferPtr transfer_ + ConnectionToken* publisher, const MessageTransferBody* transfer_ ) : Message(publisher, transfer_->getDestination(), transfer_->getRoutingKey(), transfer_->getRejectUnroutable(), - transfer_->getImmediate(), - transfer_), - transfer(transfer_) + transfer_->getImmediate()), + transfer(*transfer_) { - assert(transfer->getBody().isInline()); + assert(transfer.getBody().isInline()); } MessageMessage::MessageMessage( - ConnectionToken* publisher, TransferPtr transfer_, ReferencePtr reference_ + ConnectionToken* publisher, const MessageTransferBody* transfer_, + ReferencePtr reference_ ) : Message(publisher, transfer_->getDestination(), transfer_->getRoutingKey(), transfer_->getRejectUnroutable(), - transfer_->getImmediate(), - transfer_), - transfer(transfer_), + transfer_->getImmediate()), + transfer(*transfer_), reference(reference_) { - assert(!transfer->getBody().isInline()); + assert(!transfer.getBody().isInline()); assert(reference_); } /** * Currently used by message store impls to recover messages */ -MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {} +MessageMessage::MessageMessage() {} // TODO: astitcher 1-Mar-2007: This code desperately needs better factoring void MessageMessage::transferMessage( @@ -84,27 +83,28 @@ void MessageMessage::transferMessage( const std::string& consumerTag, uint32_t framesize) { - const framing::Content& body = transfer->getBody(); + const framing::Content& body = transfer.getBody(); // Send any reference data ReferencePtr ref= getReference(); if (ref){ // Open - channel.send(make_shared_ptr(new MessageOpenBody(channel.getVersion(), ref->getId()))); + channel.send(MessageOpenBody(channel.getVersion(), ref->getId())); // Appends for(Reference::Appends::const_iterator a = ref->getAppends().begin(); a != ref->getAppends().end(); ++a) { - uint32_t sizeleft = (*a)->size(); - const string& content = (*a)->getBytes(); + uint32_t sizeleft = a->size(); + const string& content = a->getBytes(); // Calculate overhead bytes // Assume that the overhead is constant as the reference name doesn't change uint32_t overhead = sizeleft - content.size(); string::size_type contentStart = 0; while (sizeleft) { string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead; - channel.send(make_shared_ptr(new MessageAppendBody(channel.getVersion(), ref->getId(), - string(content, contentStart, contentSize)))); + + channel.send(MessageAppendBody(channel.getVersion(), ref->getId(), + string(content, contentStart, contentSize))); sizeleft -= contentSize; contentStart += contentSize; } @@ -112,76 +112,73 @@ void MessageMessage::transferMessage( } // The transfer - if ( transfer->size()<=framesize ) { - channel.send(make_shared_ptr( - new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getRejectUnroutable(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - body))); + if ( transfer.size()<=framesize ) { + channel.send(MessageTransferBody(ProtocolVersion(), + transfer.getTicket(), + consumerTag, + getRedelivered(), + transfer.getRejectUnroutable(), + transfer.getImmediate(), + transfer.getTtl(), + transfer.getPriority(), + transfer.getTimestamp(), + transfer.getDeliveryMode(), + transfer.getExpiration(), + getExchange(), + getRoutingKey(), + transfer.getMessageId(), + transfer.getCorrelationId(), + transfer.getReplyTo(), + transfer.getContentType(), + transfer.getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ + transfer.getUserId(), + transfer.getAppId(), + transfer.getTransactionId(), + transfer.getSecurityToken(), + transfer.getApplicationHeaders(), + body)); } else { // Thing to do here is to construct a simple reference message then deliver that instead // fragmentation will be taken care of in the delivery if necessary; string content = body.getValue(); string refname = "dummy"; - TransferPtr newTransfer( - new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getRejectUnroutable(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - framing::Content(REFERENCE, refname))); + MessageTransferBody newTransfer(channel.getVersion(), + transfer.getTicket(), + consumerTag, + getRedelivered(), + transfer.getRejectUnroutable(), + transfer.getImmediate(), + transfer.getTtl(), + transfer.getPriority(), + transfer.getTimestamp(), + transfer.getDeliveryMode(), + transfer.getExpiration(), + getExchange(), + getRoutingKey(), + transfer.getMessageId(), + transfer.getCorrelationId(), + transfer.getReplyTo(), + transfer.getContentType(), + transfer.getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ + transfer.getUserId(), + transfer.getAppId(), + transfer.getTransactionId(), + transfer.getSecurityToken(), + transfer.getApplicationHeaders(), + framing::Content(REFERENCE, refname)); ReferencePtr newRef(new Reference(refname)); - Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content)); - newRef->append(newAppend); - MessageMessage newMsg(const_cast(getPublisher()), newTransfer, newRef); + newRef->append(MessageAppendBody(channel.getVersion(), refname, content)); + MessageMessage newMsg(const_cast(getPublisher()), &newTransfer, newRef); newMsg.transferMessage(channel, consumerTag, framesize); return; } // Close any reference data if (ref) - channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId()))); + channel.send(MessageCloseBody(ProtocolVersion(), ref->getId())); } @@ -202,8 +199,8 @@ bool MessageMessage::isComplete() uint64_t MessageMessage::contentSize() const { - if (transfer->getBody().isInline()) - return transfer->getBody().getValue().size(); + if (transfer.getBody().isInline()) + return transfer.getBody().getValue().size(); else { assert(getReference()); return getReference()->getSize(); @@ -217,11 +214,11 @@ qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() const FieldTable& MessageMessage::getApplicationHeaders() { - return transfer->getApplicationHeaders(); + return transfer.getApplicationHeaders(); } bool MessageMessage::isPersistent() { - return transfer->getDeliveryMode() == PERSISTENT; + return transfer.getDeliveryMode() == PERSISTENT; } uint32_t MessageMessage::encodedSize() const @@ -231,7 +228,7 @@ uint32_t MessageMessage::encodedSize() const uint32_t MessageMessage::encodedHeaderSize() const { - return RecoveryManagerImpl::encodedMessageTypeSize() + transfer->size() - transfer->baseSize(); + return RecoveryManagerImpl::encodedMessageTypeSize() + transfer.size(); } uint32_t MessageMessage::encodedContentSize() const @@ -252,18 +249,17 @@ void MessageMessage::encode(Buffer& buffer) const void MessageMessage::encodeHeader(Buffer& buffer) const { RecoveryManagerImpl::encodeMessageType(*this, buffer); - if (transfer->getBody().isInline()) { - transfer->encodeContent(buffer); + if (transfer.getBody().isInline()) { + transfer.encode(buffer); } else { assert(getReference()); string data; const Reference::Appends& appends = getReference()->getAppends(); for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) { - data += (*a)->getBytes(); + data += a->getBytes(); } framing::Content body(INLINE, data); - std::auto_ptr copy(copyTransfer(transfer->version, transfer->getDestination(), body)); - copy->encodeContent(buffer); + copyTransfer(ProtocolVersion(), transfer.getDestination(), body).encode(buffer); } } @@ -271,8 +267,7 @@ void MessageMessage::decodeHeader(Buffer& buffer) { //don't care about the type here, but want encode/decode to be symmetric RecoveryManagerImpl::decodeMessageType(buffer); - - transfer->decodeContent(buffer); + transfer.decode(buffer); } void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) @@ -280,37 +275,36 @@ void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) } -MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version, - const string& destination, - const framing::Content& body) const +MessageTransferBody MessageMessage::copyTransfer(const ProtocolVersion& version, + const string& destination, + const framing::Content& body) const { - return new MessageTransferBody(version, - transfer->getTicket(), - destination, - getRedelivered(), - transfer->getRejectUnroutable(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - body); - + return MessageTransferBody(version, + transfer.getTicket(), + destination, + getRedelivered(), + transfer.getRejectUnroutable(), + transfer.getImmediate(), + transfer.getTtl(), + transfer.getPriority(), + transfer.getTimestamp(), + transfer.getDeliveryMode(), + transfer.getExpiration(), + getExchange(), + getRoutingKey(), + transfer.getMessageId(), + transfer.getCorrelationId(), + transfer.getReplyTo(), + transfer.getContentType(), + transfer.getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ + transfer.getUserId(), + transfer.getAppId(), + transfer.getTransactionId(), + transfer.getSecurityToken(), + transfer.getApplicationHeaders(), + body); } MessageMessage::ReferencePtr MessageMessage::getReference() const { @@ -321,7 +315,7 @@ uint32_t MessageMessage::getRequiredCredit() const { //TODO: change when encoding changes. Should be the payload of any //header & body frames. - return transfer->size(); + return transfer.size(); } -- cgit v1.2.1