summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/ConcurrentQueue.h
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-26 15:47:23 +0000
committerAlan Conway <aconway@apache.org>2007-07-26 15:47:23 +0000
commit233cc9184c758702d8fa4a83d1bf8ec7dc0b3474 (patch)
tree6a73a6dfb117218e8fd94c8b447def68e0ab9de0 /cpp/src/qpid/sys/ConcurrentQueue.h
parent89a8765ee2bac1d77be65f1011ffeeb2cbbabe2d (diff)
downloadqpid-python-233cc9184c758702d8fa4a83d1bf8ec7dc0b3474.tar.gz
* README: Instructions for openais install.
* configure.ac: Enable clustering if suitable openais is present. * src/tests/Cluster.cpp, .h, Cluster_child: Updated for 0-10 * src/qpid/sys/ConcurrentQueue.h: Added waitPop() * src/Makefile.am, src/qpid/sys/ThreadSafeQueue.h, ProducerConsumer.h: Removed unused code, ConcurrentQueue provides same functionality. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@559859 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/ConcurrentQueue.h')
-rw-r--r--cpp/src/qpid/sys/ConcurrentQueue.h53
1 files changed, 50 insertions, 3 deletions
diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h
index dd7689666b..917afc5704 100644
--- a/cpp/src/qpid/sys/ConcurrentQueue.h
+++ b/cpp/src/qpid/sys/ConcurrentQueue.h
@@ -22,7 +22,10 @@
*
*/
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/ScopedIncrement.h"
+
+#include <boost/bind.hpp>
#include <deque>
@@ -33,9 +36,24 @@ namespace sys {
* Thread-safe queue that allows threads to push items onto
* the queue concurrently with threads popping items off the
* queue.
+ *
+ * Also allows consuming threads to wait until an item is available.
*/
template <class T> class ConcurrentQueue {
public:
+ ConcurrentQueue() : waiters(0), shutdown(false) {}
+
+ /** Threads in wait() are woken with ShutdownException before
+ * destroying the queue.
+ */
+ ~ConcurrentQueue() {
+ Mutex::ScopedLock l(lock);
+ shutdown = true;
+ lock.notifyAll();
+ while (waiters > 0)
+ lock.wait();
+ }
+
/** Push a data item onto the back of the queue */
void push(const T& data) {
Mutex::ScopedLock l(lock);
@@ -47,6 +65,28 @@ template <class T> class ConcurrentQueue {
*/
bool pop(T& data) {
Mutex::ScopedLock l(lock);
+ return popInternal(data);
+ }
+
+ /** Wait up to deadline for a data item to be available.
+ *@return true if data was available, false if timed out.
+ *@throws ShutdownException if the queue is destroyed.
+ */
+ bool waitPop(T& data, Duration timeout) {
+ Mutex::ScopedLock l(lock);
+ ScopedIncrement<size_t> w(
+ waiters, boost::bind(&ConcurrentQueue::noWaiters, this));
+ AbsTime deadline(now(), timeout);
+ while (queue.empty() && lock.wait(deadline))
+ ;
+ return popInternal(data);
+ }
+
+ private:
+
+ bool popInternal(T& data) {
+ if (shutdown)
+ throw ShutdownException();
if (queue.empty())
return false;
else {
@@ -56,9 +96,16 @@ template <class T> class ConcurrentQueue {
}
}
- private:
- Mutex lock;
+ void noWaiters() {
+ assert(waiters == 0);
+ if (shutdown)
+ lock.notify(); // Notify dtor thread.
+ }
+
+ Monitor lock;
std::deque<T> queue;
+ size_t waiters;
+ bool shutdown;
};
}} // namespace qpid::sys