diff options
| author | Carl C. Trieloff <cctrieloff@apache.org> | 2009-01-30 18:59:24 +0000 |
|---|---|---|
| committer | Carl C. Trieloff <cctrieloff@apache.org> | 2009-01-30 18:59:24 +0000 |
| commit | bcfa0a86baebb2598c0be270121d98a0f31b19c6 (patch) | |
| tree | 726ac7a2e448a6a2d60752b63fd0d8c9ca7c9b31 /cpp/src/qpid/broker | |
| parent | a74a06df1f2dddc5c58e33b7a7290c712510e941 (diff) | |
| download | qpid-python-bcfa0a86baebb2598c0be270121d98a0f31b19c6.tar.gz | |
Correction for: start a broker in cluster, send messages that are flow to disk, then join a broker to the cluster. Then consume from the new node. Cotent released messages where loosing content. This patch corrects that.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@739378 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 43 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.h | 3 |
2 files changed, 28 insertions, 18 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 6bcee99f49..e5a0c3e9e1 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -197,30 +197,39 @@ void Message::destroy() } } -void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const +bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const { if (isContentReleased()) { - //load content from store in chunks of maxContentSize - uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); intrusive_ptr<const PersistableMessage> pmsg(this); bool done = false; - for (uint64_t offset = 0; !done; offset += maxContentSize) + string& data = frame.castBody<AMQContentBody>()->getData(); + store->loadContent(queue, pmsg, data, offset, maxContentSize); + done = data.size() < maxContentSize; + frame.setBof(false); + frame.setEof(true); + QPID_LOG(debug, "loaded frame" << frame); + if (offset > 0) { + frame.setBos(false); + } + if (!done) { + frame.setEos(false); + } else return false; + return true; + } + else return false; +} + +void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const +{ + if (isContentReleased()) { + + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); + bool morecontent = true; + for (uint64_t offset = 0; morecontent; offset += maxContentSize) { AMQFrame frame((AMQContentBody())); - string& data = frame.castBody<AMQContentBody>()->getData(); - - store->loadContent(queue, pmsg, data, offset, maxContentSize); - done = data.size() < maxContentSize; - frame.setBof(false); - frame.setEof(true); - if (offset > 0) { - frame.setBos(false); - } - if (!done) { - frame.setEos(false); - } - QPID_LOG(debug, "loaded frame for delivery: " << frame); + morecontent = getContentFrame(queue, frame, maxContentSize, offset); out.handle(frame); } } else { diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index bed191fb8d..de716e9441 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -131,7 +131,8 @@ public: void releaseContent(MessageStore* store); void destroy(); - void sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const; + bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const; + void sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const; void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const; bool isContentLoaded() const; |
