diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-12-26 12:42:57 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-12-26 12:42:57 +0000 |
| commit | 248f1fe188fe2307b9dcf2c87a83b653eaa1920c (patch) | |
| tree | d5d0959a70218946ff72e107a6c106e32479a398 /cpp/src/qpid/broker/PersistableMessage.cpp | |
| parent | 3c83a0e3ec7cf4dc23e83a340b25f5fc1676f937 (diff) | |
| download | qpid-python-248f1fe188fe2307b9dcf2c87a83b653eaa1920c.tar.gz | |
synchronized with trunk except for ruby dir
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid.rnr@893970 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/PersistableMessage.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.cpp | 139 |
1 files changed, 137 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index 3bf390faf3..303a0501f4 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -20,12 +20,25 @@ */ -#include "PersistableMessage.h" -#include "MessageStore.h" +#include "qpid/broker/PersistableMessage.h" +#include "qpid/broker/MessageStore.h" #include <iostream> using namespace qpid::broker; +namespace qpid { +namespace broker { + +class MessageStore; + +PersistableMessage::~PersistableMessage() {} + +PersistableMessage::PersistableMessage() : + asyncEnqueueCounter(0), + asyncDequeueCounter(0), + store(0) +{} + void PersistableMessage::flush() { syncList copy; @@ -45,4 +58,126 @@ void PersistableMessage::flush() } } +void PersistableMessage::setContentReleased() +{ + contentReleaseState.released = true; +} + +bool PersistableMessage::isContentReleased() const +{ + return contentReleaseState.released; +} + +bool PersistableMessage::isEnqueueComplete() { + sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); + return asyncEnqueueCounter == 0; +} + +void PersistableMessage::enqueueComplete() { + bool notify = false; + { + sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); + if (asyncEnqueueCounter > 0) { + if (--asyncEnqueueCounter == 0) { + notify = true; + } + } + } + if (notify) { + allEnqueuesComplete(); + sys::ScopedLock<sys::Mutex> l(storeLock); + if (store) { + for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { + PersistableQueue::shared_ptr q(i->lock()); + if (q) q->notifyDurableIOComplete(); + } + } + } +} + +bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ + if (store && (queue->getPersistenceId()!=0)) { + for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { + PersistableQueue::shared_ptr q(i->lock()); + if (q && q->getPersistenceId() == queue->getPersistenceId()) return true; + } + } + return false; +} + + +void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { + if (_store){ + sys::ScopedLock<sys::Mutex> l(storeLock); + store = _store; + boost::weak_ptr<PersistableQueue> q(queue); + synclist.push_back(q); + } +} + +void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { + addToSyncList(queue, _store); + enqueueAsync(); +} + +void PersistableMessage::enqueueAsync() { + sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); + asyncEnqueueCounter++; +} + +bool PersistableMessage::isDequeueComplete() { + sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); + return asyncDequeueCounter == 0; +} +void PersistableMessage::dequeueComplete() { + bool notify = false; + { + sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); + if (asyncDequeueCounter > 0) { + if (--asyncDequeueCounter == 0) { + notify = true; + } + } + } + if (notify) allDequeuesComplete(); +} + +void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { + if (_store){ + sys::ScopedLock<sys::Mutex> l(storeLock); + store = _store; + boost::weak_ptr<PersistableQueue> q(queue); + synclist.push_back(q); + } + dequeueAsync(); +} + +void PersistableMessage::dequeueAsync() { + sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); + asyncDequeueCounter++; +} + +PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {} + +void PersistableMessage::setStore(MessageStore* s) +{ + store = s; +} + +void PersistableMessage::requestContentRelease() +{ + contentReleaseState.requested = true; +} +void PersistableMessage::blockContentRelease() +{ + contentReleaseState.blocked = true; +} +bool PersistableMessage::checkContentReleasable() +{ + return contentReleaseState.requested && !contentReleaseState.blocked; +} + +}} + + |
