summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/BlockingQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/BlockingQueue.h')
-rw-r--r--cpp/src/qpid/sys/BlockingQueue.h97
1 files changed, 36 insertions, 61 deletions
diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h
index 86020fad81..9bb215ff7f 100644
--- a/cpp/src/qpid/sys/BlockingQueue.h
+++ b/cpp/src/qpid/sys/BlockingQueue.h
@@ -35,47 +35,45 @@ namespace sys {
template <class T>
class BlockingQueue
{
- mutable sys::Waitable lock;
+ mutable sys::Waitable waitable;
std::queue<T> queue;
- bool closed;
public:
- BlockingQueue() : closed(false) {}
+ BlockingQueue() {}
~BlockingQueue() { close(); }
- /** Block until there is a value to pop */
- T pop()
- {
- Waitable::ScopedLock l(lock);
- if (!queueWait()) throw ClosedException();
- return popInternal();
- }
-
- /** Non-blocking pop. If there is a value set outValue and return
- * true, else return false;
+ /** Pop from the queue, block up to timeout if empty.
+ *@param result Set to value popped from queue.
+ *@param timeout Defaults to infinite.
+ *@return true if result was set, false if queue empty after timeout.
*/
- bool tryPop(T& outValue) {
- Waitable::ScopedLock l(lock);
+ bool pop(T& result, Duration timeout=TIME_INFINITE) {
+ Mutex::ScopedLock l(waitable);
+ {
+ Waitable::ScopedWait w(waitable);
+ AbsTime deadline(now(),timeout);
+ while (queue.empty()) waitable.wait(deadline);
+ }
if (queue.empty()) return false;
- outValue = popInternal();
+ result = queue.front();
+ queue.pop();
+ if (!queue.empty())
+ waitable.notify(); // Notify another waiter.
return true;
}
- /** Non-blocking pop. If there is a value return it, else return
- * valueIfEmpty.
- */
- T tryPop(const T& valueIfEmpty=T()) {
- T result=valueIfEmpty;
- tryPop(result);
+ T pop() {
+ T result;
+ bool ok = pop(result);
+ assert(ok); (void) ok; // Infinite wait.
return result;
}
-
+
/** Push a value onto the queue */
- void push(const T& t)
- {
- Waitable::ScopedLock l(lock);
+ void push(const T& t) {
+ Mutex::ScopedLock l(waitable);
queue.push(t);
- queueNotify(0);
+ waitable.notify(); // Notify a waiter.
}
/**
@@ -84,56 +82,33 @@ public:
*/
void close(const ExceptionHolder& ex=ExceptionHolder(new ClosedException()))
{
- Waitable::ScopedLock l(lock);
- if (!closed) {
- lock.setException(ex);
- closed = true;
- lock.notifyAll();
- lock.waitWaiters(); // Ensure no threads are still waiting.
+ Mutex::ScopedLock l(waitable);
+ if (!waitable.hasException()) {
+ waitable.setException(ex);
+ waitable.notifyAll();
+ waitable.waitWaiters(); // Ensure no threads are still waiting.
}
}
/** Open a closed queue. */
void open() {
- Waitable::ScopedLock l(lock);
- closed=false;
+ Mutex::ScopedLock l(waitable);
+ waitable.resetException();
}
bool isClosed() const {
- Waitable::ScopedLock l(lock);
- return closed;
+ Mutex::ScopedLock l(waitable);
+ return waitable.hasException();
}
bool empty() const {
- Waitable::ScopedLock l(lock);
+ Mutex::ScopedLock l(waitable);
return queue.empty();
}
size_t size() const {
- Waitable::ScopedLock l(lock);
+ Mutex::ScopedLock l(waitable);
return queue.size();
}
-
- private:
-
- void queueNotify(size_t ignore) {
- if (!queue.empty() && lock.hasWaiters()>ignore)
- lock.notify(); // Notify another waiter.
- }
-
- bool queueWait() {
- Waitable::ScopedWait w(lock);
- while (!closed && queue.empty())
- lock.wait();
- return !queue.empty();
- }
-
- T popInternal() {
- T t=queue.front();
- queue.pop();
- queueNotify(1);
- return t;
- }
-
};
}}