diff options
| author | Alan Conway <aconway@apache.org> | 2007-08-16 20:12:33 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-08-16 20:12:33 +0000 |
| commit | 49c7a491c98c26fe7d4f017a7ba655dfc029278c (patch) | |
| tree | 304d51ba039a5391b4ebde08caab3da978b465fb /cpp/src/qpid/broker/BrokerMessageMessage.cpp | |
| parent | dc13ca80ff893f74ab57fee6543de6543aa366bc (diff) | |
| download | qpid-python-49c7a491c98c26fe7d4f017a7ba655dfc029278c.tar.gz | |
AMQBodies are no longer allocated on the heap and passed with shared_ptr.
AMQFrame contains a boost::variant of AMQHeaderBody,AMQContentBody,
AMQHeatbeatBody, and MethodHolder. A variant is basically a type-safe
union, it can allocate any of the types in-place.
MethodHolder contains a Blob, a less sophisticated kind of variant,
which can contain any of the concrete method body types.
Using variants for all the method types causes outrageous compile
times and bloated library symbol names. Blob lacks some of the finer
features of variant and needs help from generated code. For now both
are hidden to the rest of the code base behind AMQFrame and MethodBody
classes so if/when we decide to settle on just one "variant" type
solution we can do so.
This commit touches nearly 100 files, mostly converting method
signatures with shared_ptr<FooBody> to FooBody* or FooBody&, and
converting stored shared_ptr<AMQBody> to AMQFrame and
share_ptr<AMQMethodBody> to MethodHolder.
There is one outstanding client memory leak, which I will fix in my next commit.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566822 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerMessageMessage.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerMessageMessage.cpp | 232 |
1 files changed, 113 insertions, 119 deletions
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp index 0da5f3d8f5..1184885aeb 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp @@ -48,35 +48,34 @@ struct MessageDeliveryToken : public DeliveryToken }; MessageMessage::MessageMessage( - ConnectionToken* publisher, TransferPtr transfer_ + ConnectionToken* publisher, const MessageTransferBody* transfer_ ) : Message(publisher, transfer_->getDestination(), transfer_->getRoutingKey(), transfer_->getRejectUnroutable(), - transfer_->getImmediate(), - transfer_), - transfer(transfer_) + transfer_->getImmediate()), + transfer(*transfer_) { - assert(transfer->getBody().isInline()); + assert(transfer.getBody().isInline()); } MessageMessage::MessageMessage( - ConnectionToken* publisher, TransferPtr transfer_, ReferencePtr reference_ + ConnectionToken* publisher, const MessageTransferBody* transfer_, + ReferencePtr reference_ ) : Message(publisher, transfer_->getDestination(), transfer_->getRoutingKey(), transfer_->getRejectUnroutable(), - transfer_->getImmediate(), - transfer_), - transfer(transfer_), + transfer_->getImmediate()), + transfer(*transfer_), reference(reference_) { - assert(!transfer->getBody().isInline()); + assert(!transfer.getBody().isInline()); assert(reference_); } /** * Currently used by message store impls to recover messages */ -MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {} +MessageMessage::MessageMessage() {} // TODO: astitcher 1-Mar-2007: This code desperately needs better factoring void MessageMessage::transferMessage( @@ -84,27 +83,28 @@ void MessageMessage::transferMessage( const std::string& consumerTag, uint32_t framesize) { - const framing::Content& body = transfer->getBody(); + const framing::Content& body = transfer.getBody(); // Send any reference data ReferencePtr ref= getReference(); if (ref){ // Open - channel.send(make_shared_ptr(new MessageOpenBody(channel.getVersion(), ref->getId()))); + channel.send(MessageOpenBody(channel.getVersion(), ref->getId())); // Appends for(Reference::Appends::const_iterator a = ref->getAppends().begin(); a != ref->getAppends().end(); ++a) { - uint32_t sizeleft = (*a)->size(); - const string& content = (*a)->getBytes(); + uint32_t sizeleft = a->size(); + const string& content = a->getBytes(); // Calculate overhead bytes // Assume that the overhead is constant as the reference name doesn't change uint32_t overhead = sizeleft - content.size(); string::size_type contentStart = 0; while (sizeleft) { string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead; - channel.send(make_shared_ptr(new MessageAppendBody(channel.getVersion(), ref->getId(), - string(content, contentStart, contentSize)))); + + channel.send(MessageAppendBody(channel.getVersion(), ref->getId(), + string(content, contentStart, contentSize))); sizeleft -= contentSize; contentStart += contentSize; } @@ -112,76 +112,73 @@ void MessageMessage::transferMessage( } // The transfer - if ( transfer->size()<=framesize ) { - channel.send(make_shared_ptr( - new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getRejectUnroutable(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - body))); + if ( transfer.size()<=framesize ) { + channel.send(MessageTransferBody(ProtocolVersion(), + transfer.getTicket(), + consumerTag, + getRedelivered(), + transfer.getRejectUnroutable(), + transfer.getImmediate(), + transfer.getTtl(), + transfer.getPriority(), + transfer.getTimestamp(), + transfer.getDeliveryMode(), + transfer.getExpiration(), + getExchange(), + getRoutingKey(), + transfer.getMessageId(), + transfer.getCorrelationId(), + transfer.getReplyTo(), + transfer.getContentType(), + transfer.getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ + transfer.getUserId(), + transfer.getAppId(), + transfer.getTransactionId(), + transfer.getSecurityToken(), + transfer.getApplicationHeaders(), + body)); } else { // Thing to do here is to construct a simple reference message then deliver that instead // fragmentation will be taken care of in the delivery if necessary; string content = body.getValue(); string refname = "dummy"; - TransferPtr newTransfer( - new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getRejectUnroutable(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - framing::Content(REFERENCE, refname))); + MessageTransferBody newTransfer(channel.getVersion(), + transfer.getTicket(), + consumerTag, + getRedelivered(), + transfer.getRejectUnroutable(), + transfer.getImmediate(), + transfer.getTtl(), + transfer.getPriority(), + transfer.getTimestamp(), + transfer.getDeliveryMode(), + transfer.getExpiration(), + getExchange(), + getRoutingKey(), + transfer.getMessageId(), + transfer.getCorrelationId(), + transfer.getReplyTo(), + transfer.getContentType(), + transfer.getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ + transfer.getUserId(), + transfer.getAppId(), + transfer.getTransactionId(), + transfer.getSecurityToken(), + transfer.getApplicationHeaders(), + framing::Content(REFERENCE, refname)); ReferencePtr newRef(new Reference(refname)); - Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content)); - newRef->append(newAppend); - MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), newTransfer, newRef); + newRef->append(MessageAppendBody(channel.getVersion(), refname, content)); + MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), &newTransfer, newRef); newMsg.transferMessage(channel, consumerTag, framesize); return; } // Close any reference data if (ref) - channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId()))); + channel.send(MessageCloseBody(ProtocolVersion(), ref->getId())); } @@ -202,8 +199,8 @@ bool MessageMessage::isComplete() uint64_t MessageMessage::contentSize() const { - if (transfer->getBody().isInline()) - return transfer->getBody().getValue().size(); + if (transfer.getBody().isInline()) + return transfer.getBody().getValue().size(); else { assert(getReference()); return getReference()->getSize(); @@ -217,11 +214,11 @@ qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() const FieldTable& MessageMessage::getApplicationHeaders() { - return transfer->getApplicationHeaders(); + return transfer.getApplicationHeaders(); } bool MessageMessage::isPersistent() { - return transfer->getDeliveryMode() == PERSISTENT; + return transfer.getDeliveryMode() == PERSISTENT; } uint32_t MessageMessage::encodedSize() const @@ -231,7 +228,7 @@ uint32_t MessageMessage::encodedSize() const uint32_t MessageMessage::encodedHeaderSize() const { - return RecoveryManagerImpl::encodedMessageTypeSize() + transfer->size() - transfer->baseSize(); + return RecoveryManagerImpl::encodedMessageTypeSize() + transfer.size(); } uint32_t MessageMessage::encodedContentSize() const @@ -252,18 +249,17 @@ void MessageMessage::encode(Buffer& buffer) const void MessageMessage::encodeHeader(Buffer& buffer) const { RecoveryManagerImpl::encodeMessageType(*this, buffer); - if (transfer->getBody().isInline()) { - transfer->encodeContent(buffer); + if (transfer.getBody().isInline()) { + transfer.encode(buffer); } else { assert(getReference()); string data; const Reference::Appends& appends = getReference()->getAppends(); for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) { - data += (*a)->getBytes(); + data += a->getBytes(); } framing::Content body(INLINE, data); - std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body)); - copy->encodeContent(buffer); + copyTransfer(ProtocolVersion(), transfer.getDestination(), body).encode(buffer); } } @@ -271,8 +267,7 @@ void MessageMessage::decodeHeader(Buffer& buffer) { //don't care about the type here, but want encode/decode to be symmetric RecoveryManagerImpl::decodeMessageType(buffer); - - transfer->decodeContent(buffer); + transfer.decode(buffer); } void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) @@ -280,37 +275,36 @@ void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) } -MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version, - const string& destination, - const framing::Content& body) const +MessageTransferBody MessageMessage::copyTransfer(const ProtocolVersion& version, + const string& destination, + const framing::Content& body) const { - return new MessageTransferBody(version, - transfer->getTicket(), - destination, - getRedelivered(), - transfer->getRejectUnroutable(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - body); - + return MessageTransferBody(version, + transfer.getTicket(), + destination, + getRedelivered(), + transfer.getRejectUnroutable(), + transfer.getImmediate(), + transfer.getTtl(), + transfer.getPriority(), + transfer.getTimestamp(), + transfer.getDeliveryMode(), + transfer.getExpiration(), + getExchange(), + getRoutingKey(), + transfer.getMessageId(), + transfer.getCorrelationId(), + transfer.getReplyTo(), + transfer.getContentType(), + transfer.getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ + transfer.getUserId(), + transfer.getAppId(), + transfer.getTransactionId(), + transfer.getSecurityToken(), + transfer.getApplicationHeaders(), + body); } MessageMessage::ReferencePtr MessageMessage::getReference() const { @@ -321,7 +315,7 @@ uint32_t MessageMessage::getRequiredCredit() const { //TODO: change when encoding changes. Should be the payload of any //header & body frames. - return transfer->size(); + return transfer.size(); } |
