diff options
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 32 |
1 files changed, 18 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 1b1cec9f85..331bb5e716 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -41,13 +41,6 @@ Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redel Message::~Message() { - if (staged) { - if (store) { - store->destroy(*this); - } else { - QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed"); - } - } } std::string Message::getRoutingKey() const @@ -178,32 +171,43 @@ void Message::releaseContent(MessageStore* _store) } } +void Message::destroy() +{ + if (staged) { + if (store) { + store->destroy(*this); + } else { + QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed"); + } + } +} + void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const { if (isContentReleased()) { //load content from store in chunks of maxContentSize uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); - uint64_t expectedSize(frames.getHeaders()->getContentLength()); intrusive_ptr<const PersistableMessage> pmsg(this); - for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize) + + bool done = false; + for (uint64_t offset = 0; !done; offset += maxContentSize) { - uint64_t remaining = expectedSize - offset; AMQFrame frame(in_place<AMQContentBody>()); string& data = frame.castBody<AMQContentBody>()->getData(); - store->loadContent(queue, pmsg, data, offset, - remaining > maxContentSize ? maxContentSize : remaining); + store->loadContent(queue, pmsg, data, offset, maxContentSize); + done = data.size() < maxContentSize; frame.setBof(false); frame.setEof(true); if (offset > 0) { frame.setBos(false); } - if (remaining > maxContentSize) { + if (!done) { frame.setEos(false); } + QPID_LOG(debug, "loaded frame for delivery: " << frame); out.handle(frame); } - } else { Count c; frames.map_if(c, TypeFilter<CONTENT_BODY>()); |
