diff options
Diffstat (limited to 'cpp/src/qpid/sys/BlockingQueue.h')
-rw-r--r-- | cpp/src/qpid/sys/BlockingQueue.h | 97 |
1 files changed, 36 insertions, 61 deletions
diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h index 86020fad81..9bb215ff7f 100644 --- a/cpp/src/qpid/sys/BlockingQueue.h +++ b/cpp/src/qpid/sys/BlockingQueue.h @@ -35,47 +35,45 @@ namespace sys { template <class T> class BlockingQueue { - mutable sys::Waitable lock; + mutable sys::Waitable waitable; std::queue<T> queue; - bool closed; public: - BlockingQueue() : closed(false) {} + BlockingQueue() {} ~BlockingQueue() { close(); } - /** Block until there is a value to pop */ - T pop() - { - Waitable::ScopedLock l(lock); - if (!queueWait()) throw ClosedException(); - return popInternal(); - } - - /** Non-blocking pop. If there is a value set outValue and return - * true, else return false; + /** Pop from the queue, block up to timeout if empty. + *@param result Set to value popped from queue. + *@param timeout Defaults to infinite. + *@return true if result was set, false if queue empty after timeout. */ - bool tryPop(T& outValue) { - Waitable::ScopedLock l(lock); + bool pop(T& result, Duration timeout=TIME_INFINITE) { + Mutex::ScopedLock l(waitable); + { + Waitable::ScopedWait w(waitable); + AbsTime deadline(now(),timeout); + while (queue.empty()) waitable.wait(deadline); + } if (queue.empty()) return false; - outValue = popInternal(); + result = queue.front(); + queue.pop(); + if (!queue.empty()) + waitable.notify(); // Notify another waiter. return true; } - /** Non-blocking pop. If there is a value return it, else return - * valueIfEmpty. - */ - T tryPop(const T& valueIfEmpty=T()) { - T result=valueIfEmpty; - tryPop(result); + T pop() { + T result; + bool ok = pop(result); + assert(ok); (void) ok; // Infinite wait. return result; } - + /** Push a value onto the queue */ - void push(const T& t) - { - Waitable::ScopedLock l(lock); + void push(const T& t) { + Mutex::ScopedLock l(waitable); queue.push(t); - queueNotify(0); + waitable.notify(); // Notify a waiter. } /** @@ -84,56 +82,33 @@ public: */ void close(const ExceptionHolder& ex=ExceptionHolder(new ClosedException())) { - Waitable::ScopedLock l(lock); - if (!closed) { - lock.setException(ex); - closed = true; - lock.notifyAll(); - lock.waitWaiters(); // Ensure no threads are still waiting. + Mutex::ScopedLock l(waitable); + if (!waitable.hasException()) { + waitable.setException(ex); + waitable.notifyAll(); + waitable.waitWaiters(); // Ensure no threads are still waiting. } } /** Open a closed queue. */ void open() { - Waitable::ScopedLock l(lock); - closed=false; + Mutex::ScopedLock l(waitable); + waitable.resetException(); } bool isClosed() const { - Waitable::ScopedLock l(lock); - return closed; + Mutex::ScopedLock l(waitable); + return waitable.hasException(); } bool empty() const { - Waitable::ScopedLock l(lock); + Mutex::ScopedLock l(waitable); return queue.empty(); } size_t size() const { - Waitable::ScopedLock l(lock); + Mutex::ScopedLock l(waitable); return queue.size(); } - - private: - - void queueNotify(size_t ignore) { - if (!queue.empty() && lock.hasWaiters()>ignore) - lock.notify(); // Notify another waiter. - } - - bool queueWait() { - Waitable::ScopedWait w(lock); - while (!closed && queue.empty()) - lock.wait(); - return !queue.empty(); - } - - T popInternal() { - T t=queue.front(); - queue.pop(); - queueNotify(1); - return t; - } - }; }} |