diff options
Diffstat (limited to 'cpp/src/qpid/broker/PersistableMessage.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.cpp | 106 |
1 files changed, 106 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index 3bf390faf3..b67a669f1d 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -26,6 +26,20 @@ using namespace qpid::broker; +namespace qpid { +namespace broker { + +class MessageStore; + +PersistableMessage::~PersistableMessage() {} + +PersistableMessage::PersistableMessage() : + asyncEnqueueCounter(0), + asyncDequeueCounter(0), + contentReleased(false), + store(0) +{} + void PersistableMessage::flush() { syncList copy; @@ -45,4 +59,96 @@ void PersistableMessage::flush() } } +void PersistableMessage::setContentReleased() {contentReleased = true; } + +bool PersistableMessage::isContentReleased()const { return contentReleased; } + +void PersistableMessage::waitForEnqueueComplete() { + sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + while (asyncEnqueueCounter > 0) { + asyncEnqueueLock.wait(); + } +} + +bool PersistableMessage::isEnqueueComplete() { + sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + return asyncEnqueueCounter == 0; +} + +void PersistableMessage::enqueueComplete() { + bool notify = false; + { + sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + if (asyncEnqueueCounter > 0) { + if (--asyncEnqueueCounter == 0) { + asyncEnqueueLock.notify(); + notify = true; + } + } + } + if (notify) { + sys::ScopedLock<sys::Mutex> l(storeLock); + if (store) { + for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { + PersistableQueue::shared_ptr q(i->lock()); + if (q) q->notifyDurableIOComplete(); + } + } + } +} + +void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { + if (_store){ + sys::ScopedLock<sys::Mutex> l(storeLock); + store = _store; + boost::weak_ptr<PersistableQueue> q(queue); + synclist.push_back(q); + } + enqueueAsync(); +} + +void PersistableMessage::enqueueAsync() { + sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + asyncEnqueueCounter++; +} + +bool PersistableMessage::isDequeueComplete() { + sys::ScopedLock<sys::Monitor> l(asyncDequeueLock); + return asyncDequeueCounter == 0; +} +void PersistableMessage::dequeueComplete() { + + sys::ScopedLock<sys::Monitor> l(asyncDequeueLock); + if (asyncDequeueCounter > 0) { + if (--asyncDequeueCounter == 0) { + asyncDequeueLock.notify(); + } + } +} + +void PersistableMessage::waitForDequeueComplete() { + sys::ScopedLock<sys::Monitor> l(asyncDequeueLock); + while (asyncDequeueCounter > 0) { + asyncDequeueLock.wait(); + } +} + +void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { + if (_store){ + sys::ScopedLock<sys::Mutex> l(storeLock); + store = _store; + boost::weak_ptr<PersistableQueue> q(queue); + synclist.push_back(q); + } + dequeueAsync(); +} + +void PersistableMessage::dequeueAsync() { + sys::ScopedLock<sys::Monitor> l(asyncDequeueLock); + asyncDequeueCounter++; +} + +}} + + |
