From bcfa0a86baebb2598c0be270121d98a0f31b19c6 Mon Sep 17 00:00:00 2001 From: "Carl C. Trieloff" Date: Fri, 30 Jan 2009 18:59:24 +0000 Subject: Correction for: start a broker in cluster, send messages that are flow to disk, then join a broker to the cluster. Then consume from the new node. Cotent released messages where loosing content. This patch corrects that. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@739378 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/UpdateClient.cpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp') diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index f2580cb777..2ed0b26a0d 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -168,6 +168,7 @@ class MessageUpdater { session.exchangeUnbind(queue, UpdateClient::UPDATE); } + void updateQueuedMessage(const broker::QueuedMessage& message) { if (!haveLastPos || message.position - lastPos != 1) { ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1); @@ -177,12 +178,27 @@ class MessageUpdater { SessionBase_0_10Access sb(session); framing::MessageTransferBody transfer( framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); + sb.get()->send(transfer, message.payload->getFrames()); + if (message.payload->isContentReleased()){ + uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize; + + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); + bool morecontent = true; + for (uint64_t offset = 0; morecontent; offset += maxContentSize) + { + AMQFrame frame((AMQContentBody())); + morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset); + sb.get()->sendRawFrame(frame); + } + } } void updateMessage(const boost::intrusive_ptr& message) { updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1)); } + + }; -- cgit v1.2.1