summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerMessageMessage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker/BrokerMessageMessage.cpp')
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.cpp89
1 files changed, 56 insertions, 33 deletions
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp
index e1be57fad7..35e5039e12 100644
--- a/cpp/lib/broker/BrokerMessageMessage.cpp
+++ b/cpp/lib/broker/BrokerMessageMessage.cpp
@@ -61,14 +61,13 @@ MessageMessage::MessageMessage(
reference(reference_)
{}
-void MessageMessage::deliver(
+void MessageMessage::transferMessage(
framing::ChannelAdapter& channel,
const std::string& consumerTag,
- u_int64_t /*deliveryTag*/,
- u_int32_t /*framesize*/)
-{
+ u_int32_t framesize)
+{
const framing::Content& body = transfer->getBody();
-
+
// Send any reference data
if (!body.isInline()){
// Open
@@ -81,8 +80,9 @@ void MessageMessage::deliver(
}
}
- // The the transfer
- channel.send(
+ // The transfer
+ if ( transfer->size()<=framesize ) {
+ channel.send(
new MessageTransferBody(channel.getVersion(),
transfer->getTicket(),
consumerTag,
@@ -107,6 +107,44 @@ void MessageMessage::deliver(
transfer->getApplicationHeaders(),
body,
transfer->getMandatory()));
+ } else {
+ // Thing to do here is to construct a simple reference message then deliver that instead
+ // fragmentmentation will be taken care of in the delivery
+ // if necessary; problem is to invent a reference name to use
+ string content = body.getValue();
+ string refname = "dummy";
+ TransferPtr newTransfer(
+ new MessageTransferBody(channel.getVersion(),
+ transfer->getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ framing::Content(REFERENCE, refname),
+ transfer->getMandatory()));
+ ReferencePtr newRef(new Reference(refname));
+ Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
+ newRef->append(newAppend);
+ MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef);
+ newMsg.transferMessage(channel, consumerTag, framesize);
+ return;
+ }
// Close any reference data
if (!body.isInline()){
// Close
@@ -114,39 +152,24 @@ void MessageMessage::deliver(
}
}
+void MessageMessage::deliver(
+ framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
+ u_int64_t /*deliveryTag*/,
+ u_int32_t framesize)
+{
+ transferMessage(channel, consumerTag, framesize);
+}
+
void MessageMessage::sendGetOk(
const framing::MethodContext& context,
const std::string& destination,
u_int32_t /*messageCount*/,
u_int64_t /*deliveryTag*/,
- u_int32_t /*framesize*/)
+ u_int32_t framesize)
{
framing::ChannelAdapter* channel = context.channel;
- channel->send(
- new MessageTransferBody(channel->getVersion(),
- transfer->getTicket(),
- destination,
- getRedelivered(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- transfer->getBody(),
- transfer->getMandatory()));
+ transferMessage(*channel, destination, framesize);
}
bool MessageMessage::isComplete()