From ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 3 Oct 2008 20:56:38 +0000 Subject: Cluster join & brain-dumps working. cluster: improved join protocol, fixed race conditions. client/ConnectionHandler,ConnectionImpl: fixed connection close race causing client hang. src/qpid/sys/PollableQueue.h: fixed incorrect use of startWatch/stopWatch. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@701532 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/PollableQueue.h | 99 ++++++++++++++++++++-------------------- 1 file changed, 49 insertions(+), 50 deletions(-) (limited to 'cpp/src/qpid/sys/PollableQueue.h') diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index 3a94c60be0..8313196623 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -42,40 +42,31 @@ class Poller; */ template class PollableQueue { - typedef std::deque Queue; - public: - typedef typename Queue::iterator iterator; - /** Callback to process a range of items. */ - typedef boost::function Callback; + typedef boost::function Callback; - /** @see forEach() */ - template struct ForEach { - F handleOne; - ForEach(F f) : handleOne(f) {} - void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); } - }; - - /** Create a range callback from a functor that processes a single item. */ - template static ForEach forEach(const F& f) { return ForEach(f); } - /** When the queue is selected by the poller, values are passed to callback cb. */ - explicit PollableQueue(const Callback& cb); + PollableQueue(const Callback& cb, const boost::shared_ptr& poller); + ~PollableQueue(); + /** Push a value onto the queue. Thread safe */ void push(const T& t); /** Start polling. */ - void start(const boost::shared_ptr& poller); + void start(); /** Stop polling and wait for the current callback, if any, to complete. */ void stop(); /** Are we currently stopped?*/ - bool isStopped() const; - + bool isStopped() const { ScopedLock l(lock); return stopped; } + + size_t size() { ScopedLock l(lock); return queue.size(); } + bool empty() { ScopedLock l(lock); return queue.empty(); } private: + typedef std::deque Queue; typedef sys::Monitor::ScopedLock ScopedLock; typedef sys::Monitor::ScopedUnlock ScopedUnlock; @@ -83,59 +74,67 @@ class PollableQueue { mutable sys::Monitor lock; Callback callback; + boost::shared_ptr poller; PollableCondition condition; - sys::DispatchHandle handle; + DispatchHandle handle; Queue queue; - Queue batch; - bool dispatching, stopped; + Thread dispatcher; + bool stopped; }; -template PollableQueue::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12: - : callback(cb), - handle(condition, boost::bind(&PollableQueue::dispatch, this, _1), 0, 0), - dispatching(false), stopped(true) -{} +template PollableQueue::PollableQueue( + const Callback& cb, const boost::shared_ptr& p) + : callback(cb), poller(p), + handle(condition, boost::bind(&PollableQueue::dispatch, this, _1), 0, 0), stopped(true) +{ + handle.startWatch(poller); + handle.unwatch(); +} -template void PollableQueue::start(const boost::shared_ptr& poller) { +template void PollableQueue::start() { ScopedLock l(lock); + assert(stopped); stopped = false; - handle.startWatch(poller); + if (!queue.empty()) condition.set(); + handle.rewatch(); +} + +template PollableQueue::~PollableQueue() { + handle.stopWatch(); } template void PollableQueue::push(const T& t) { ScopedLock l(lock); + if (queue.empty()) condition.set(); queue.push_back(t); - condition.set(); } template void PollableQueue::dispatch(sys::DispatchHandle& h) { - ScopedLock l(lock); - if (stopped) return; - dispatching = true; - condition.clear(); - batch.clear(); - batch.swap(queue); // Snapshot of current queue contents. - { - // Process outside the lock to allow concurrent push. - ScopedUnlock u(lock); - callback(batch.begin(), batch.end()); + ScopedLock l(lock); // Prevent concurrent push + assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id()); + dispatcher = Thread::current(); + while (!stopped && !queue.empty()) { + T value = queue.front(); + queue.pop_front(); + { // callback outside the lock to allow concurrent push. + ScopedUnlock u(lock); + callback(value); + } } - batch.clear(); - dispatching = false; + if (queue.empty()) condition.clear(); if (stopped) lock.notifyAll(); - else h.rewatch(); + dispatcher = Thread(); + if (!stopped) h.rewatch(); } template void PollableQueue::stop() { ScopedLock l(lock); - handle.stopWatch(); + assert(!stopped); + handle.unwatch(); stopped = true; - while (dispatching) lock.wait(); -} - -template bool PollableQueue::isStopped() const { - ScopedLock l(lock); - return stopped; + // No deadlock if stop is called from the dispatcher thread + while (dispatcher.id() && dispatcher.id() != Thread::current().id()) + lock.wait(); } }} // namespace qpid::sys -- cgit v1.2.1