summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2009-01-30 18:59:24 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2009-01-30 18:59:24 +0000
commitbcfa0a86baebb2598c0be270121d98a0f31b19c6 (patch)
tree726ac7a2e448a6a2d60752b63fd0d8c9ca7c9b31 /cpp/src/qpid/broker
parenta74a06df1f2dddc5c58e33b7a7290c712510e941 (diff)
downloadqpid-python-bcfa0a86baebb2598c0be270121d98a0f31b19c6.tar.gz
Correction for: start a broker in cluster, send messages that are flow to disk, then join a broker to the cluster. Then consume from the new node. Cotent released messages where loosing content. This patch corrects that.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@739378 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Message.cpp43
-rw-r--r--cpp/src/qpid/broker/Message.h3
2 files changed, 28 insertions, 18 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 6bcee99f49..e5a0c3e9e1 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -197,30 +197,39 @@ void Message::destroy()
}
}
-void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
+bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
{
if (isContentReleased()) {
- //load content from store in chunks of maxContentSize
- uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
intrusive_ptr<const PersistableMessage> pmsg(this);
bool done = false;
- for (uint64_t offset = 0; !done; offset += maxContentSize)
+ string& data = frame.castBody<AMQContentBody>()->getData();
+ store->loadContent(queue, pmsg, data, offset, maxContentSize);
+ done = data.size() < maxContentSize;
+ frame.setBof(false);
+ frame.setEof(true);
+ QPID_LOG(debug, "loaded frame" << frame);
+ if (offset > 0) {
+ frame.setBos(false);
+ }
+ if (!done) {
+ frame.setEos(false);
+ } else return false;
+ return true;
+ }
+ else return false;
+}
+
+void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
+{
+ if (isContentReleased()) {
+
+ uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
+ bool morecontent = true;
+ for (uint64_t offset = 0; morecontent; offset += maxContentSize)
{
AMQFrame frame((AMQContentBody()));
- string& data = frame.castBody<AMQContentBody>()->getData();
-
- store->loadContent(queue, pmsg, data, offset, maxContentSize);
- done = data.size() < maxContentSize;
- frame.setBof(false);
- frame.setEof(true);
- if (offset > 0) {
- frame.setBos(false);
- }
- if (!done) {
- frame.setEos(false);
- }
- QPID_LOG(debug, "loaded frame for delivery: " << frame);
+ morecontent = getContentFrame(queue, frame, maxContentSize, offset);
out.handle(frame);
}
} else {
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index bed191fb8d..de716e9441 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -131,7 +131,8 @@ public:
void releaseContent(MessageStore* store);
void destroy();
- void sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const;
+ bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const;
+ void sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const;
void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const;
bool isContentLoaded() const;