summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp24
1 files changed, 16 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 78d6cd3891..6e3e6a55f7 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -36,7 +36,7 @@ using std::string;
TransferAdapter Message::TRANSFER;
PublishAdapter Message::PUBLISH;
-Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), adapter(0) {}
+Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), publisher(0), adapter(0) {}
std::string Message::getRoutingKey() const
{
@@ -121,12 +121,20 @@ void Message::decodeHeader(framing::Buffer& buffer)
void Message::decodeContent(framing::Buffer& buffer)
{
- //get the data as a string and set that as the content
- //body on a frame then add that frame to the frameset
- AMQFrame frame;
- frame.setBody(AMQContentBody());
- frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
- frames.append(frame);
+ if (buffer.available()) {
+ //get the data as a string and set that as the content
+ //body on a frame then add that frame to the frameset
+ AMQFrame frame;
+ frame.setBody(AMQContentBody());
+ frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
+ frames.append(frame);
+ } else {
+ //adjust header flags
+ MarkLastSegment f;
+ frames.map_if(f, TypeFilter(HEADER_BODY));
+ }
+ //mark content loaded
+ loaded = true;
}
void Message::releaseContent(MessageStore* _store)
@@ -205,5 +213,5 @@ uint64_t Message::contentSize() const
bool Message::isContentLoaded() const
{
- return contentSize() > 0;
+ return loaded;
}