diff options
| author | Alan Conway <aconway@apache.org> | 2008-12-11 20:33:26 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-12-11 20:33:26 +0000 |
| commit | cc781622299a4de5af2fdde6bfc1e2eb42e1623a (patch) | |
| tree | f2488e6e4006b441694d9d1cfc8f775aab3c850b /cpp/src/qpid/sys/PollableQueue.h | |
| parent | f54d88f90490a2e7eaf93f5e11d788aeeb858390 (diff) | |
| download | qpid-python-cc781622299a4de5af2fdde6bfc1e2eb42e1623a.tar.gz | |
sys/PollableQueue: dispatch in batches, allow put-back.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@725802 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/PollableQueue.h')
| -rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h | 35 |
1 files changed, 16 insertions, 19 deletions
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index 953d198fb0..7f11cc35a9 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -44,13 +44,13 @@ class Poller; template <class T> class PollableQueue { public: + typedef std::deque<T> Queue; + /** - * Callback to process an item from the queue. - * - * @return If true the item is removed from the queue else it - * remains on the queue and the queue is stopped. + * Callback to process a batch of items from the queue. + * @param values to process, any items remaining after call are put back on the queue. */ - typedef boost::function<bool (const T&)> Callback; + typedef boost::function<void (Queue& values)> Callback; /** When the queue is selected by the poller, values are passed to callback cb. */ PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller); @@ -73,7 +73,6 @@ class PollableQueue { bool empty() { ScopedLock l(lock); return queue.empty(); } private: - typedef std::deque<T> Queue; typedef sys::Monitor::ScopedLock ScopedLock; typedef sys::Monitor::ScopedUnlock ScopedUnlock; @@ -84,7 +83,7 @@ class PollableQueue { boost::shared_ptr<sys::Poller> poller; PollableCondition condition; DispatchHandle handle; - Queue queue; + Queue queue, batch; Thread dispatcher; bool stopped; }; @@ -117,21 +116,19 @@ template <class T> void PollableQueue<T>::push(const T& t) { } template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { - ScopedLock l(lock); // Prevent concurrent push - assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id()); + ScopedLock l(lock); + assert(dispatcher.id() == 0); dispatcher = Thread::current(); while (!stopped && !queue.empty()) { - T value = queue.front(); - queue.pop_front(); - bool ok = false; - { // unlock to allow concurrent push or call to stop() in callback. - ScopedUnlock u(lock); - // FIXME aconway 2008-12-02: not exception safe if callback throws. - ok = callback(value); + assert(batch.empty()); + batch.swap(queue); + { + ScopedUnlock u(lock); // Allow concurrent push to queue. + callback(batch); } - if (!ok) { // callback cannot process value, put it back. - queue.push_front(value); - stopped=true; + if (!batch.empty()) { + queue.insert(queue.begin(), batch.begin(), batch.end()); // put back unprocessed items. + batch.clear(); } } dispatcher = Thread(); |
