diff options
Diffstat (limited to 'cpp/src/qpid/sys/epoll/EpollPoller.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/epoll/EpollPoller.cpp | 148 |
1 files changed, 74 insertions, 74 deletions
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index d0623b86f4..42a2b2bbab 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -138,7 +138,7 @@ PollerHandle::~PollerHandle() { { ScopedLock<Mutex> l(impl->lock); if (impl->isDeleted()) { - return; + return; } impl->pollerHandle = 0; if (impl->isInterrupted()) { @@ -187,40 +187,40 @@ class PollerPrivate { static int alwaysReadableFd; class InterruptHandle: public PollerHandle { - std::queue<PollerHandle*> handles; - - void processEvent(Poller::EventType) { - PollerHandle* handle = handles.front(); - handles.pop(); - assert(handle); - - // Synthesise event - Poller::Event event(handle, Poller::INTERRUPTED); - - // Process synthesised event - event.process(); - } + std::queue<PollerHandle*> handles; + + void processEvent(Poller::EventType) { + PollerHandle* handle = handles.front(); + handles.pop(); + assert(handle); + + // Synthesise event + Poller::Event event(handle, Poller::INTERRUPTED); + + // Process synthesised event + event.process(); + } public: - InterruptHandle() : - PollerHandle(DummyIOHandle) - {} - - void addHandle(PollerHandle& h) { - handles.push(&h); - } - - PollerHandle* getHandle() { - PollerHandle* handle = handles.front(); - handles.pop(); - return handle; - } - - bool queuedHandles() { - return handles.size() > 0; - } + InterruptHandle() : + PollerHandle(DummyIOHandle) + {} + + void addHandle(PollerHandle& h) { + handles.push(&h); + } + + PollerHandle* getHandle() { + PollerHandle* handle = handles.front(); + handles.pop(); + return handle; + } + + bool queuedHandles() { + return handles.size() > 0; + } }; - + const int epollFd; bool isShutdown; InterruptHandle interruptHandle; @@ -259,13 +259,13 @@ class PollerPrivate { ::epoll_event epe; epe.events = 0; epe.data.u64 = 0; - QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_ADD, alwaysReadableFd, &epe)); + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_ADD, alwaysReadableFd, &epe)); } ~PollerPrivate() { // It's probably okay to ignore any errors here as there can't be data loss ::close(epollFd); - + // Need to put the interruptHandle in idle state to delete it static_cast<PollerHandle&>(interruptHandle).impl->setIdle(); } @@ -273,14 +273,14 @@ class PollerPrivate { void resetMode(PollerHandlePrivate& handle); void interrupt() { - ::epoll_event epe; - // Use EPOLLONESHOT so we only wake a single thread - epe.events = ::EPOLLIN | ::EPOLLONESHOT; - epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle); - QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe)); - } - + ::epoll_event epe; + // Use EPOLLONESHOT so we only wake a single thread + epe.events = ::EPOLLIN | ::EPOLLONESHOT; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle); + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe)); + } + void interruptAll() { ::epoll_event epe; // Not EPOLLONESHOT, so we eventually get all threads @@ -317,7 +317,7 @@ void Poller::unregisterHandle(PollerHandle& handle) { // Ignore EBADF since deleting a nonexistent fd has the overall required result! // And allows the case where a sloppy program closes the fd and then does the delFd() if (rc == -1 && errno != EBADF) { - QPID_POSIX_CHECK(rc); + QPID_POSIX_CHECK(rc); } eh.setIdle(); @@ -332,10 +332,10 @@ void PollerPrivate::resetMode(PollerHandlePrivate& eh) { if (eh.isIdle() || eh.isDeleted()) { return; } - + if (eh.events==0) { eh.setActive(); - return; + return; } if (!eh.isInterrupted()) { @@ -343,11 +343,11 @@ void PollerPrivate::resetMode(PollerHandlePrivate& eh) { epe.events = eh.events | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); - + eh.setActive(); - return; + return; } ph = eh.pollerHandle; } @@ -366,7 +366,7 @@ void Poller::monitorHandle(PollerHandle& handle, Direction dir) { ::__uint32_t oldEvents = eh.events; eh.events |= PollerPrivate::directionToEpollEvent(dir); - + // If no change nothing more to do - avoid unnecessary system call if (oldEvents==eh.events) { return; @@ -376,7 +376,7 @@ void Poller::monitorHandle(PollerHandle& handle, Direction dir) { if (!eh.isActive()) { return; } - + ::epoll_event epe; epe.events = eh.events | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy @@ -397,12 +397,12 @@ void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) { if (oldEvents==eh.events) { return; } - + // If we're not actually listening wait till we are to perform change if (!eh.isActive()) { return; } - + ::epoll_event epe; epe.events = eh.events | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy @@ -427,50 +427,50 @@ void Poller::shutdown() { } bool Poller::interrupt(PollerHandle& handle) { - { - PollerHandlePrivate& eh = *handle.impl; - ScopedLock<Mutex> l(eh.lock); - if (eh.isIdle() || eh.isDeleted()) { - return false; - } - + { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + if (eh.isIdle() || eh.isDeleted()) { + return false; + } + if (eh.isInterrupted()) { return true; } - + // Stop monitoring handle for read or write - ::epoll_event epe; - epe.events = 0; - epe.data.u64 = 0; // Keep valgrind happy - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + ::epoll_event epe; + epe.events = 0; + epe.data.u64 = 0; // Keep valgrind happy + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); if (eh.isInactive()) { eh.setInterrupted(); return true; } eh.setInterrupted(); - } + } - PollerPrivate::InterruptHandle& ih = impl->interruptHandle; + PollerPrivate::InterruptHandle& ih = impl->interruptHandle; PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl; ScopedLock<Mutex> l(eh.lock); - ih.addHandle(handle); - - impl->interrupt(); + ih.addHandle(handle); + + impl->interrupt(); eh.setActive(); return true; } void Poller::run() { - // Make sure we can't be interrupted by signals at a bad time - ::sigset_t ss; - ::sigfillset(&ss); + // Make sure we can't be interrupted by signals at a bad time + ::sigset_t ss; + ::sigfillset(&ss); ::pthread_sigmask(SIG_SETMASK, &ss, 0); do { Event event = wait(); - // If can read/write then dispatch appropriate callbacks + // If can read/write then dispatch appropriate callbacks if (event.handle) { event.process(); } else { @@ -564,7 +564,7 @@ Poller::Event Poller::wait(Duration timeout) { PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(dataPtr); ScopedLock<Mutex> l(eh.lock); - + // the handle could have gone inactive since we left the epoll_wait if (eh.isActive()) { PollerHandle* handle = eh.pollerHandle; |
