diff options
| author | Alan Conway <aconway@apache.org> | 2007-07-26 15:47:23 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-07-26 15:47:23 +0000 |
| commit | 233cc9184c758702d8fa4a83d1bf8ec7dc0b3474 (patch) | |
| tree | 6a73a6dfb117218e8fd94c8b447def68e0ab9de0 /cpp/src/qpid/sys/ConcurrentQueue.h | |
| parent | 89a8765ee2bac1d77be65f1011ffeeb2cbbabe2d (diff) | |
| download | qpid-python-233cc9184c758702d8fa4a83d1bf8ec7dc0b3474.tar.gz | |
* README: Instructions for openais install.
* configure.ac: Enable clustering if suitable openais is present.
* src/tests/Cluster.cpp, .h, Cluster_child: Updated for 0-10
* src/qpid/sys/ConcurrentQueue.h: Added waitPop()
* src/Makefile.am, src/qpid/sys/ThreadSafeQueue.h, ProducerConsumer.h:
Removed unused code, ConcurrentQueue provides same functionality.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@559859 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/ConcurrentQueue.h')
| -rw-r--r-- | cpp/src/qpid/sys/ConcurrentQueue.h | 53 |
1 files changed, 50 insertions, 3 deletions
diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h index dd7689666b..917afc5704 100644 --- a/cpp/src/qpid/sys/ConcurrentQueue.h +++ b/cpp/src/qpid/sys/ConcurrentQueue.h @@ -22,7 +22,10 @@ * */ -#include "qpid/sys/Mutex.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/ScopedIncrement.h" + +#include <boost/bind.hpp> #include <deque> @@ -33,9 +36,24 @@ namespace sys { * Thread-safe queue that allows threads to push items onto * the queue concurrently with threads popping items off the * queue. + * + * Also allows consuming threads to wait until an item is available. */ template <class T> class ConcurrentQueue { public: + ConcurrentQueue() : waiters(0), shutdown(false) {} + + /** Threads in wait() are woken with ShutdownException before + * destroying the queue. + */ + ~ConcurrentQueue() { + Mutex::ScopedLock l(lock); + shutdown = true; + lock.notifyAll(); + while (waiters > 0) + lock.wait(); + } + /** Push a data item onto the back of the queue */ void push(const T& data) { Mutex::ScopedLock l(lock); @@ -47,6 +65,28 @@ template <class T> class ConcurrentQueue { */ bool pop(T& data) { Mutex::ScopedLock l(lock); + return popInternal(data); + } + + /** Wait up to deadline for a data item to be available. + *@return true if data was available, false if timed out. + *@throws ShutdownException if the queue is destroyed. + */ + bool waitPop(T& data, Duration timeout) { + Mutex::ScopedLock l(lock); + ScopedIncrement<size_t> w( + waiters, boost::bind(&ConcurrentQueue::noWaiters, this)); + AbsTime deadline(now(), timeout); + while (queue.empty() && lock.wait(deadline)) + ; + return popInternal(data); + } + + private: + + bool popInternal(T& data) { + if (shutdown) + throw ShutdownException(); if (queue.empty()) return false; else { @@ -56,9 +96,16 @@ template <class T> class ConcurrentQueue { } } - private: - Mutex lock; + void noWaiters() { + assert(waiters == 0); + if (shutdown) + lock.notify(); // Notify dtor thread. + } + + Monitor lock; std::deque<T> queue; + size_t waiters; + bool shutdown; }; }} // namespace qpid::sys |
