diff options
Diffstat (limited to 'cpp/lib/broker/BrokerMessageMessage.cpp')
| -rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.cpp | 89 |
1 files changed, 56 insertions, 33 deletions
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index e1be57fad7..35e5039e12 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -61,14 +61,13 @@ MessageMessage::MessageMessage( reference(reference_) {} -void MessageMessage::deliver( +void MessageMessage::transferMessage( framing::ChannelAdapter& channel, const std::string& consumerTag, - u_int64_t /*deliveryTag*/, - u_int32_t /*framesize*/) -{ + u_int32_t framesize) +{ const framing::Content& body = transfer->getBody(); - + // Send any reference data if (!body.isInline()){ // Open @@ -81,8 +80,9 @@ void MessageMessage::deliver( } } - // The the transfer - channel.send( + // The transfer + if ( transfer->size()<=framesize ) { + channel.send( new MessageTransferBody(channel.getVersion(), transfer->getTicket(), consumerTag, @@ -107,6 +107,44 @@ void MessageMessage::deliver( transfer->getApplicationHeaders(), body, transfer->getMandatory())); + } else { + // Thing to do here is to construct a simple reference message then deliver that instead + // fragmentmentation will be taken care of in the delivery + // if necessary; problem is to invent a reference name to use + string content = body.getValue(); + string refname = "dummy"; + TransferPtr newTransfer( + new MessageTransferBody(channel.getVersion(), + transfer->getTicket(), + consumerTag, + getRedelivered(), + transfer->getImmediate(), + transfer->getTtl(), + transfer->getPriority(), + transfer->getTimestamp(), + transfer->getDeliveryMode(), + transfer->getExpiration(), + getExchange(), + getRoutingKey(), + transfer->getMessageId(), + transfer->getCorrelationId(), + transfer->getReplyTo(), + transfer->getContentType(), + transfer->getContentEncoding(), + transfer->getUserId(), + transfer->getAppId(), + transfer->getTransactionId(), + transfer->getSecurityToken(), + transfer->getApplicationHeaders(), + framing::Content(REFERENCE, refname), + transfer->getMandatory())); + ReferencePtr newRef(new Reference(refname)); + Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content)); + newRef->append(newAppend); + MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef); + newMsg.transferMessage(channel, consumerTag, framesize); + return; + } // Close any reference data if (!body.isInline()){ // Close @@ -114,39 +152,24 @@ void MessageMessage::deliver( } } +void MessageMessage::deliver( + framing::ChannelAdapter& channel, + const std::string& consumerTag, + u_int64_t /*deliveryTag*/, + u_int32_t framesize) +{ + transferMessage(channel, consumerTag, framesize); +} + void MessageMessage::sendGetOk( const framing::MethodContext& context, const std::string& destination, u_int32_t /*messageCount*/, u_int64_t /*deliveryTag*/, - u_int32_t /*framesize*/) + u_int32_t framesize) { framing::ChannelAdapter* channel = context.channel; - channel->send( - new MessageTransferBody(channel->getVersion(), - transfer->getTicket(), - destination, - getRedelivered(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - transfer->getBody(), - transfer->getMandatory())); + transferMessage(*channel, destination, framesize); } bool MessageMessage::isComplete() |
