diff options
Diffstat (limited to 'cpp/src/qpid/sys/epoll/EpollPoller.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/epoll/EpollPoller.cpp | 188 |
1 files changed, 162 insertions, 26 deletions
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index a1e624ea75..10705e12da 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -28,9 +28,10 @@ #include <sys/epoll.h> #include <errno.h> +#include <signal.h> #include <assert.h> -#include <vector> +#include <queue> #include <exception> namespace qpid { @@ -58,14 +59,12 @@ class PollerHandlePrivate { int fd; ::__uint32_t events; - PollerHandle* pollerHandle; FDStat stat; Mutex lock; - PollerHandlePrivate(int f, PollerHandle* p) : + PollerHandlePrivate(int f) : fd(f), events(0), - pollerHandle(p), stat(ABSENT) { } @@ -112,7 +111,7 @@ class PollerHandlePrivate { }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(toFd(h.impl), this)) + impl(new PollerHandlePrivate(toFd(h.impl))) {} PollerHandle::~PollerHandle() { @@ -161,9 +160,47 @@ class PollerPrivate { }; static ReadablePipe alwaysReadable; - + static int alwaysReadableFd; + + class InterruptHandle: public PollerHandle { + std::queue<PollerHandle*> handles; + + void processEvent(Poller::EventType) { + PollerHandle* handle = handles.front(); + handles.pop(); + assert(handle); + + // Synthesise event + Poller::Event event(handle, Poller::INTERRUPTED); + + // Process synthesised event + event.process(); + } + + public: + InterruptHandle() : + PollerHandle(DummyIOHandle) + {} + + void addHandle(PollerHandle& h) { + handles.push(&h); + } + + PollerHandle* getHandle() { + PollerHandle* handle = handles.front(); + handles.pop(); + return handle; + } + + bool queuedHandles() { + return handles.size() > 0; + } + }; + const int epollFd; bool isShutdown; + InterruptHandle interruptHandle; + ::sigset_t sigMask; static ::__uint32_t directionToEpollEvent(Poller::Direction dir) { switch (dir) { @@ -193,15 +230,41 @@ class PollerPrivate { epollFd(::epoll_create(DefaultFds)), isShutdown(false) { QPID_POSIX_CHECK(epollFd); + ::sigemptyset(&sigMask); + // Add always readable fd into our set (but not listening to it yet) + ::epoll_event epe; + epe.events = 0; + epe.data.u64 = 0; + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_ADD, alwaysReadableFd, &epe)); } ~PollerPrivate() { // It's probably okay to ignore any errors here as there can't be data loss ::close(epollFd); } + + void interrupt(bool all=false) { + ::epoll_event epe; + if (all) { + // Not EPOLLONESHOT, so we eventually get all threads + epe.events = ::EPOLLIN; + epe.data.u64 = 0; // Keep valgrind happy + } else { + // Use EPOLLONESHOT so we only wake a single thread + epe.events = ::EPOLLIN | ::EPOLLONESHOT; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle); + } + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe)); + } + + void interruptAll() { + interrupt(true); + } }; PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable; +int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD(); void Poller::addFd(PollerHandle& handle, Direction dir) { PollerHandlePrivate& eh = *handle.impl; @@ -218,7 +281,7 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir); } epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &eh; + epe.data.ptr = &handle; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe)); @@ -249,7 +312,7 @@ void Poller::modFd(PollerHandle& handle, Direction dir) { ::epoll_event epe; epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &eh; + epe.data.ptr = &handle; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); @@ -266,7 +329,7 @@ void Poller::rearmFd(PollerHandle& handle) { ::epoll_event epe; epe.events = eh.events; epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &eh; + epe.data.ptr = &handle; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); @@ -281,28 +344,85 @@ void Poller::shutdown() { if (impl->isShutdown) return; - // Don't use any locking here - isshutdown will be visible to all + // Don't use any locking here - isShutdown will be visible to all // after the epoll_ctl() anyway (it's a memory barrier) impl->isShutdown = true; - // Add always readable fd to epoll (not EPOLLONESHOT) - int fd = impl->alwaysReadable.getFD(); - ::epoll_event epe; - epe.events = ::EPOLLIN; - epe.data.u64 = 0; // Keep valgrind happy - don't strictly need next line now - epe.data.ptr = 0; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, fd, &epe)); + impl->interruptAll(); +} + +bool Poller::interrupt(PollerHandle& handle) { + { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + if (eh.isInactive()) { + return false; + } + ::epoll_event epe; + epe.events = 0; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &eh; + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + eh.setInactive(); + } + + PollerPrivate::InterruptHandle& ih = impl->interruptHandle; + PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl; + ScopedLock<Mutex> l(eh.lock); + ih.addHandle(handle); + + impl->interrupt(); + eh.setActive(); + return true; +} + +void Poller::run() { + // Make sure we can't be interrupted by signals at a bad time + ::sigset_t ss; + ::sigfillset(&ss); + ::pthread_sigmask(SIG_SETMASK, &ss, 0); + + do { + Event event = wait(); + + // If can read/write then dispatch appropriate callbacks + if (event.handle) { + event.process(); + } else { + // Handle shutdown + switch (event.type) { + case SHUTDOWN: + return; + default: + // This should be impossible + assert(false); + } + } + } while (true); } Poller::Event Poller::wait(Duration timeout) { epoll_event epe; int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC; + AbsTime targetTimeout = + (timeout == TIME_INFINITE) ? + FAR_FUTURE : + AbsTime(now(), timeout); - // Repeat until we weren't interupted + // Repeat until we weren't interrupted by signal do { PollerHandleDeletionManager.markAllUnusedInThisThread(); + // Need to run on kernels without epoll_pwait() + // - fortunately in this case we don't really need the atomicity of epoll_pwait() +#if 1 + sigset_t os; + pthread_sigmask(SIG_SETMASK, &impl->sigMask, &os); int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs); - + pthread_sigmask(SIG_SETMASK, &os, 0); +#else + int rc = ::epoll_pwait(impl->epollFd, &epe, 1, timeoutMs, &impl->sigMask); +#endif + // Check for shutdown if (impl->isShutdown) { PollerHandleDeletionManager.markAllUnusedInThisThread(); return Event(0, SHUTDOWN); @@ -312,13 +432,27 @@ Poller::Event Poller::wait(Duration timeout) { QPID_POSIX_CHECK(rc); } else if (rc > 0) { assert(rc == 1); - PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(epe.data.ptr); + PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr); + PollerHandlePrivate& eh = *handle->impl; ScopedLock<Mutex> l(eh.lock); - + // the handle could have gone inactive since we left the epoll_wait if (eh.isActive()) { - PollerHandle* handle = eh.pollerHandle; + + // Check if this is an interrupt + if (handle == &impl->interruptHandle) { + PollerHandle* wrappedHandle = impl->interruptHandle.getHandle(); + // If there is an interrupt queued behind this one we need to arm it + // We do it this way so that another thread can pick it up + if (impl->interruptHandle.queuedHandles()) { + impl->interrupt(); + eh.setActive(); + } else { + eh.setInactive(); + } + return Event(wrappedHandle, INTERRUPTED); + } // If the connection has been hungup we could still be readable // (just not writable), allow us to readable until we get here again @@ -349,10 +483,12 @@ Poller::Event Poller::wait(Duration timeout) { // The only things we can do here are return a timeout or wait more. // Obviously if we timed out we return timeout; if the wait was meant to // be indefinite then we should never return with a time out so we go again. - // If the wait wasn't indefinite, but we were interrupted then we have to return - // with a timeout as we don't know how long we've waited so far and so we can't - // continue the wait. - if (rc == 0 || timeoutMs != -1) { + // If the wait wasn't indefinite, we check whether we are after the target wait + // time or not + if (timeoutMs == -1) { + continue; + } + if (rc == 0 && now() > targetTimeout) { PollerHandleDeletionManager.markAllUnusedInThisThread(); return Event(0, TIMEOUT); } |
