diff options
| author | Alan Conway <aconway@apache.org> | 2008-10-03 20:56:38 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-10-03 20:56:38 +0000 |
| commit | ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022 (patch) | |
| tree | d8109d15ce3a85a9b6175ba2c9b3c51d8706fe9c /cpp/src/qpid/sys/PollableQueue.h | |
| parent | 2141967346b884e592a42353ae596d37eb90fe7b (diff) | |
| download | qpid-python-ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022.tar.gz | |
Cluster join & brain-dumps working.
cluster: improved join protocol, fixed race conditions.
client/ConnectionHandler,ConnectionImpl: fixed connection close race causing client hang.
src/qpid/sys/PollableQueue.h: fixed incorrect use of startWatch/stopWatch.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@701532 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/PollableQueue.h')
| -rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h | 99 |
1 files changed, 49 insertions, 50 deletions
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index 3a94c60be0..8313196623 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -42,40 +42,31 @@ class Poller; */ template <class T> class PollableQueue { - typedef std::deque<T> Queue; - public: - typedef typename Queue::iterator iterator; - /** Callback to process a range of items. */ - typedef boost::function<void (const iterator&, const iterator&)> Callback; + typedef boost::function<void (const T&)> Callback; - /** @see forEach() */ - template <class F> struct ForEach { - F handleOne; - ForEach(F f) : handleOne(f) {} - void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); } - }; - - /** Create a range callback from a functor that processes a single item. */ - template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); } - /** When the queue is selected by the poller, values are passed to callback cb. */ - explicit PollableQueue(const Callback& cb); + PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller); + ~PollableQueue(); + /** Push a value onto the queue. Thread safe */ void push(const T& t); /** Start polling. */ - void start(const boost::shared_ptr<sys::Poller>& poller); + void start(); /** Stop polling and wait for the current callback, if any, to complete. */ void stop(); /** Are we currently stopped?*/ - bool isStopped() const; - + bool isStopped() const { ScopedLock l(lock); return stopped; } + + size_t size() { ScopedLock l(lock); return queue.size(); } + bool empty() { ScopedLock l(lock); return queue.empty(); } private: + typedef std::deque<T> Queue; typedef sys::Monitor::ScopedLock ScopedLock; typedef sys::Monitor::ScopedUnlock ScopedUnlock; @@ -83,59 +74,67 @@ class PollableQueue { mutable sys::Monitor lock; Callback callback; + boost::shared_ptr<sys::Poller> poller; PollableCondition condition; - sys::DispatchHandle handle; + DispatchHandle handle; Queue queue; - Queue batch; - bool dispatching, stopped; + Thread dispatcher; + bool stopped; }; -template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12: - : callback(cb), - handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), - dispatching(false), stopped(true) -{} +template <class T> PollableQueue<T>::PollableQueue( + const Callback& cb, const boost::shared_ptr<sys::Poller>& p) + : callback(cb), poller(p), + handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), stopped(true) +{ + handle.startWatch(poller); + handle.unwatch(); +} -template <class T> void PollableQueue<T>::start(const boost::shared_ptr<sys::Poller>& poller) { +template <class T> void PollableQueue<T>::start() { ScopedLock l(lock); + assert(stopped); stopped = false; - handle.startWatch(poller); + if (!queue.empty()) condition.set(); + handle.rewatch(); +} + +template <class T> PollableQueue<T>::~PollableQueue() { + handle.stopWatch(); } template <class T> void PollableQueue<T>::push(const T& t) { ScopedLock l(lock); + if (queue.empty()) condition.set(); queue.push_back(t); - condition.set(); } template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { - ScopedLock l(lock); - if (stopped) return; - dispatching = true; - condition.clear(); - batch.clear(); - batch.swap(queue); // Snapshot of current queue contents. - { - // Process outside the lock to allow concurrent push. - ScopedUnlock u(lock); - callback(batch.begin(), batch.end()); + ScopedLock l(lock); // Prevent concurrent push + assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id()); + dispatcher = Thread::current(); + while (!stopped && !queue.empty()) { + T value = queue.front(); + queue.pop_front(); + { // callback outside the lock to allow concurrent push. + ScopedUnlock u(lock); + callback(value); + } } - batch.clear(); - dispatching = false; + if (queue.empty()) condition.clear(); if (stopped) lock.notifyAll(); - else h.rewatch(); + dispatcher = Thread(); + if (!stopped) h.rewatch(); } template <class T> void PollableQueue<T>::stop() { ScopedLock l(lock); - handle.stopWatch(); + assert(!stopped); + handle.unwatch(); stopped = true; - while (dispatching) lock.wait(); -} - -template <class T> bool PollableQueue<T>::isStopped() const { - ScopedLock l(lock); - return stopped; + // No deadlock if stop is called from the dispatcher thread + while (dispatcher.id() && dispatcher.id() != Thread::current().id()) + lock.wait(); } }} // namespace qpid::sys |
