summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/UpdateClient.cpp
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2009-01-30 18:59:24 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2009-01-30 18:59:24 +0000
commitbcfa0a86baebb2598c0be270121d98a0f31b19c6 (patch)
tree726ac7a2e448a6a2d60752b63fd0d8c9ca7c9b31 /cpp/src/qpid/cluster/UpdateClient.cpp
parenta74a06df1f2dddc5c58e33b7a7290c712510e941 (diff)
downloadqpid-python-bcfa0a86baebb2598c0be270121d98a0f31b19c6.tar.gz
Correction for: start a broker in cluster, send messages that are flow to disk, then join a broker to the cluster. Then consume from the new node. Cotent released messages where loosing content. This patch corrects that.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@739378 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp16
1 files changed, 16 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index f2580cb777..2ed0b26a0d 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -168,6 +168,7 @@ class MessageUpdater {
session.exchangeUnbind(queue, UpdateClient::UPDATE);
}
+
void updateQueuedMessage(const broker::QueuedMessage& message) {
if (!haveLastPos || message.position - lastPos != 1) {
ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
@@ -177,12 +178,27 @@ class MessageUpdater {
SessionBase_0_10Access sb(session);
framing::MessageTransferBody transfer(
framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
+
sb.get()->send(transfer, message.payload->getFrames());
+ if (message.payload->isContentReleased()){
+ uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
+
+ uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
+ bool morecontent = true;
+ for (uint64_t offset = 0; morecontent; offset += maxContentSize)
+ {
+ AMQFrame frame((AMQContentBody()));
+ morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
+ sb.get()->sendRawFrame(frame);
+ }
+ }
}
void updateMessage(const boost::intrusive_ptr<broker::Message>& message) {
updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
}
+
+
};