From 248f1fe188fe2307b9dcf2c87a83b653eaa1920c Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Sat, 26 Dec 2009 12:42:57 +0000 Subject: synchronized with trunk except for ruby dir git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid.rnr@893970 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/PersistableMessage.cpp | 139 ++++++++++++++++++++++++++++- 1 file changed, 137 insertions(+), 2 deletions(-) (limited to 'cpp/src/qpid/broker/PersistableMessage.cpp') diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index 3bf390faf3..303a0501f4 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -20,12 +20,25 @@ */ -#include "PersistableMessage.h" -#include "MessageStore.h" +#include "qpid/broker/PersistableMessage.h" +#include "qpid/broker/MessageStore.h" #include using namespace qpid::broker; +namespace qpid { +namespace broker { + +class MessageStore; + +PersistableMessage::~PersistableMessage() {} + +PersistableMessage::PersistableMessage() : + asyncEnqueueCounter(0), + asyncDequeueCounter(0), + store(0) +{} + void PersistableMessage::flush() { syncList copy; @@ -45,4 +58,126 @@ void PersistableMessage::flush() } } +void PersistableMessage::setContentReleased() +{ + contentReleaseState.released = true; +} + +bool PersistableMessage::isContentReleased() const +{ + return contentReleaseState.released; +} + +bool PersistableMessage::isEnqueueComplete() { + sys::ScopedLock l(asyncEnqueueLock); + return asyncEnqueueCounter == 0; +} + +void PersistableMessage::enqueueComplete() { + bool notify = false; + { + sys::ScopedLock l(asyncEnqueueLock); + if (asyncEnqueueCounter > 0) { + if (--asyncEnqueueCounter == 0) { + notify = true; + } + } + } + if (notify) { + allEnqueuesComplete(); + sys::ScopedLock l(storeLock); + if (store) { + for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { + PersistableQueue::shared_ptr q(i->lock()); + if (q) q->notifyDurableIOComplete(); + } + } + } +} + +bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ + if (store && (queue->getPersistenceId()!=0)) { + for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { + PersistableQueue::shared_ptr q(i->lock()); + if (q && q->getPersistenceId() == queue->getPersistenceId()) return true; + } + } + return false; +} + + +void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { + if (_store){ + sys::ScopedLock l(storeLock); + store = _store; + boost::weak_ptr q(queue); + synclist.push_back(q); + } +} + +void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { + addToSyncList(queue, _store); + enqueueAsync(); +} + +void PersistableMessage::enqueueAsync() { + sys::ScopedLock l(asyncEnqueueLock); + asyncEnqueueCounter++; +} + +bool PersistableMessage::isDequeueComplete() { + sys::ScopedLock l(asyncDequeueLock); + return asyncDequeueCounter == 0; +} +void PersistableMessage::dequeueComplete() { + bool notify = false; + { + sys::ScopedLock l(asyncDequeueLock); + if (asyncDequeueCounter > 0) { + if (--asyncDequeueCounter == 0) { + notify = true; + } + } + } + if (notify) allDequeuesComplete(); +} + +void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { + if (_store){ + sys::ScopedLock l(storeLock); + store = _store; + boost::weak_ptr q(queue); + synclist.push_back(q); + } + dequeueAsync(); +} + +void PersistableMessage::dequeueAsync() { + sys::ScopedLock l(asyncDequeueLock); + asyncDequeueCounter++; +} + +PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {} + +void PersistableMessage::setStore(MessageStore* s) +{ + store = s; +} + +void PersistableMessage::requestContentRelease() +{ + contentReleaseState.requested = true; +} +void PersistableMessage::blockContentRelease() +{ + contentReleaseState.blocked = true; +} +bool PersistableMessage::checkContentReleasable() +{ + return contentReleaseState.requested && !contentReleaseState.blocked; +} + +}} + + -- cgit v1.2.1