From 82e58be2ca433d01ca0210d9d05671663d61d6b4 Mon Sep 17 00:00:00 2001 From: "Carl C. Trieloff" Date: Tue, 6 Nov 2007 19:43:54 +0000 Subject: - clean up between base & subclasses git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@592530 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Message.cpp | 7 +++++-- qpid/cpp/src/qpid/broker/Message.h | 2 -- qpid/cpp/src/qpid/broker/MessageBuilder.cpp | 1 - qpid/cpp/src/qpid/broker/PersistableMessage.h | 9 ++++++++- 4 files changed, 13 insertions(+), 6 deletions(-) (limited to 'qpid/cpp') diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 5d572283ce..78d6cd3891 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -36,7 +36,7 @@ using std::string; TransferAdapter Message::TRANSFER; PublishAdapter Message::PUBLISH; -Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), store(0), adapter(0) {} +Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), adapter(0) {} std::string Message::getRoutingKey() const { @@ -131,12 +131,15 @@ void Message::decodeContent(framing::Buffer& buffer) void Message::releaseContent(MessageStore* _store) { - store = _store; + if (!store){ + store = _store; + } if (!getPersistenceId()) { store->stage(*this); } //remove any content frames from the frameset frames.remove(TypeFilter(CONTENT_BODY)); + setContentReleased(); } void Message::sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index ac8e51a030..f9a3596a98 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -125,14 +125,12 @@ public: mutable uint64_t persistenceId; bool redelivered; ConnectionToken* publisher; - MessageStore* store; mutable MessageAdapter* adapter; static TransferAdapter TRANSFER; static PublishAdapter PUBLISH; MessageAdapter& getAdapter() const; - bool isContentReleased() const { return store; } }; }} diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp index 84b3cbb2ac..e65db391b5 100644 --- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp @@ -54,7 +54,6 @@ void MessageBuilder::handle(AMQFrame& frame) message->getFrames().append(frame); //have we reached the staging limit? if so stage message and release content if (state == CONTENT && stagingThreshold && message->getFrames().getContentSize() >= stagingThreshold) { - store->stage(*message); message->releaseContent(store); staging = true; } diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h index d8bcc70a30..a63ca2a52b 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.h +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h @@ -66,6 +66,10 @@ protected: typedef std::list syncList; syncList synclist; MessageStore* store; + bool contentReleased; + + inline void setContentReleased() {contentReleased = true; } + public: typedef boost::shared_ptr shared_ptr; @@ -79,11 +83,14 @@ public: PersistableMessage(): asyncEnqueueCounter(0), asyncDequeueCounter(0), - store(0) + store(0), + contentReleased(false) {} void flush(); + inline bool isContentReleased()const {return contentReleased; } + inline void waitForEnqueueComplete() { sys::ScopedLock l(asyncEnqueueLock); while (asyncEnqueueCounter > 0) { -- cgit v1.2.1