diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2009-01-06 23:42:18 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2009-01-06 23:42:18 +0000 |
| commit | 9a933ae9011d343a75929136269fe45c6b863a17 (patch) | |
| tree | 29ebd71241d810af6e0f20d7e5694cba1607486f /cpp/src/qpid | |
| parent | 820071d5a9959a2923269751ddcff2ed085b239a (diff) | |
| download | qpid-python-9a933ae9011d343a75929136269fe45c6b863a17.tar.gz | |
Work on the low level IO code:
* Introduce code so that you can interrupt waiting for a handle and receive
a callback that is correctly serialised with the IO callbacks for that
handle
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@732177 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIO.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/DispatchHandle.cpp | 41 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/DispatchHandle.h | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Dispatcher.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Poller.h | 20 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/epoll/EpollPoller.cpp | 188 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/IOHandle.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/PrivatePosix.h | 20 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/windows/AsynchIO.cpp | 6 |
10 files changed, 269 insertions, 60 deletions
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h index 68e441349a..0a2a1ca1b4 100644 --- a/cpp/src/qpid/sys/AsynchIO.h +++ b/cpp/src/qpid/sys/AsynchIO.h @@ -114,6 +114,7 @@ public: typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback; typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback; typedef boost::function1<void, AsynchIO&> IdleCallback; + typedef boost::function1<void, AsynchIO&> RequestCallback; // Call create() to allocate a new AsynchIO object with the specified // callbacks. This method is implemented in platform-specific code to @@ -138,6 +139,7 @@ public: virtual void queueWriteClose() = 0; virtual bool writeQueueEmpty() = 0; virtual void startReading() = 0; + virtual void requestCallback(RequestCallback) = 0; virtual BufferBase* getQueuedBuffer() = 0; protected: diff --git a/cpp/src/qpid/sys/DispatchHandle.cpp b/cpp/src/qpid/sys/DispatchHandle.cpp index 4722fc0b8b..cbdee7eda6 100644 --- a/cpp/src/qpid/sys/DispatchHandle.cpp +++ b/cpp/src/qpid/sys/DispatchHandle.cpp @@ -270,22 +270,30 @@ void DispatchHandle::stopWatch() { case IDLE: case DELAYED_IDLE: case DELAYED_DELETE: - return; + return; case DELAYED_R: case DELAYED_W: case DELAYED_RW: case DELAYED_INACTIVE: - state = DELAYED_IDLE; - break; + state = DELAYED_IDLE; + break; default: - state = IDLE; - break; + state = IDLE; + break; } assert(poller); poller->delFd(*this); poller.reset(); } +void DispatchHandle::call(Callback iCb) { + assert(iCb); + ScopedLock<Mutex> lock(stateLock); + interruptedCallbacks.push(iCb); + + (void) poller->interrupt(*this); +} + // The slightly strange switch structure // is to ensure that the lock is released before // we do the delete @@ -302,9 +310,9 @@ void DispatchHandle::doDelete() { state = DELAYED_DELETE; return; case IDLE: - break; + break; default: - // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states + // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states assert(false); } } @@ -368,14 +376,29 @@ void DispatchHandle::processEvent(Poller::EventType type) { disconnectedCallback(*this); } break; + case Poller::INTERRUPTED: + { + ScopedLock<Mutex> lock(stateLock); + assert(interruptedCallbacks.size() > 0); + // We'll actually do the interrupt below + } + break; default: assert(false); } - // If any of the callbacks re-enabled reading/writing then actually - // do it now { ScopedLock<Mutex> lock(stateLock); + // If we've got a pending interrupt do it now + while (interruptedCallbacks.size() > 0) { + Callback cb = interruptedCallbacks.front(); + assert(cb); + cb(*this); + interruptedCallbacks.pop(); + } + + // If any of the callbacks re-enabled reading/writing then actually + // do it now switch (state) { case DELAYED_R: poller->modFd(*this, Poller::INPUT); diff --git a/cpp/src/qpid/sys/DispatchHandle.h b/cpp/src/qpid/sys/DispatchHandle.h index 219f2c53d6..ffcbd80f7e 100644 --- a/cpp/src/qpid/sys/DispatchHandle.h +++ b/cpp/src/qpid/sys/DispatchHandle.h @@ -27,6 +27,7 @@ #include <boost/function.hpp> +#include <queue> namespace qpid { namespace sys { @@ -53,11 +54,13 @@ class DispatchHandle : public PollerHandle { friend class DispatchHandleRef; public: typedef boost::function1<void, DispatchHandle&> Callback; + typedef std::queue<Callback> CallbackQueue; private: Callback readableCallback; Callback writableCallback; Callback disconnectedCallback; + CallbackQueue interruptedCallbacks; Poller::shared_ptr poller; Mutex stateLock; enum { @@ -92,12 +95,12 @@ public: /** Add this DispatchHandle to the poller to be watched. */ void startWatch(Poller::shared_ptr poller); - /** Resume watchingn for all non-0 callbacks. */ + /** Resume watching for all non-0 callbacks. */ void rewatch(); - /** Resume watchingn for read only. */ + /** Resume watching for read only. */ void rewatchRead(); - /** Resume watchingn for write only. */ + /** Resume watching for write only. */ void rewatchWrite(); /** Stop watching temporarily. The DispatchHandle remains @@ -112,6 +115,11 @@ public: /** Stop watching permanently. Disassociates from the poller. */ void stopWatch(); + /** Interrupt watching this handle and make a serialised callback that respects the + * same exclusivity guarantees as the other callbacks + */ + void call(Callback iCb); + protected: /** Override to get extra processing done when the DispatchHandle is deleted. */ void doDelete(); @@ -139,6 +147,7 @@ public: void unwatchRead() { ref->unwatchRead(); } void unwatchWrite() { ref->unwatchWrite(); } void stopWatch() { ref->stopWatch(); } + void call(Callback iCb) { ref->call(iCb); } }; }} diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp index 8d1d1b79f5..e8ae6bc2fe 100644 --- a/cpp/src/qpid/sys/Dispatcher.cpp +++ b/cpp/src/qpid/sys/Dispatcher.cpp @@ -34,26 +34,7 @@ Dispatcher::~Dispatcher() { } void Dispatcher::run() { - do { - Poller::Event event = poller->wait(); - - // If can read/write then dispatch appropriate callbacks - if (event.handle) { - event.process(); - } else { - // Handle shutdown - switch (event.type) { - case Poller::SHUTDOWN: - goto dispatcher_shutdown; - default: - // This should be impossible - assert(false); - } - } - } while (true); - -dispatcher_shutdown: - ; + poller->run(); } }} diff --git a/cpp/src/qpid/sys/Poller.h b/cpp/src/qpid/sys/Poller.h index 6b7f4d818e..8e9f67fefd 100644 --- a/cpp/src/qpid/sys/Poller.h +++ b/cpp/src/qpid/sys/Poller.h @@ -23,6 +23,7 @@ */ #include "Time.h" +#include "Runnable.h" #include <boost/shared_ptr.hpp> @@ -37,7 +38,7 @@ namespace sys { */ class PollerHandle; class PollerPrivate; -class Poller { +class Poller : public Runnable { PollerPrivate* const impl; public: @@ -57,7 +58,8 @@ public: READ_WRITABLE, DISCONNECTED, SHUTDOWN, - TIMEOUT + TIMEOUT, + INTERRUPTED }; struct Event { @@ -76,6 +78,20 @@ public: ~Poller(); /** Note: this function is async-signal safe */ void shutdown(); + + // Interrupt waiting for a specific poller handle + // returns true if we could interrupt the handle + // - in this case on return the handle is no longer being monitored, + // but we will receive an event from some invocation of poller::wait + // with the handle and the INTERRUPTED event type + // if it returns false then the handle is not being monitored by the poller + // - This can either be because it has just received an event which has been + // reported and has not been reenabled since. Or because it was removed + // from the monitoring set + bool interrupt(PollerHandle& handle); + + // Poller run loop + void run(); void addFd(PollerHandle& handle, Direction dir); void delFd(PollerHandle& handle); diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index a1e624ea75..10705e12da 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -28,9 +28,10 @@ #include <sys/epoll.h> #include <errno.h> +#include <signal.h> #include <assert.h> -#include <vector> +#include <queue> #include <exception> namespace qpid { @@ -58,14 +59,12 @@ class PollerHandlePrivate { int fd; ::__uint32_t events; - PollerHandle* pollerHandle; FDStat stat; Mutex lock; - PollerHandlePrivate(int f, PollerHandle* p) : + PollerHandlePrivate(int f) : fd(f), events(0), - pollerHandle(p), stat(ABSENT) { } @@ -112,7 +111,7 @@ class PollerHandlePrivate { }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(toFd(h.impl), this)) + impl(new PollerHandlePrivate(toFd(h.impl))) {} PollerHandle::~PollerHandle() { @@ -161,9 +160,47 @@ class PollerPrivate { }; static ReadablePipe alwaysReadable; - + static int alwaysReadableFd; + + class InterruptHandle: public PollerHandle { + std::queue<PollerHandle*> handles; + + void processEvent(Poller::EventType) { + PollerHandle* handle = handles.front(); + handles.pop(); + assert(handle); + + // Synthesise event + Poller::Event event(handle, Poller::INTERRUPTED); + + // Process synthesised event + event.process(); + } + + public: + InterruptHandle() : + PollerHandle(DummyIOHandle) + {} + + void addHandle(PollerHandle& h) { + handles.push(&h); + } + + PollerHandle* getHandle() { + PollerHandle* handle = handles.front(); + handles.pop(); + return handle; + } + + bool queuedHandles() { + return handles.size() > 0; + } + }; + const int epollFd; bool isShutdown; + InterruptHandle interruptHandle; + ::sigset_t sigMask; static ::__uint32_t directionToEpollEvent(Poller::Direction dir) { switch (dir) { @@ -193,15 +230,41 @@ class PollerPrivate { epollFd(::epoll_create(DefaultFds)), isShutdown(false) { QPID_POSIX_CHECK(epollFd); + ::sigemptyset(&sigMask); + // Add always readable fd into our set (but not listening to it yet) + ::epoll_event epe; + epe.events = 0; + epe.data.u64 = 0; + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_ADD, alwaysReadableFd, &epe)); } ~PollerPrivate() { // It's probably okay to ignore any errors here as there can't be data loss ::close(epollFd); } + + void interrupt(bool all=false) { + ::epoll_event epe; + if (all) { + // Not EPOLLONESHOT, so we eventually get all threads + epe.events = ::EPOLLIN; + epe.data.u64 = 0; // Keep valgrind happy + } else { + // Use EPOLLONESHOT so we only wake a single thread + epe.events = ::EPOLLIN | ::EPOLLONESHOT; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle); + } + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe)); + } + + void interruptAll() { + interrupt(true); + } }; PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable; +int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD(); void Poller::addFd(PollerHandle& handle, Direction dir) { PollerHandlePrivate& eh = *handle.impl; @@ -218,7 +281,7 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir); } epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &eh; + epe.data.ptr = &handle; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe)); @@ -249,7 +312,7 @@ void Poller::modFd(PollerHandle& handle, Direction dir) { ::epoll_event epe; epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &eh; + epe.data.ptr = &handle; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); @@ -266,7 +329,7 @@ void Poller::rearmFd(PollerHandle& handle) { ::epoll_event epe; epe.events = eh.events; epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &eh; + epe.data.ptr = &handle; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); @@ -281,28 +344,85 @@ void Poller::shutdown() { if (impl->isShutdown) return; - // Don't use any locking here - isshutdown will be visible to all + // Don't use any locking here - isShutdown will be visible to all // after the epoll_ctl() anyway (it's a memory barrier) impl->isShutdown = true; - // Add always readable fd to epoll (not EPOLLONESHOT) - int fd = impl->alwaysReadable.getFD(); - ::epoll_event epe; - epe.events = ::EPOLLIN; - epe.data.u64 = 0; // Keep valgrind happy - don't strictly need next line now - epe.data.ptr = 0; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, fd, &epe)); + impl->interruptAll(); +} + +bool Poller::interrupt(PollerHandle& handle) { + { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + if (eh.isInactive()) { + return false; + } + ::epoll_event epe; + epe.events = 0; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &eh; + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + eh.setInactive(); + } + + PollerPrivate::InterruptHandle& ih = impl->interruptHandle; + PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl; + ScopedLock<Mutex> l(eh.lock); + ih.addHandle(handle); + + impl->interrupt(); + eh.setActive(); + return true; +} + +void Poller::run() { + // Make sure we can't be interrupted by signals at a bad time + ::sigset_t ss; + ::sigfillset(&ss); + ::pthread_sigmask(SIG_SETMASK, &ss, 0); + + do { + Event event = wait(); + + // If can read/write then dispatch appropriate callbacks + if (event.handle) { + event.process(); + } else { + // Handle shutdown + switch (event.type) { + case SHUTDOWN: + return; + default: + // This should be impossible + assert(false); + } + } + } while (true); } Poller::Event Poller::wait(Duration timeout) { epoll_event epe; int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC; + AbsTime targetTimeout = + (timeout == TIME_INFINITE) ? + FAR_FUTURE : + AbsTime(now(), timeout); - // Repeat until we weren't interupted + // Repeat until we weren't interrupted by signal do { PollerHandleDeletionManager.markAllUnusedInThisThread(); + // Need to run on kernels without epoll_pwait() + // - fortunately in this case we don't really need the atomicity of epoll_pwait() +#if 1 + sigset_t os; + pthread_sigmask(SIG_SETMASK, &impl->sigMask, &os); int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs); - + pthread_sigmask(SIG_SETMASK, &os, 0); +#else + int rc = ::epoll_pwait(impl->epollFd, &epe, 1, timeoutMs, &impl->sigMask); +#endif + // Check for shutdown if (impl->isShutdown) { PollerHandleDeletionManager.markAllUnusedInThisThread(); return Event(0, SHUTDOWN); @@ -312,13 +432,27 @@ Poller::Event Poller::wait(Duration timeout) { QPID_POSIX_CHECK(rc); } else if (rc > 0) { assert(rc == 1); - PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(epe.data.ptr); + PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr); + PollerHandlePrivate& eh = *handle->impl; ScopedLock<Mutex> l(eh.lock); - + // the handle could have gone inactive since we left the epoll_wait if (eh.isActive()) { - PollerHandle* handle = eh.pollerHandle; + + // Check if this is an interrupt + if (handle == &impl->interruptHandle) { + PollerHandle* wrappedHandle = impl->interruptHandle.getHandle(); + // If there is an interrupt queued behind this one we need to arm it + // We do it this way so that another thread can pick it up + if (impl->interruptHandle.queuedHandles()) { + impl->interrupt(); + eh.setActive(); + } else { + eh.setInactive(); + } + return Event(wrappedHandle, INTERRUPTED); + } // If the connection has been hungup we could still be readable // (just not writable), allow us to readable until we get here again @@ -349,10 +483,12 @@ Poller::Event Poller::wait(Duration timeout) { // The only things we can do here are return a timeout or wait more. // Obviously if we timed out we return timeout; if the wait was meant to // be indefinite then we should never return with a time out so we go again. - // If the wait wasn't indefinite, but we were interrupted then we have to return - // with a timeout as we don't know how long we've waited so far and so we can't - // continue the wait. - if (rc == 0 || timeoutMs != -1) { + // If the wait wasn't indefinite, we check whether we are after the target wait + // time or not + if (timeoutMs == -1) { + continue; + } + if (rc == 0 && now() > targetTimeout) { PollerHandleDeletionManager.markAllUnusedInThisThread(); return Event(0, TIMEOUT); } diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 9a5798311b..b4fede06fd 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -266,6 +266,7 @@ public: virtual void queueWriteClose(); virtual bool writeQueueEmpty(); virtual void startReading(); + virtual void requestCallback(RequestCallback); virtual BufferBase* getQueuedBuffer(); private: @@ -275,6 +276,7 @@ private: void readable(DispatchHandle& handle); void writeable(DispatchHandle& handle); void disconnected(DispatchHandle& handle); + void requestedCall(RequestCallback); void close(DispatchHandle& handle); private: @@ -386,6 +388,18 @@ void AsynchIO::startReading() { DispatchHandle::rewatchRead(); } +void AsynchIO::requestCallback(RequestCallback callback) { + // TODO creating a function object every time isn't all that + // efficient - if this becomes heavily used do something better (what?) + assert(callback); + DispatchHandle::call(boost::bind(&AsynchIO::requestedCall, this, callback)); +} + +void AsynchIO::requestedCall(RequestCallback callback) { + assert(callback); + callback(*this); +} + /** Return a queued buffer if there are enough * to spare */ diff --git a/cpp/src/qpid/sys/posix/IOHandle.cpp b/cpp/src/qpid/sys/posix/IOHandle.cpp index 80b487eadc..075eb4c335 100644 --- a/cpp/src/qpid/sys/posix/IOHandle.cpp +++ b/cpp/src/qpid/sys/posix/IOHandle.cpp @@ -31,6 +31,8 @@ int toFd(const IOHandlePrivate* h) return h->fd; } +NullIOHandle DummyIOHandle; + IOHandle::IOHandle(IOHandlePrivate* h) : impl(h) {} diff --git a/cpp/src/qpid/sys/posix/PrivatePosix.h b/cpp/src/qpid/sys/posix/PrivatePosix.h index 33c0cd81bc..0fefa50ab6 100644 --- a/cpp/src/qpid/sys/posix/PrivatePosix.h +++ b/cpp/src/qpid/sys/posix/PrivatePosix.h @@ -23,6 +23,7 @@ */ #include "qpid/sys/Time.h" +#include "qpid/sys/IOHandle.h" struct timespec; struct timeval; @@ -47,6 +48,25 @@ public: int toFd(const IOHandlePrivate* h); +// Posix fd as an IOHandle +class PosixIOHandle : public IOHandle { +public: + PosixIOHandle(int fd) : + IOHandle(new IOHandlePrivate(fd)) + {} +}; + +// Dummy IOHandle for places it's required in the API +// but we promise not to actually try to do any operations on the IOHandle +class NullIOHandle : public IOHandle { +public: + NullIOHandle() : + IOHandle(new IOHandlePrivate) + {} +}; + +extern NullIOHandle DummyIOHandle; + }} #endif /*!_sys_posix_PrivatePosix_h*/ diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp index ca56efd8dd..356d5ba927 100644 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -284,6 +284,7 @@ public: virtual void queueWriteClose(); virtual bool writeQueueEmpty(); virtual void startReading(); + virtual void requestCallback(RequestCallback); /** * getQueuedBuffer returns a buffer from the buffer queue, if one is @@ -531,6 +532,11 @@ void AsynchIO::startReading() { return; } +// TODO: This needs to arrange for a callback that is serialised with +// the other IO callbacks for this AsynchIO +void AsynchIO::requestCallback(RequestCallback callback) { +} + /** * Return a queued buffer if there are enough to spare. */ |
