summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/ConcurrentQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/ConcurrentQueue.h')
-rw-r--r--cpp/src/qpid/sys/ConcurrentQueue.h53
1 files changed, 50 insertions, 3 deletions
diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h
index dd7689666b..917afc5704 100644
--- a/cpp/src/qpid/sys/ConcurrentQueue.h
+++ b/cpp/src/qpid/sys/ConcurrentQueue.h
@@ -22,7 +22,10 @@
*
*/
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/ScopedIncrement.h"
+
+#include <boost/bind.hpp>
#include <deque>
@@ -33,9 +36,24 @@ namespace sys {
* Thread-safe queue that allows threads to push items onto
* the queue concurrently with threads popping items off the
* queue.
+ *
+ * Also allows consuming threads to wait until an item is available.
*/
template <class T> class ConcurrentQueue {
public:
+ ConcurrentQueue() : waiters(0), shutdown(false) {}
+
+ /** Threads in wait() are woken with ShutdownException before
+ * destroying the queue.
+ */
+ ~ConcurrentQueue() {
+ Mutex::ScopedLock l(lock);
+ shutdown = true;
+ lock.notifyAll();
+ while (waiters > 0)
+ lock.wait();
+ }
+
/** Push a data item onto the back of the queue */
void push(const T& data) {
Mutex::ScopedLock l(lock);
@@ -47,6 +65,28 @@ template <class T> class ConcurrentQueue {
*/
bool pop(T& data) {
Mutex::ScopedLock l(lock);
+ return popInternal(data);
+ }
+
+ /** Wait up to deadline for a data item to be available.
+ *@return true if data was available, false if timed out.
+ *@throws ShutdownException if the queue is destroyed.
+ */
+ bool waitPop(T& data, Duration timeout) {
+ Mutex::ScopedLock l(lock);
+ ScopedIncrement<size_t> w(
+ waiters, boost::bind(&ConcurrentQueue::noWaiters, this));
+ AbsTime deadline(now(), timeout);
+ while (queue.empty() && lock.wait(deadline))
+ ;
+ return popInternal(data);
+ }
+
+ private:
+
+ bool popInternal(T& data) {
+ if (shutdown)
+ throw ShutdownException();
if (queue.empty())
return false;
else {
@@ -56,9 +96,16 @@ template <class T> class ConcurrentQueue {
}
}
- private:
- Mutex lock;
+ void noWaiters() {
+ assert(waiters == 0);
+ if (shutdown)
+ lock.notify(); // Notify dtor thread.
+ }
+
+ Monitor lock;
std::deque<T> queue;
+ size_t waiters;
+ bool shutdown;
};
}} // namespace qpid::sys