summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/PollableQueue.h
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-03 20:56:38 +0000
committerAlan Conway <aconway@apache.org>2008-10-03 20:56:38 +0000
commitff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022 (patch)
treed8109d15ce3a85a9b6175ba2c9b3c51d8706fe9c /cpp/src/qpid/sys/PollableQueue.h
parent2141967346b884e592a42353ae596d37eb90fe7b (diff)
downloadqpid-python-ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022.tar.gz
Cluster join & brain-dumps working.
cluster: improved join protocol, fixed race conditions. client/ConnectionHandler,ConnectionImpl: fixed connection close race causing client hang. src/qpid/sys/PollableQueue.h: fixed incorrect use of startWatch/stopWatch. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@701532 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/PollableQueue.h')
-rw-r--r--cpp/src/qpid/sys/PollableQueue.h99
1 files changed, 49 insertions, 50 deletions
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h
index 3a94c60be0..8313196623 100644
--- a/cpp/src/qpid/sys/PollableQueue.h
+++ b/cpp/src/qpid/sys/PollableQueue.h
@@ -42,40 +42,31 @@ class Poller;
*/
template <class T>
class PollableQueue {
- typedef std::deque<T> Queue;
-
public:
- typedef typename Queue::iterator iterator;
-
/** Callback to process a range of items. */
- typedef boost::function<void (const iterator&, const iterator&)> Callback;
+ typedef boost::function<void (const T&)> Callback;
- /** @see forEach() */
- template <class F> struct ForEach {
- F handleOne;
- ForEach(F f) : handleOne(f) {}
- void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); }
- };
-
- /** Create a range callback from a functor that processes a single item. */
- template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); }
-
/** When the queue is selected by the poller, values are passed to callback cb. */
- explicit PollableQueue(const Callback& cb);
+ PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller);
+ ~PollableQueue();
+
/** Push a value onto the queue. Thread safe */
void push(const T& t);
/** Start polling. */
- void start(const boost::shared_ptr<sys::Poller>& poller);
+ void start();
/** Stop polling and wait for the current callback, if any, to complete. */
void stop();
/** Are we currently stopped?*/
- bool isStopped() const;
-
+ bool isStopped() const { ScopedLock l(lock); return stopped; }
+
+ size_t size() { ScopedLock l(lock); return queue.size(); }
+ bool empty() { ScopedLock l(lock); return queue.empty(); }
private:
+ typedef std::deque<T> Queue;
typedef sys::Monitor::ScopedLock ScopedLock;
typedef sys::Monitor::ScopedUnlock ScopedUnlock;
@@ -83,59 +74,67 @@ class PollableQueue {
mutable sys::Monitor lock;
Callback callback;
+ boost::shared_ptr<sys::Poller> poller;
PollableCondition condition;
- sys::DispatchHandle handle;
+ DispatchHandle handle;
Queue queue;
- Queue batch;
- bool dispatching, stopped;
+ Thread dispatcher;
+ bool stopped;
};
-template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12:
- : callback(cb),
- handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0),
- dispatching(false), stopped(true)
-{}
+template <class T> PollableQueue<T>::PollableQueue(
+ const Callback& cb, const boost::shared_ptr<sys::Poller>& p)
+ : callback(cb), poller(p),
+ handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), stopped(true)
+{
+ handle.startWatch(poller);
+ handle.unwatch();
+}
-template <class T> void PollableQueue<T>::start(const boost::shared_ptr<sys::Poller>& poller) {
+template <class T> void PollableQueue<T>::start() {
ScopedLock l(lock);
+ assert(stopped);
stopped = false;
- handle.startWatch(poller);
+ if (!queue.empty()) condition.set();
+ handle.rewatch();
+}
+
+template <class T> PollableQueue<T>::~PollableQueue() {
+ handle.stopWatch();
}
template <class T> void PollableQueue<T>::push(const T& t) {
ScopedLock l(lock);
+ if (queue.empty()) condition.set();
queue.push_back(t);
- condition.set();
}
template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
- ScopedLock l(lock);
- if (stopped) return;
- dispatching = true;
- condition.clear();
- batch.clear();
- batch.swap(queue); // Snapshot of current queue contents.
- {
- // Process outside the lock to allow concurrent push.
- ScopedUnlock u(lock);
- callback(batch.begin(), batch.end());
+ ScopedLock l(lock); // Prevent concurrent push
+ assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id());
+ dispatcher = Thread::current();
+ while (!stopped && !queue.empty()) {
+ T value = queue.front();
+ queue.pop_front();
+ { // callback outside the lock to allow concurrent push.
+ ScopedUnlock u(lock);
+ callback(value);
+ }
}
- batch.clear();
- dispatching = false;
+ if (queue.empty()) condition.clear();
if (stopped) lock.notifyAll();
- else h.rewatch();
+ dispatcher = Thread();
+ if (!stopped) h.rewatch();
}
template <class T> void PollableQueue<T>::stop() {
ScopedLock l(lock);
- handle.stopWatch();
+ assert(!stopped);
+ handle.unwatch();
stopped = true;
- while (dispatching) lock.wait();
-}
-
-template <class T> bool PollableQueue<T>::isStopped() const {
- ScopedLock l(lock);
- return stopped;
+ // No deadlock if stop is called from the dispatcher thread
+ while (dispatcher.id() && dispatcher.id() != Thread::current().id())
+ lock.wait();
}
}} // namespace qpid::sys