summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/PollableQueue.h
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2009-02-20 00:04:37 +0000
committerStephen D. Huston <shuston@apache.org>2009-02-20 00:04:37 +0000
commitd4b433f542ab8a506d7dbc53e685770a96ee7958 (patch)
treef0b7d7c643825f279c45f74a8c17f7b0753b53de /cpp/src/qpid/sys/PollableQueue.h
parent44ff0feac9edbf7faffdeed5a3df22313e0543c8 (diff)
downloadqpid-python-d4b433f542ab8a506d7dbc53e685770a96ee7958.tar.gz
Merged win-pollable-condition branch changes 743545:746056 into trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@746061 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/PollableQueue.h')
-rw-r--r--cpp/src/qpid/sys/PollableQueue.h46
1 files changed, 25 insertions, 21 deletions
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h
index b5ff98c2c7..a23cc5137a 100644
--- a/cpp/src/qpid/sys/PollableQueue.h
+++ b/cpp/src/qpid/sys/PollableQueue.h
@@ -23,8 +23,6 @@
*/
#include "qpid/sys/PollableCondition.h"
-#include "qpid/sys/Dispatcher.h"
-#include "qpid/sys/DispatchHandle.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
#include <boost/function.hpp>
@@ -38,9 +36,10 @@ namespace sys {
class Poller;
/**
- * A queue that can be polled by sys::Poller. Any thread can push to
- * the queue, on wakeup the poller thread processes all items on the
- * queue by passing them to a callback in a batch.
+ * A queue whose item processing is dispatched by sys::Poller.
+ * Any thread can push to the queue; items pushed trigger an event the Poller
+ * recognizes. When a Poller I/O thread dispatches the event, a
+ * user-specified callback is invoked with all items on the queue.
*/
template <class T>
class PollableQueue {
@@ -50,12 +49,21 @@ class PollableQueue {
/**
* Callback to process a batch of items from the queue.
- * @param values to process, any items remaining after call are put back on the queue.
+ *
+ * @param values Queue of values to process. Any items remaining
+ * on return from Callback are put back on the queue.
*/
typedef boost::function<void (Queue& values)> Callback;
- /** When the queue is selected by the poller, values are passed to callback cb. */
- PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller);
+ /**
+ * Constructor; sets necessary parameters.
+ *
+ * @param cb Callback that will be called to process items on the
+ * queue. Will be called from a Poller I/O thread.
+ * @param poller Poller to use for dispatching queue events.
+ */
+ PollableQueue(const Callback& cb,
+ const boost::shared_ptr<sys::Poller>& poller);
~PollableQueue();
@@ -85,14 +93,12 @@ class PollableQueue {
typedef sys::Monitor::ScopedLock ScopedLock;
typedef sys::Monitor::ScopedUnlock ScopedUnlock;
- void dispatch(sys::DispatchHandle&);
+ void dispatch(PollableCondition& cond);
void process();
mutable sys::Monitor lock;
Callback callback;
- boost::shared_ptr<sys::Poller> poller;
PollableCondition condition;
- DispatchHandleRef handle;
Queue queue, batch;
Thread dispatcher;
bool stopped;
@@ -100,11 +106,10 @@ class PollableQueue {
template <class T> PollableQueue<T>::PollableQueue(
const Callback& cb, const boost::shared_ptr<sys::Poller>& p)
- : callback(cb), poller(p),
- handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), stopped(true)
+ : callback(cb),
+ condition(boost::bind(&PollableQueue<T>::dispatch, this, _1), p),
+ stopped(true)
{
- handle.startWatch(poller);
- handle.unwatch();
}
template <class T> void PollableQueue<T>::start() {
@@ -112,11 +117,10 @@ template <class T> void PollableQueue<T>::start() {
if (!stopped) return;
stopped = false;
if (!queue.empty()) condition.set();
- handle.rewatch();
+ condition.rearm();
}
template <class T> PollableQueue<T>::~PollableQueue() {
- handle.stopWatch();
}
template <class T> void PollableQueue<T>::push(const T& t) {
@@ -125,15 +129,15 @@ template <class T> void PollableQueue<T>::push(const T& t) {
queue.push_back(t);
}
-template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
+template <class T> void PollableQueue<T>::dispatch(PollableCondition& cond) {
ScopedLock l(lock);
assert(dispatcher.id() == 0);
dispatcher = Thread::current();
process();
dispatcher = Thread();
- if (queue.empty()) condition.clear();
+ if (queue.empty()) cond.clear();
if (stopped) lock.notifyAll();
- else h.rewatch();
+ else cond.rearm();
}
template <class T> void PollableQueue<T>::process() {
@@ -159,7 +163,7 @@ template <class T> void PollableQueue<T>::shutdown() {
template <class T> void PollableQueue<T>::stop() {
ScopedLock l(lock);
if (stopped) return;
- handle.unwatch();
+ condition.disarm();
stopped = true;
// Avoid deadlock if stop is called from the dispatch thread
while (dispatcher.id() && dispatcher.id() != Thread::current().id())