diff options
Diffstat (limited to 'cpp/src/qpid/sys/PollableQueue.h')
| -rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h | 30 |
1 files changed, 19 insertions, 11 deletions
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index a594dab86d..2ee29db022 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -44,8 +44,13 @@ class Poller; template <class T> class PollableQueue { public: - /** Callback to process a range of items. */ - typedef boost::function<void (const T&)> Callback; + /** + * Callback to process an item from the queue. + * + * @return If true the item is removed from the queue else it + * remains on the queue and the queue is stopped. + */ + typedef boost::function<bool (const T&)> Callback; /** When the queue is selected by the poller, values are passed to callback cb. */ PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller); @@ -66,6 +71,7 @@ class PollableQueue { size_t size() { ScopedLock l(lock); return queue.size(); } bool empty() { ScopedLock l(lock); return queue.empty(); } + private: typedef std::deque<T> Queue; typedef sys::Monitor::ScopedLock ScopedLock; @@ -94,7 +100,7 @@ template <class T> PollableQueue<T>::PollableQueue( template <class T> void PollableQueue<T>::start() { ScopedLock l(lock); - assert(stopped); + if (!stopped) return; stopped = false; if (!queue.empty()) condition.set(); handle.rewatch(); @@ -115,25 +121,27 @@ template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id()); dispatcher = Thread::current(); while (!stopped && !queue.empty()) { - T value = queue.front(); - queue.pop_front(); - { // callback outside the lock to allow concurrent push. + bool ok = false; + { // unlock to allow concurrent push or call to stop() in callback. ScopedUnlock u(lock); - callback(value); + // FIXME aconway 2008-12-02: exception-safe if callback throws. + ok = callback(queue.front()); } + if (ok) queue.pop_front(); + else stopped=true; } + dispatcher = Thread(); if (queue.empty()) condition.clear(); if (stopped) lock.notifyAll(); - dispatcher = Thread(); - if (!stopped) h.rewatch(); + else h.rewatch(); } template <class T> void PollableQueue<T>::stop() { ScopedLock l(lock); - assert(!stopped); + if (stopped) return; handle.unwatch(); stopped = true; - // No deadlock if stop is called from the dispatcher thread + // Avoid deadlock if stop is called from the dispatch thread while (dispatcher.id() && dispatcher.id() != Thread::current().id()) lock.wait(); } |
