summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/PollableQueue.h
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-12-11 20:33:26 +0000
committerAlan Conway <aconway@apache.org>2008-12-11 20:33:26 +0000
commitcc781622299a4de5af2fdde6bfc1e2eb42e1623a (patch)
treef2488e6e4006b441694d9d1cfc8f775aab3c850b /cpp/src/qpid/sys/PollableQueue.h
parentf54d88f90490a2e7eaf93f5e11d788aeeb858390 (diff)
downloadqpid-python-cc781622299a4de5af2fdde6bfc1e2eb42e1623a.tar.gz
sys/PollableQueue: dispatch in batches, allow put-back.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@725802 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/PollableQueue.h')
-rw-r--r--cpp/src/qpid/sys/PollableQueue.h35
1 files changed, 16 insertions, 19 deletions
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h
index 953d198fb0..7f11cc35a9 100644
--- a/cpp/src/qpid/sys/PollableQueue.h
+++ b/cpp/src/qpid/sys/PollableQueue.h
@@ -44,13 +44,13 @@ class Poller;
template <class T>
class PollableQueue {
public:
+ typedef std::deque<T> Queue;
+
/**
- * Callback to process an item from the queue.
- *
- * @return If true the item is removed from the queue else it
- * remains on the queue and the queue is stopped.
+ * Callback to process a batch of items from the queue.
+ * @param values to process, any items remaining after call are put back on the queue.
*/
- typedef boost::function<bool (const T&)> Callback;
+ typedef boost::function<void (Queue& values)> Callback;
/** When the queue is selected by the poller, values are passed to callback cb. */
PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller);
@@ -73,7 +73,6 @@ class PollableQueue {
bool empty() { ScopedLock l(lock); return queue.empty(); }
private:
- typedef std::deque<T> Queue;
typedef sys::Monitor::ScopedLock ScopedLock;
typedef sys::Monitor::ScopedUnlock ScopedUnlock;
@@ -84,7 +83,7 @@ class PollableQueue {
boost::shared_ptr<sys::Poller> poller;
PollableCondition condition;
DispatchHandle handle;
- Queue queue;
+ Queue queue, batch;
Thread dispatcher;
bool stopped;
};
@@ -117,21 +116,19 @@ template <class T> void PollableQueue<T>::push(const T& t) {
}
template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
- ScopedLock l(lock); // Prevent concurrent push
- assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id());
+ ScopedLock l(lock);
+ assert(dispatcher.id() == 0);
dispatcher = Thread::current();
while (!stopped && !queue.empty()) {
- T value = queue.front();
- queue.pop_front();
- bool ok = false;
- { // unlock to allow concurrent push or call to stop() in callback.
- ScopedUnlock u(lock);
- // FIXME aconway 2008-12-02: not exception safe if callback throws.
- ok = callback(value);
+ assert(batch.empty());
+ batch.swap(queue);
+ {
+ ScopedUnlock u(lock); // Allow concurrent push to queue.
+ callback(batch);
}
- if (!ok) { // callback cannot process value, put it back.
- queue.push_front(value);
- stopped=true;
+ if (!batch.empty()) {
+ queue.insert(queue.begin(), batch.begin(), batch.end()); // put back unprocessed items.
+ batch.clear();
}
}
dispatcher = Thread();