diff options
| author | Stephen D. Huston <shuston@apache.org> | 2009-02-20 00:04:37 +0000 |
|---|---|---|
| committer | Stephen D. Huston <shuston@apache.org> | 2009-02-20 00:04:37 +0000 |
| commit | d4b433f542ab8a506d7dbc53e685770a96ee7958 (patch) | |
| tree | f0b7d7c643825f279c45f74a8c17f7b0753b53de /cpp/src/qpid/sys | |
| parent | 44ff0feac9edbf7faffdeed5a3df22313e0543c8 (diff) | |
| download | qpid-python-d4b433f542ab8a506d7dbc53e685770a96ee7958.tar.gz | |
Merged win-pollable-condition branch changes 743545:746056 into trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@746061 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
| -rw-r--r-- | cpp/src/qpid/sys/PollableCondition.h | 54 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h | 46 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/PollableCondition.cpp | 119 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/PollableCondition.h | 56 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/windows/PollableCondition.cpp | 125 |
5 files changed, 307 insertions, 93 deletions
diff --git a/cpp/src/qpid/sys/PollableCondition.h b/cpp/src/qpid/sys/PollableCondition.h index 56d38f90da..49e84e6cb0 100644 --- a/cpp/src/qpid/sys/PollableCondition.h +++ b/cpp/src/qpid/sys/PollableCondition.h @@ -22,7 +22,57 @@ * */ -// Currently only has a posix implementation, add #ifdefs for other platforms as needed. -#include "posix/PollableCondition.h" +#include "qpid/sys/Poller.h" +#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> + + +namespace qpid { +namespace sys { + +class PollableConditionPrivate; + +class PollableCondition { +public: + typedef boost::function1<void, PollableCondition&> Callback; + + PollableCondition(const Callback& cb, + const boost::shared_ptr<sys::Poller>& poller); + + ~PollableCondition(); + + /** + * Set the condition. Triggers callback to Callback from Poller. + * When callback is made, condition is suspended. Call rearm() to + * resume reacting to the condition. + */ + void set(); + + /** + * Get the current state of the condition, then clear it. + * + * @return The state of the condition before it was cleared. + */ + bool clear(); + + /** + * Temporarily suspend the ability for the poller to react to the + * condition. It can be rearm()ed later. + */ + void disarm(); + + /** + * Reset the ability for the poller to react to the condition. + */ + void rearm(); + + private: + PollableConditionPrivate *impl; + + Callback callback; + boost::shared_ptr<sys::Poller> poller; +}; + +}} // namespace qpid::sys #endif /*!QPID_SYS_POLLABLECONDITION_H*/ diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index b5ff98c2c7..a23cc5137a 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -23,8 +23,6 @@ */ #include "qpid/sys/PollableCondition.h" -#include "qpid/sys/Dispatcher.h" -#include "qpid/sys/DispatchHandle.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include <boost/function.hpp> @@ -38,9 +36,10 @@ namespace sys { class Poller; /** - * A queue that can be polled by sys::Poller. Any thread can push to - * the queue, on wakeup the poller thread processes all items on the - * queue by passing them to a callback in a batch. + * A queue whose item processing is dispatched by sys::Poller. + * Any thread can push to the queue; items pushed trigger an event the Poller + * recognizes. When a Poller I/O thread dispatches the event, a + * user-specified callback is invoked with all items on the queue. */ template <class T> class PollableQueue { @@ -50,12 +49,21 @@ class PollableQueue { /** * Callback to process a batch of items from the queue. - * @param values to process, any items remaining after call are put back on the queue. + * + * @param values Queue of values to process. Any items remaining + * on return from Callback are put back on the queue. */ typedef boost::function<void (Queue& values)> Callback; - /** When the queue is selected by the poller, values are passed to callback cb. */ - PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller); + /** + * Constructor; sets necessary parameters. + * + * @param cb Callback that will be called to process items on the + * queue. Will be called from a Poller I/O thread. + * @param poller Poller to use for dispatching queue events. + */ + PollableQueue(const Callback& cb, + const boost::shared_ptr<sys::Poller>& poller); ~PollableQueue(); @@ -85,14 +93,12 @@ class PollableQueue { typedef sys::Monitor::ScopedLock ScopedLock; typedef sys::Monitor::ScopedUnlock ScopedUnlock; - void dispatch(sys::DispatchHandle&); + void dispatch(PollableCondition& cond); void process(); mutable sys::Monitor lock; Callback callback; - boost::shared_ptr<sys::Poller> poller; PollableCondition condition; - DispatchHandleRef handle; Queue queue, batch; Thread dispatcher; bool stopped; @@ -100,11 +106,10 @@ class PollableQueue { template <class T> PollableQueue<T>::PollableQueue( const Callback& cb, const boost::shared_ptr<sys::Poller>& p) - : callback(cb), poller(p), - handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), stopped(true) + : callback(cb), + condition(boost::bind(&PollableQueue<T>::dispatch, this, _1), p), + stopped(true) { - handle.startWatch(poller); - handle.unwatch(); } template <class T> void PollableQueue<T>::start() { @@ -112,11 +117,10 @@ template <class T> void PollableQueue<T>::start() { if (!stopped) return; stopped = false; if (!queue.empty()) condition.set(); - handle.rewatch(); + condition.rearm(); } template <class T> PollableQueue<T>::~PollableQueue() { - handle.stopWatch(); } template <class T> void PollableQueue<T>::push(const T& t) { @@ -125,15 +129,15 @@ template <class T> void PollableQueue<T>::push(const T& t) { queue.push_back(t); } -template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { +template <class T> void PollableQueue<T>::dispatch(PollableCondition& cond) { ScopedLock l(lock); assert(dispatcher.id() == 0); dispatcher = Thread::current(); process(); dispatcher = Thread(); - if (queue.empty()) condition.clear(); + if (queue.empty()) cond.clear(); if (stopped) lock.notifyAll(); - else h.rewatch(); + else cond.rearm(); } template <class T> void PollableQueue<T>::process() { @@ -159,7 +163,7 @@ template <class T> void PollableQueue<T>::shutdown() { template <class T> void PollableQueue<T>::stop() { ScopedLock l(lock); if (stopped) return; - handle.unwatch(); + condition.disarm(); stopped = true; // Avoid deadlock if stop is called from the dispatch thread while (dispatcher.id() && dispatcher.id() != Thread::current().id()) diff --git a/cpp/src/qpid/sys/posix/PollableCondition.cpp b/cpp/src/qpid/sys/posix/PollableCondition.cpp index 0c55fd3c0d..0991e5fd76 100644 --- a/cpp/src/qpid/sys/posix/PollableCondition.cpp +++ b/cpp/src/qpid/sys/posix/PollableCondition.cpp @@ -22,17 +22,46 @@ * */ -#include "PollableCondition.h" +#include "qpid/sys/PollableCondition.h" +#include "qpid/sys/DispatchHandle.h" +#include "qpid/sys/IOHandle.h" #include "qpid/sys/posix/PrivatePosix.h" #include "qpid/Exception.h" +#include <boost/bind.hpp> + #include <unistd.h> #include <fcntl.h> namespace qpid { namespace sys { -PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { +class PollableConditionPrivate : public sys::IOHandle { + friend class PollableCondition; + +private: + PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller); + ~PollableConditionPrivate(); + + void dispatch(sys::DispatchHandle& h); + void rewatch(); + void unwatch(); + +private: + PollableCondition::Callback cb; + PollableCondition& parent; + boost::shared_ptr<sys::Poller> poller; + int writeFd; + std::auto_ptr<DispatchHandleRef> handle; +}; + +PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller) + : IOHandle(new sys::IOHandlePrivate), cb(cb), parent(parent) +{ int fds[2]; if (::pipe(fds) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); @@ -42,22 +71,71 @@ PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { throw ErrnoException(QPID_MSG("Can't create PollableCondition")); if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); + handle.reset (new DispatchHandleRef(*this, + boost::bind(&sys::PollableConditionPrivate::dispatch, this, _1), + 0, 0)); + handle->startWatch(poller); + handle->unwatch(); +} + +PollableConditionPrivate::~PollableConditionPrivate() +{ + handle->stopWatch(); + close(writeFd); +} + +void PollableConditionPrivate::dispatch(sys::DispatchHandle& /*h*/) +{ + cb(parent); +} + +void PollableConditionPrivate::rewatch() +{ + handle->rewatch(); +} + +void PollableConditionPrivate::unwatch() +{ + handle->unwatch(); +} + + /* PollableCondition */ + +PollableCondition::PollableCondition(const Callback& cb, + const boost::shared_ptr<sys::Poller>& poller) + : impl(new PollableConditionPrivate(cb, *this, poller)) +{ +} + +PollableCondition::~PollableCondition() +{ + delete impl; +} + +void PollableCondition::set() { + static const char dummy=0; + ssize_t n = ::write(impl->writeFd, &dummy, 1); + if (n == -1 && errno != EAGAIN) + throw ErrnoException("Error setting PollableCondition"); } bool PollableCondition::clear() { char buf[256]; ssize_t n; bool wasSet = false; - while ((n = ::read(impl->fd, buf, sizeof(buf))) > 0) + while ((n = ::read(impl->impl->fd, buf, sizeof(buf))) > 0) wasSet = true; - if (n == -1 && errno != EAGAIN) throw ErrnoException(QPID_MSG("Error clearing PollableCondition")); + if (n == -1 && errno != EAGAIN) + throw ErrnoException(QPID_MSG("Error clearing PollableCondition")); return wasSet; } -void PollableCondition::set() { - static const char dummy=0; - ssize_t n = ::write(writeFd, &dummy, 1); - if (n == -1 && errno != EAGAIN) throw ErrnoException("Error setting PollableCondition"); +void PollableCondition::disarm() { + impl->unwatch(); +} + +void PollableCondition::rearm() { + impl->rewatch(); } @@ -71,22 +149,35 @@ void PollableCondition::set() { namespace qpid { namespace sys { -PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { +PollableConditionPrivate::PollableConditionPrivate(const PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller) + : cb(cb), parent(parent), poller(poller), + IOHandle(new sys::IOHandlePrivate) { impl->fd = ::eventfd(0, 0); if (impl->fd < 0) throw ErrnoException("conditionfd() failed"); } +void PollableCondition::set() { + static const uint64_t value=1; + ssize_t n = ::write(impl->impl->fd, + reinterpret_cast<const void*>(&value), 8); + if (n != 8) throw ErrnoException("write failed on conditionfd"); +} + bool PollableCondition::clear() { char buf[8]; - ssize_t n = ::read(impl->fd, buf, 8); + ssize_t n = ::read(impl->impl->fd, buf, 8); if (n != 8) throw ErrnoException("read failed on conditionfd"); return *reinterpret_cast<uint64_t*>(buf); } -void PollableCondition::set() { - static const uint64_t value=1; - ssize_t n = ::write(impl->fd, reinterpret_cast<const void*>(&value), 8); - if (n != 8) throw ErrnoException("write failed on conditionfd"); +void PollableCondition::disarm() { + // ???? +} + +void PollableCondition::rearm() { + // ???? } #endif diff --git a/cpp/src/qpid/sys/posix/PollableCondition.h b/cpp/src/qpid/sys/posix/PollableCondition.h deleted file mode 100644 index 4ec277b0ec..0000000000 --- a/cpp/src/qpid/sys/posix/PollableCondition.h +++ /dev/null @@ -1,56 +0,0 @@ -#ifndef QPID_SYS_POSIX_POLLABLECONDITION_H -#define QPID_SYS_POSIX_POLLABLECONDITION_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/IOHandle.h" - -namespace qpid { -namespace sys { - -/** - * A pollable condition to integrate in-process conditions with IO - * conditions in a polling loop. - * - * Setting the condition makes it readable for a poller. - * - * Writable/disconnected conditions are undefined and should not be - * polled for. - */ -class PollableCondition : public sys::IOHandle { - public: - PollableCondition(); - - /** Set the condition, triggers readable in a poller. */ - void set(); - - /** Get the current state of the condition, then clear it. - *@return The state of the condition before it was cleared. - */ - bool clear(); - - private: - int writeFd; -}; -}} // namespace qpid::sys - -#endif /*!QPID_SYS_POSIX_POLLABLECONDITION_H*/ diff --git a/cpp/src/qpid/sys/windows/PollableCondition.cpp b/cpp/src/qpid/sys/windows/PollableCondition.cpp new file mode 100644 index 0000000000..ed0f7c3917 --- /dev/null +++ b/cpp/src/qpid/sys/windows/PollableCondition.cpp @@ -0,0 +1,125 @@ +#ifndef QPID_SYS_WINDOWS_POLLABLECONDITION_CPP +#define QPID_SYS_WINDOWS_POLLABLECONDITION_CPP + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/PollableCondition.h" +#include "qpid/sys/IOHandle.h" +#include "AsynchIoResult.h" +#include "IoHandlePrivate.h" + +#include <boost/bind.hpp> +#include <windows.h> + +namespace qpid { +namespace sys { + +// PollableConditionPrivate will reuse the IocpPoller's ability to queue +// a completion to the IOCP and have it dispatched to the completer callback +// noted in the IOHandlePrivate when the request is queued. The +// AsynchCallbackRequest object is not really used - we already have the +// desired callback for the user of PollableCondition. +class PollableConditionPrivate : private IOHandle { + friend class PollableCondition; + +private: + PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller); + ~PollableConditionPrivate(); + + void poke(); + void dispatch(AsynchIoResult *result); + +private: + PollableCondition::Callback cb; + PollableCondition& parent; + boost::shared_ptr<sys::Poller> poller; + LONG isSet; + LONG armed; +}; + +PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller) + : IOHandle(new sys::IOHandlePrivate(INVALID_SOCKET, + boost::bind(&PollableConditionPrivate::dispatch, this, _1))), + cb(cb), parent(parent), poller(poller), isSet(0), armed(0) +{ +} + +PollableConditionPrivate::~PollableConditionPrivate() +{ +} + +void PollableConditionPrivate::poke() +{ + if (!armed) + return; + + // addFd will queue a completion for the IOCP; when it's handled, a + // poller thread will call back to dispatch() below. + PollerHandle ph(*this); + poller->addFd(ph, Poller::INPUT); +} + +void PollableConditionPrivate::dispatch(AsynchIoResult *result) +{ + delete result; // Poller::addFd() allocates this + cb(parent); +} + + /* PollableCondition */ + +PollableCondition::PollableCondition(const Callback& cb, + const boost::shared_ptr<sys::Poller>& poller) + : impl(new PollableConditionPrivate(cb, *this, poller)) +{ +} + +PollableCondition::~PollableCondition() +{ + delete impl; +} + +void PollableCondition::set() { + // Add one to the set count and poke it to provoke a callback + ::InterlockedIncrement(&impl->isSet); + impl->poke(); +} + +bool PollableCondition::clear() { + return (0 != ::InterlockedExchange(&impl->isSet, 0)); +} + +void PollableCondition::disarm() { + ::InterlockedExchange(&impl->armed, 0); +} + +void PollableCondition::rearm() { + if (0 == ::InterlockedExchange(&impl->armed, 1) && impl->isSet) + impl->poke(); +} + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_WINDOWS_POLLABLECONDITION_CPP*/ |
