diff options
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 16 |
1 files changed, 16 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index f2580cb777..2ed0b26a0d 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -168,6 +168,7 @@ class MessageUpdater { session.exchangeUnbind(queue, UpdateClient::UPDATE); } + void updateQueuedMessage(const broker::QueuedMessage& message) { if (!haveLastPos || message.position - lastPos != 1) { ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1); @@ -177,12 +178,27 @@ class MessageUpdater { SessionBase_0_10Access sb(session); framing::MessageTransferBody transfer( framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); + sb.get()->send(transfer, message.payload->getFrames()); + if (message.payload->isContentReleased()){ + uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize; + + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); + bool morecontent = true; + for (uint64_t offset = 0; morecontent; offset += maxContentSize) + { + AMQFrame frame((AMQContentBody())); + morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset); + sb.get()->sendRawFrame(frame); + } + } } void updateMessage(const boost::intrusive_ptr<broker::Message>& message) { updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1)); } + + }; |
