diff options
| author | Alan Conway <aconway@apache.org> | 2008-10-03 20:56:38 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-10-03 20:56:38 +0000 |
| commit | ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022 (patch) | |
| tree | d8109d15ce3a85a9b6175ba2c9b3c51d8706fe9c /cpp/src/qpid/sys | |
| parent | 2141967346b884e592a42353ae596d37eb90fe7b (diff) | |
| download | qpid-python-ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022.tar.gz | |
Cluster join & brain-dumps working.
cluster: improved join protocol, fixed race conditions.
client/ConnectionHandler,ConnectionImpl: fixed connection close race causing client hang.
src/qpid/sys/PollableQueue.h: fixed incorrect use of startWatch/stopWatch.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@701532 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
| -rw-r--r-- | cpp/src/qpid/sys/LockPtr.h | 89 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h | 99 |
2 files changed, 138 insertions, 50 deletions
diff --git a/cpp/src/qpid/sys/LockPtr.h b/cpp/src/qpid/sys/LockPtr.h new file mode 100644 index 0000000000..738a864317 --- /dev/null +++ b/cpp/src/qpid/sys/LockPtr.h @@ -0,0 +1,89 @@ +#ifndef QPID_SYS_LOCKPTR_H +#define QPID_SYS_LOCKPTR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Mutex.h" +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +class Mutex; + +/** + * LockPtr is a smart pointer to T. It is constructed from a volatile + * T* and a Lock (by default a Mutex). It const_casts away the + * volatile qualifier and locks the Lock for the duration of its + * + * Used in conjuntion with the "volatile" keyword to get the compiler + * to help enforce correct concurrent use of mutli-threaded objects. + * See ochttp://www.ddj.com/cpp/184403766 for a detailed discussion. + * + * To summarize the convention: + * - Declare thread-safe member functions as volatile. + * - Declare instances of the class that may be called concurrently as volatile. + * - Use LockPtr to cast away the volatile qualifier while taking a lock. + * + * This means that code calling on a concurrently-used object + * (declared volatile) can only call thread-safe (volatile) member + * functions. Code that needs to use thread-unsafe members must use a + * LockPtr, thereby acquiring the lock and making it safe to do so. + * + * A good type-safe pattern is the internally-locked object: + * - It has it's own private lock member. + * - All public functions are thread safe and declared volatile. + * - Any thread-unsafe, non-volatile functions are private. + * - Only member function implementations use LockPtr to access private functions. + * + * This encapsulates all the locking logic inside the class. + * + * One nice feature of this convention is the common case where you + * need a public, locked version of some function foo() and also a + * private unlocked version to avoid recursive locks. They can be declared as + * volatile and non-volatile overloads of the same function: + * + * // public + * void Thing::foo() volatile { LockPtr<Thing>(this, myLock)->foo(); } + * // private + * void Thing::foo() { ... do stuff ...} + */ + +template <class T, class Lock> class LockPtr : public boost::noncopyable { + public: + LockPtr(volatile T* p, Lock& l) : ptr(const_cast<T*>(p)), lock(l) { lock.lock(); } + LockPtr(volatile T* p, volatile Lock& l) : ptr(const_cast<T*>(p)), lock(const_cast<Lock&>(l)) { lock.lock(); } + ~LockPtr() { lock.unlock(); } + + T& operator*() { return *ptr; } + T* operator->() { return ptr; } + + private: + T* ptr; + Lock& lock; +}; + + +}} // namespace qpid::sys + + +#endif /*!QPID_SYS_LOCKPTR_H*/ diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index 3a94c60be0..8313196623 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -42,40 +42,31 @@ class Poller; */ template <class T> class PollableQueue { - typedef std::deque<T> Queue; - public: - typedef typename Queue::iterator iterator; - /** Callback to process a range of items. */ - typedef boost::function<void (const iterator&, const iterator&)> Callback; + typedef boost::function<void (const T&)> Callback; - /** @see forEach() */ - template <class F> struct ForEach { - F handleOne; - ForEach(F f) : handleOne(f) {} - void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); } - }; - - /** Create a range callback from a functor that processes a single item. */ - template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); } - /** When the queue is selected by the poller, values are passed to callback cb. */ - explicit PollableQueue(const Callback& cb); + PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller); + ~PollableQueue(); + /** Push a value onto the queue. Thread safe */ void push(const T& t); /** Start polling. */ - void start(const boost::shared_ptr<sys::Poller>& poller); + void start(); /** Stop polling and wait for the current callback, if any, to complete. */ void stop(); /** Are we currently stopped?*/ - bool isStopped() const; - + bool isStopped() const { ScopedLock l(lock); return stopped; } + + size_t size() { ScopedLock l(lock); return queue.size(); } + bool empty() { ScopedLock l(lock); return queue.empty(); } private: + typedef std::deque<T> Queue; typedef sys::Monitor::ScopedLock ScopedLock; typedef sys::Monitor::ScopedUnlock ScopedUnlock; @@ -83,59 +74,67 @@ class PollableQueue { mutable sys::Monitor lock; Callback callback; + boost::shared_ptr<sys::Poller> poller; PollableCondition condition; - sys::DispatchHandle handle; + DispatchHandle handle; Queue queue; - Queue batch; - bool dispatching, stopped; + Thread dispatcher; + bool stopped; }; -template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12: - : callback(cb), - handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), - dispatching(false), stopped(true) -{} +template <class T> PollableQueue<T>::PollableQueue( + const Callback& cb, const boost::shared_ptr<sys::Poller>& p) + : callback(cb), poller(p), + handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), stopped(true) +{ + handle.startWatch(poller); + handle.unwatch(); +} -template <class T> void PollableQueue<T>::start(const boost::shared_ptr<sys::Poller>& poller) { +template <class T> void PollableQueue<T>::start() { ScopedLock l(lock); + assert(stopped); stopped = false; - handle.startWatch(poller); + if (!queue.empty()) condition.set(); + handle.rewatch(); +} + +template <class T> PollableQueue<T>::~PollableQueue() { + handle.stopWatch(); } template <class T> void PollableQueue<T>::push(const T& t) { ScopedLock l(lock); + if (queue.empty()) condition.set(); queue.push_back(t); - condition.set(); } template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { - ScopedLock l(lock); - if (stopped) return; - dispatching = true; - condition.clear(); - batch.clear(); - batch.swap(queue); // Snapshot of current queue contents. - { - // Process outside the lock to allow concurrent push. - ScopedUnlock u(lock); - callback(batch.begin(), batch.end()); + ScopedLock l(lock); // Prevent concurrent push + assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id()); + dispatcher = Thread::current(); + while (!stopped && !queue.empty()) { + T value = queue.front(); + queue.pop_front(); + { // callback outside the lock to allow concurrent push. + ScopedUnlock u(lock); + callback(value); + } } - batch.clear(); - dispatching = false; + if (queue.empty()) condition.clear(); if (stopped) lock.notifyAll(); - else h.rewatch(); + dispatcher = Thread(); + if (!stopped) h.rewatch(); } template <class T> void PollableQueue<T>::stop() { ScopedLock l(lock); - handle.stopWatch(); + assert(!stopped); + handle.unwatch(); stopped = true; - while (dispatching) lock.wait(); -} - -template <class T> bool PollableQueue<T>::isStopped() const { - ScopedLock l(lock); - return stopped; + // No deadlock if stop is called from the dispatcher thread + while (dispatcher.id() && dispatcher.id() != Thread::current().id()) + lock.wait(); } }} // namespace qpid::sys |
