summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-03 20:56:38 +0000
committerAlan Conway <aconway@apache.org>2008-10-03 20:56:38 +0000
commitff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022 (patch)
treed8109d15ce3a85a9b6175ba2c9b3c51d8706fe9c /cpp/src/qpid/sys
parent2141967346b884e592a42353ae596d37eb90fe7b (diff)
downloadqpid-python-ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022.tar.gz
Cluster join & brain-dumps working.
cluster: improved join protocol, fixed race conditions. client/ConnectionHandler,ConnectionImpl: fixed connection close race causing client hang. src/qpid/sys/PollableQueue.h: fixed incorrect use of startWatch/stopWatch. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@701532 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/LockPtr.h89
-rw-r--r--cpp/src/qpid/sys/PollableQueue.h99
2 files changed, 138 insertions, 50 deletions
diff --git a/cpp/src/qpid/sys/LockPtr.h b/cpp/src/qpid/sys/LockPtr.h
new file mode 100644
index 0000000000..738a864317
--- /dev/null
+++ b/cpp/src/qpid/sys/LockPtr.h
@@ -0,0 +1,89 @@
+#ifndef QPID_SYS_LOCKPTR_H
+#define QPID_SYS_LOCKPTR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+class Mutex;
+
+/**
+ * LockPtr is a smart pointer to T. It is constructed from a volatile
+ * T* and a Lock (by default a Mutex). It const_casts away the
+ * volatile qualifier and locks the Lock for the duration of its
+ *
+ * Used in conjuntion with the "volatile" keyword to get the compiler
+ * to help enforce correct concurrent use of mutli-threaded objects.
+ * See ochttp://www.ddj.com/cpp/184403766 for a detailed discussion.
+ *
+ * To summarize the convention:
+ * - Declare thread-safe member functions as volatile.
+ * - Declare instances of the class that may be called concurrently as volatile.
+ * - Use LockPtr to cast away the volatile qualifier while taking a lock.
+ *
+ * This means that code calling on a concurrently-used object
+ * (declared volatile) can only call thread-safe (volatile) member
+ * functions. Code that needs to use thread-unsafe members must use a
+ * LockPtr, thereby acquiring the lock and making it safe to do so.
+ *
+ * A good type-safe pattern is the internally-locked object:
+ * - It has it's own private lock member.
+ * - All public functions are thread safe and declared volatile.
+ * - Any thread-unsafe, non-volatile functions are private.
+ * - Only member function implementations use LockPtr to access private functions.
+ *
+ * This encapsulates all the locking logic inside the class.
+ *
+ * One nice feature of this convention is the common case where you
+ * need a public, locked version of some function foo() and also a
+ * private unlocked version to avoid recursive locks. They can be declared as
+ * volatile and non-volatile overloads of the same function:
+ *
+ * // public
+ * void Thing::foo() volatile { LockPtr<Thing>(this, myLock)->foo(); }
+ * // private
+ * void Thing::foo() { ... do stuff ...}
+ */
+
+template <class T, class Lock> class LockPtr : public boost::noncopyable {
+ public:
+ LockPtr(volatile T* p, Lock& l) : ptr(const_cast<T*>(p)), lock(l) { lock.lock(); }
+ LockPtr(volatile T* p, volatile Lock& l) : ptr(const_cast<T*>(p)), lock(const_cast<Lock&>(l)) { lock.lock(); }
+ ~LockPtr() { lock.unlock(); }
+
+ T& operator*() { return *ptr; }
+ T* operator->() { return ptr; }
+
+ private:
+ T* ptr;
+ Lock& lock;
+};
+
+
+}} // namespace qpid::sys
+
+
+#endif /*!QPID_SYS_LOCKPTR_H*/
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h
index 3a94c60be0..8313196623 100644
--- a/cpp/src/qpid/sys/PollableQueue.h
+++ b/cpp/src/qpid/sys/PollableQueue.h
@@ -42,40 +42,31 @@ class Poller;
*/
template <class T>
class PollableQueue {
- typedef std::deque<T> Queue;
-
public:
- typedef typename Queue::iterator iterator;
-
/** Callback to process a range of items. */
- typedef boost::function<void (const iterator&, const iterator&)> Callback;
+ typedef boost::function<void (const T&)> Callback;
- /** @see forEach() */
- template <class F> struct ForEach {
- F handleOne;
- ForEach(F f) : handleOne(f) {}
- void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); }
- };
-
- /** Create a range callback from a functor that processes a single item. */
- template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); }
-
/** When the queue is selected by the poller, values are passed to callback cb. */
- explicit PollableQueue(const Callback& cb);
+ PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller);
+ ~PollableQueue();
+
/** Push a value onto the queue. Thread safe */
void push(const T& t);
/** Start polling. */
- void start(const boost::shared_ptr<sys::Poller>& poller);
+ void start();
/** Stop polling and wait for the current callback, if any, to complete. */
void stop();
/** Are we currently stopped?*/
- bool isStopped() const;
-
+ bool isStopped() const { ScopedLock l(lock); return stopped; }
+
+ size_t size() { ScopedLock l(lock); return queue.size(); }
+ bool empty() { ScopedLock l(lock); return queue.empty(); }
private:
+ typedef std::deque<T> Queue;
typedef sys::Monitor::ScopedLock ScopedLock;
typedef sys::Monitor::ScopedUnlock ScopedUnlock;
@@ -83,59 +74,67 @@ class PollableQueue {
mutable sys::Monitor lock;
Callback callback;
+ boost::shared_ptr<sys::Poller> poller;
PollableCondition condition;
- sys::DispatchHandle handle;
+ DispatchHandle handle;
Queue queue;
- Queue batch;
- bool dispatching, stopped;
+ Thread dispatcher;
+ bool stopped;
};
-template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12:
- : callback(cb),
- handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0),
- dispatching(false), stopped(true)
-{}
+template <class T> PollableQueue<T>::PollableQueue(
+ const Callback& cb, const boost::shared_ptr<sys::Poller>& p)
+ : callback(cb), poller(p),
+ handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), stopped(true)
+{
+ handle.startWatch(poller);
+ handle.unwatch();
+}
-template <class T> void PollableQueue<T>::start(const boost::shared_ptr<sys::Poller>& poller) {
+template <class T> void PollableQueue<T>::start() {
ScopedLock l(lock);
+ assert(stopped);
stopped = false;
- handle.startWatch(poller);
+ if (!queue.empty()) condition.set();
+ handle.rewatch();
+}
+
+template <class T> PollableQueue<T>::~PollableQueue() {
+ handle.stopWatch();
}
template <class T> void PollableQueue<T>::push(const T& t) {
ScopedLock l(lock);
+ if (queue.empty()) condition.set();
queue.push_back(t);
- condition.set();
}
template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
- ScopedLock l(lock);
- if (stopped) return;
- dispatching = true;
- condition.clear();
- batch.clear();
- batch.swap(queue); // Snapshot of current queue contents.
- {
- // Process outside the lock to allow concurrent push.
- ScopedUnlock u(lock);
- callback(batch.begin(), batch.end());
+ ScopedLock l(lock); // Prevent concurrent push
+ assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id());
+ dispatcher = Thread::current();
+ while (!stopped && !queue.empty()) {
+ T value = queue.front();
+ queue.pop_front();
+ { // callback outside the lock to allow concurrent push.
+ ScopedUnlock u(lock);
+ callback(value);
+ }
}
- batch.clear();
- dispatching = false;
+ if (queue.empty()) condition.clear();
if (stopped) lock.notifyAll();
- else h.rewatch();
+ dispatcher = Thread();
+ if (!stopped) h.rewatch();
}
template <class T> void PollableQueue<T>::stop() {
ScopedLock l(lock);
- handle.stopWatch();
+ assert(!stopped);
+ handle.unwatch();
stopped = true;
- while (dispatching) lock.wait();
-}
-
-template <class T> bool PollableQueue<T>::isStopped() const {
- ScopedLock l(lock);
- return stopped;
+ // No deadlock if stop is called from the dispatcher thread
+ while (dispatcher.id() && dispatcher.id() != Thread::current().id())
+ lock.wait();
}
}} // namespace qpid::sys