#ifndef QPID_SYS_POLLABLEQUEUE_H #define QPID_SYS_POLLABLEQUEUE_H /* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/sys/PollableCondition.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include #include #include #include #include "qpid/log/Statement.h" // FIXME aconway 2011-08-05: namespace qpid { namespace sys { class Poller; /** * A queue whose item processing is dispatched by sys::Poller. * Any thread can push to the queue; items pushed trigger an event the Poller * recognizes. When a Poller I/O thread dispatches the event, a * user-specified callback is invoked with all items on the queue. */ template class PollableQueue { public: typedef std::deque Batch; typedef T value_type; /** * Callback to process a batch of items from the queue. * * @param batch Queue of values to process. Any items remaining * on return from Callback are put back on the queue. * @return iterator pointing to the first un-processed item in batch. * Items from this point up to batch.end() are put back on the queue. */ typedef boost::function Callback; /** * Constructor; sets necessary parameters. * * @param cb Callback that will be called to process items on the * queue. Will be called from a Poller I/O thread. * @param poller Poller to use for dispatching queue events. */ PollableQueue(const Callback& cb, const boost::shared_ptr& poller); ~PollableQueue(); /** Push a value onto the queue. Thread safe */ void push(const T& t); /** Start polling. */ void start(); /** Stop polling and wait for the current callback, if any, to complete. */ void stop(); /** Are we currently stopped?*/ bool isStopped() const { ScopedLock l(lock); return stopped; } size_t size() { ScopedLock l(lock); return queue.size(); } bool empty() { ScopedLock l(lock); return queue.empty(); } /** * Allow any queued events to be processed; intended for calling * after all dispatch threads exit the Poller loop in order to * ensure clean shutdown with no events left on the queue. */ void shutdown(); private: typedef sys::Monitor::ScopedLock ScopedLock; typedef sys::Monitor::ScopedUnlock ScopedUnlock; void dispatch(PollableCondition& cond); void process(); mutable sys::Monitor lock; Callback callback; PollableCondition condition; Batch queue, batch; Thread dispatcher; bool stopped; }; template PollableQueue::PollableQueue( const Callback& cb, const boost::shared_ptr& p) : callback(cb), condition(boost::bind(&PollableQueue::dispatch, this, _1), p), stopped(true) { } template void PollableQueue::start() { ScopedLock l(lock); if (!stopped) return; stopped = false; if (!queue.empty()) condition.set(); } template PollableQueue::~PollableQueue() { } template void PollableQueue::push(const T& t) { ScopedLock l(lock); if (queue.empty() && !stopped) condition.set(); queue.push_back(t); } template void PollableQueue::dispatch(PollableCondition& cond) { ScopedLock l(lock); assert(!dispatcher); dispatcher = Thread::current(); process(); dispatcher = Thread(); if (queue.empty()) cond.clear(); if (stopped) lock.notifyAll(); } template void PollableQueue::process() { // Called with lock held while (!stopped && !queue.empty()) { assert(batch.empty()); batch.swap(queue); typename Batch::const_iterator putBack; { ScopedUnlock u(lock); // Allow concurrent push to queue. putBack = callback(batch); } // put back unprocessed items. queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end())); batch.clear(); } } template void PollableQueue::shutdown() { ScopedLock l(lock); process(); } template void PollableQueue::stop() { ScopedLock l(lock); if (stopped) return; condition.clear(); stopped = true; // Avoid deadlock if stop is called from the dispatch thread if (dispatcher && dispatcher != Thread::current()) while (dispatcher) lock.wait(); } }} // namespace qpid::sys #endif /*!QPID_SYS_POLLABLEQUEUE_H*/