summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/UpdateClient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp16
1 files changed, 16 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index f2580cb777..2ed0b26a0d 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -168,6 +168,7 @@ class MessageUpdater {
session.exchangeUnbind(queue, UpdateClient::UPDATE);
}
+
void updateQueuedMessage(const broker::QueuedMessage& message) {
if (!haveLastPos || message.position - lastPos != 1) {
ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
@@ -177,12 +178,27 @@ class MessageUpdater {
SessionBase_0_10Access sb(session);
framing::MessageTransferBody transfer(
framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
+
sb.get()->send(transfer, message.payload->getFrames());
+ if (message.payload->isContentReleased()){
+ uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
+
+ uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
+ bool morecontent = true;
+ for (uint64_t offset = 0; morecontent; offset += maxContentSize)
+ {
+ AMQFrame frame((AMQContentBody()));
+ morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
+ sb.get()->sendRawFrame(frame);
+ }
+ }
}
void updateMessage(const boost::intrusive_ptr<broker::Message>& message) {
updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
}
+
+
};