From 233cc9184c758702d8fa4a83d1bf8ec7dc0b3474 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 26 Jul 2007 15:47:23 +0000 Subject: * README: Instructions for openais install. * configure.ac: Enable clustering if suitable openais is present. * src/tests/Cluster.cpp, .h, Cluster_child: Updated for 0-10 * src/qpid/sys/ConcurrentQueue.h: Added waitPop() * src/Makefile.am, src/qpid/sys/ThreadSafeQueue.h, ProducerConsumer.h: Removed unused code, ConcurrentQueue provides same functionality. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@559859 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/ConcurrentQueue.h | 53 +++++++++++++++++++++++++++++++++++--- 1 file changed, 50 insertions(+), 3 deletions(-) (limited to 'cpp/src/qpid/sys/ConcurrentQueue.h') diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h index dd7689666b..917afc5704 100644 --- a/cpp/src/qpid/sys/ConcurrentQueue.h +++ b/cpp/src/qpid/sys/ConcurrentQueue.h @@ -22,7 +22,10 @@ * */ -#include "qpid/sys/Mutex.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/ScopedIncrement.h" + +#include #include @@ -33,9 +36,24 @@ namespace sys { * Thread-safe queue that allows threads to push items onto * the queue concurrently with threads popping items off the * queue. + * + * Also allows consuming threads to wait until an item is available. */ template class ConcurrentQueue { public: + ConcurrentQueue() : waiters(0), shutdown(false) {} + + /** Threads in wait() are woken with ShutdownException before + * destroying the queue. + */ + ~ConcurrentQueue() { + Mutex::ScopedLock l(lock); + shutdown = true; + lock.notifyAll(); + while (waiters > 0) + lock.wait(); + } + /** Push a data item onto the back of the queue */ void push(const T& data) { Mutex::ScopedLock l(lock); @@ -47,6 +65,28 @@ template class ConcurrentQueue { */ bool pop(T& data) { Mutex::ScopedLock l(lock); + return popInternal(data); + } + + /** Wait up to deadline for a data item to be available. + *@return true if data was available, false if timed out. + *@throws ShutdownException if the queue is destroyed. + */ + bool waitPop(T& data, Duration timeout) { + Mutex::ScopedLock l(lock); + ScopedIncrement w( + waiters, boost::bind(&ConcurrentQueue::noWaiters, this)); + AbsTime deadline(now(), timeout); + while (queue.empty() && lock.wait(deadline)) + ; + return popInternal(data); + } + + private: + + bool popInternal(T& data) { + if (shutdown) + throw ShutdownException(); if (queue.empty()) return false; else { @@ -56,9 +96,16 @@ template class ConcurrentQueue { } } - private: - Mutex lock; + void noWaiters() { + assert(waiters == 0); + if (shutdown) + lock.notify(); // Notify dtor thread. + } + + Monitor lock; std::deque queue; + size_t waiters; + bool shutdown; }; }} // namespace qpid::sys -- cgit v1.2.1