diff options
| author | Stephen D. Huston <shuston@apache.org> | 2009-02-20 00:04:37 +0000 |
|---|---|---|
| committer | Stephen D. Huston <shuston@apache.org> | 2009-02-20 00:04:37 +0000 |
| commit | d4b433f542ab8a506d7dbc53e685770a96ee7958 (patch) | |
| tree | f0b7d7c643825f279c45f74a8c17f7b0753b53de /cpp/src/qpid/sys/PollableQueue.h | |
| parent | 44ff0feac9edbf7faffdeed5a3df22313e0543c8 (diff) | |
| download | qpid-python-d4b433f542ab8a506d7dbc53e685770a96ee7958.tar.gz | |
Merged win-pollable-condition branch changes 743545:746056 into trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@746061 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/PollableQueue.h')
| -rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h | 46 |
1 files changed, 25 insertions, 21 deletions
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index b5ff98c2c7..a23cc5137a 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -23,8 +23,6 @@ */ #include "qpid/sys/PollableCondition.h" -#include "qpid/sys/Dispatcher.h" -#include "qpid/sys/DispatchHandle.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include <boost/function.hpp> @@ -38,9 +36,10 @@ namespace sys { class Poller; /** - * A queue that can be polled by sys::Poller. Any thread can push to - * the queue, on wakeup the poller thread processes all items on the - * queue by passing them to a callback in a batch. + * A queue whose item processing is dispatched by sys::Poller. + * Any thread can push to the queue; items pushed trigger an event the Poller + * recognizes. When a Poller I/O thread dispatches the event, a + * user-specified callback is invoked with all items on the queue. */ template <class T> class PollableQueue { @@ -50,12 +49,21 @@ class PollableQueue { /** * Callback to process a batch of items from the queue. - * @param values to process, any items remaining after call are put back on the queue. + * + * @param values Queue of values to process. Any items remaining + * on return from Callback are put back on the queue. */ typedef boost::function<void (Queue& values)> Callback; - /** When the queue is selected by the poller, values are passed to callback cb. */ - PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller); + /** + * Constructor; sets necessary parameters. + * + * @param cb Callback that will be called to process items on the + * queue. Will be called from a Poller I/O thread. + * @param poller Poller to use for dispatching queue events. + */ + PollableQueue(const Callback& cb, + const boost::shared_ptr<sys::Poller>& poller); ~PollableQueue(); @@ -85,14 +93,12 @@ class PollableQueue { typedef sys::Monitor::ScopedLock ScopedLock; typedef sys::Monitor::ScopedUnlock ScopedUnlock; - void dispatch(sys::DispatchHandle&); + void dispatch(PollableCondition& cond); void process(); mutable sys::Monitor lock; Callback callback; - boost::shared_ptr<sys::Poller> poller; PollableCondition condition; - DispatchHandleRef handle; Queue queue, batch; Thread dispatcher; bool stopped; @@ -100,11 +106,10 @@ class PollableQueue { template <class T> PollableQueue<T>::PollableQueue( const Callback& cb, const boost::shared_ptr<sys::Poller>& p) - : callback(cb), poller(p), - handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), stopped(true) + : callback(cb), + condition(boost::bind(&PollableQueue<T>::dispatch, this, _1), p), + stopped(true) { - handle.startWatch(poller); - handle.unwatch(); } template <class T> void PollableQueue<T>::start() { @@ -112,11 +117,10 @@ template <class T> void PollableQueue<T>::start() { if (!stopped) return; stopped = false; if (!queue.empty()) condition.set(); - handle.rewatch(); + condition.rearm(); } template <class T> PollableQueue<T>::~PollableQueue() { - handle.stopWatch(); } template <class T> void PollableQueue<T>::push(const T& t) { @@ -125,15 +129,15 @@ template <class T> void PollableQueue<T>::push(const T& t) { queue.push_back(t); } -template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { +template <class T> void PollableQueue<T>::dispatch(PollableCondition& cond) { ScopedLock l(lock); assert(dispatcher.id() == 0); dispatcher = Thread::current(); process(); dispatcher = Thread(); - if (queue.empty()) condition.clear(); + if (queue.empty()) cond.clear(); if (stopped) lock.notifyAll(); - else h.rewatch(); + else cond.rearm(); } template <class T> void PollableQueue<T>::process() { @@ -159,7 +163,7 @@ template <class T> void PollableQueue<T>::shutdown() { template <class T> void PollableQueue<T>::stop() { ScopedLock l(lock); if (stopped) return; - handle.unwatch(); + condition.disarm(); stopped = true; // Avoid deadlock if stop is called from the dispatch thread while (dispatcher.id() && dispatcher.id() != Thread::current().id()) |
