diff options
| author | Carl C. Trieloff <cctrieloff@apache.org> | 2009-01-30 18:59:24 +0000 |
|---|---|---|
| committer | Carl C. Trieloff <cctrieloff@apache.org> | 2009-01-30 18:59:24 +0000 |
| commit | bcfa0a86baebb2598c0be270121d98a0f31b19c6 (patch) | |
| tree | 726ac7a2e448a6a2d60752b63fd0d8c9ca7c9b31 /cpp/src/qpid/cluster/UpdateClient.cpp | |
| parent | a74a06df1f2dddc5c58e33b7a7290c712510e941 (diff) | |
| download | qpid-python-bcfa0a86baebb2598c0be270121d98a0f31b19c6.tar.gz | |
Correction for: start a broker in cluster, send messages that are flow to disk, then join a broker to the cluster. Then consume from the new node. Cotent released messages where loosing content. This patch corrects that.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@739378 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 16 |
1 files changed, 16 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index f2580cb777..2ed0b26a0d 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -168,6 +168,7 @@ class MessageUpdater { session.exchangeUnbind(queue, UpdateClient::UPDATE); } + void updateQueuedMessage(const broker::QueuedMessage& message) { if (!haveLastPos || message.position - lastPos != 1) { ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1); @@ -177,12 +178,27 @@ class MessageUpdater { SessionBase_0_10Access sb(session); framing::MessageTransferBody transfer( framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); + sb.get()->send(transfer, message.payload->getFrames()); + if (message.payload->isContentReleased()){ + uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize; + + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); + bool morecontent = true; + for (uint64_t offset = 0; morecontent; offset += maxContentSize) + { + AMQFrame frame((AMQContentBody())); + morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset); + sb.get()->sendRawFrame(frame); + } + } } void updateMessage(const boost::intrusive_ptr<broker::Message>& message) { updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1)); } + + }; |
