summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/epoll/EpollPoller.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/epoll/EpollPoller.cpp')
-rw-r--r--cpp/src/qpid/sys/epoll/EpollPoller.cpp188
1 files changed, 162 insertions, 26 deletions
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index a1e624ea75..10705e12da 100644
--- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -28,9 +28,10 @@
#include <sys/epoll.h>
#include <errno.h>
+#include <signal.h>
#include <assert.h>
-#include <vector>
+#include <queue>
#include <exception>
namespace qpid {
@@ -58,14 +59,12 @@ class PollerHandlePrivate {
int fd;
::__uint32_t events;
- PollerHandle* pollerHandle;
FDStat stat;
Mutex lock;
- PollerHandlePrivate(int f, PollerHandle* p) :
+ PollerHandlePrivate(int f) :
fd(f),
events(0),
- pollerHandle(p),
stat(ABSENT) {
}
@@ -112,7 +111,7 @@ class PollerHandlePrivate {
};
PollerHandle::PollerHandle(const IOHandle& h) :
- impl(new PollerHandlePrivate(toFd(h.impl), this))
+ impl(new PollerHandlePrivate(toFd(h.impl)))
{}
PollerHandle::~PollerHandle() {
@@ -161,9 +160,47 @@ class PollerPrivate {
};
static ReadablePipe alwaysReadable;
-
+ static int alwaysReadableFd;
+
+ class InterruptHandle: public PollerHandle {
+ std::queue<PollerHandle*> handles;
+
+ void processEvent(Poller::EventType) {
+ PollerHandle* handle = handles.front();
+ handles.pop();
+ assert(handle);
+
+ // Synthesise event
+ Poller::Event event(handle, Poller::INTERRUPTED);
+
+ // Process synthesised event
+ event.process();
+ }
+
+ public:
+ InterruptHandle() :
+ PollerHandle(DummyIOHandle)
+ {}
+
+ void addHandle(PollerHandle& h) {
+ handles.push(&h);
+ }
+
+ PollerHandle* getHandle() {
+ PollerHandle* handle = handles.front();
+ handles.pop();
+ return handle;
+ }
+
+ bool queuedHandles() {
+ return handles.size() > 0;
+ }
+ };
+
const int epollFd;
bool isShutdown;
+ InterruptHandle interruptHandle;
+ ::sigset_t sigMask;
static ::__uint32_t directionToEpollEvent(Poller::Direction dir) {
switch (dir) {
@@ -193,15 +230,41 @@ class PollerPrivate {
epollFd(::epoll_create(DefaultFds)),
isShutdown(false) {
QPID_POSIX_CHECK(epollFd);
+ ::sigemptyset(&sigMask);
+ // Add always readable fd into our set (but not listening to it yet)
+ ::epoll_event epe;
+ epe.events = 0;
+ epe.data.u64 = 0;
+ QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_ADD, alwaysReadableFd, &epe));
}
~PollerPrivate() {
// It's probably okay to ignore any errors here as there can't be data loss
::close(epollFd);
}
+
+ void interrupt(bool all=false) {
+ ::epoll_event epe;
+ if (all) {
+ // Not EPOLLONESHOT, so we eventually get all threads
+ epe.events = ::EPOLLIN;
+ epe.data.u64 = 0; // Keep valgrind happy
+ } else {
+ // Use EPOLLONESHOT so we only wake a single thread
+ epe.events = ::EPOLLIN | ::EPOLLONESHOT;
+ epe.data.u64 = 0; // Keep valgrind happy
+ epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle);
+ }
+ QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe));
+ }
+
+ void interruptAll() {
+ interrupt(true);
+ }
};
PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable;
+int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD();
void Poller::addFd(PollerHandle& handle, Direction dir) {
PollerHandlePrivate& eh = *handle.impl;
@@ -218,7 +281,7 @@ void Poller::addFd(PollerHandle& handle, Direction dir) {
epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
}
epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
+ epe.data.ptr = &handle;
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe));
@@ -249,7 +312,7 @@ void Poller::modFd(PollerHandle& handle, Direction dir) {
::epoll_event epe;
epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
+ epe.data.ptr = &handle;
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
@@ -266,7 +329,7 @@ void Poller::rearmFd(PollerHandle& handle) {
::epoll_event epe;
epe.events = eh.events;
epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
+ epe.data.ptr = &handle;
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
@@ -281,28 +344,85 @@ void Poller::shutdown() {
if (impl->isShutdown)
return;
- // Don't use any locking here - isshutdown will be visible to all
+ // Don't use any locking here - isShutdown will be visible to all
// after the epoll_ctl() anyway (it's a memory barrier)
impl->isShutdown = true;
- // Add always readable fd to epoll (not EPOLLONESHOT)
- int fd = impl->alwaysReadable.getFD();
- ::epoll_event epe;
- epe.events = ::EPOLLIN;
- epe.data.u64 = 0; // Keep valgrind happy - don't strictly need next line now
- epe.data.ptr = 0;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, fd, &epe));
+ impl->interruptAll();
+}
+
+bool Poller::interrupt(PollerHandle& handle) {
+ {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ if (eh.isInactive()) {
+ return false;
+ }
+ ::epoll_event epe;
+ epe.events = 0;
+ epe.data.u64 = 0; // Keep valgrind happy
+ epe.data.ptr = &eh;
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ eh.setInactive();
+ }
+
+ PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
+ PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl;
+ ScopedLock<Mutex> l(eh.lock);
+ ih.addHandle(handle);
+
+ impl->interrupt();
+ eh.setActive();
+ return true;
+}
+
+void Poller::run() {
+ // Make sure we can't be interrupted by signals at a bad time
+ ::sigset_t ss;
+ ::sigfillset(&ss);
+ ::pthread_sigmask(SIG_SETMASK, &ss, 0);
+
+ do {
+ Event event = wait();
+
+ // If can read/write then dispatch appropriate callbacks
+ if (event.handle) {
+ event.process();
+ } else {
+ // Handle shutdown
+ switch (event.type) {
+ case SHUTDOWN:
+ return;
+ default:
+ // This should be impossible
+ assert(false);
+ }
+ }
+ } while (true);
}
Poller::Event Poller::wait(Duration timeout) {
epoll_event epe;
int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC;
+ AbsTime targetTimeout =
+ (timeout == TIME_INFINITE) ?
+ FAR_FUTURE :
+ AbsTime(now(), timeout);
- // Repeat until we weren't interupted
+ // Repeat until we weren't interrupted by signal
do {
PollerHandleDeletionManager.markAllUnusedInThisThread();
+ // Need to run on kernels without epoll_pwait()
+ // - fortunately in this case we don't really need the atomicity of epoll_pwait()
+#if 1
+ sigset_t os;
+ pthread_sigmask(SIG_SETMASK, &impl->sigMask, &os);
int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs);
-
+ pthread_sigmask(SIG_SETMASK, &os, 0);
+#else
+ int rc = ::epoll_pwait(impl->epollFd, &epe, 1, timeoutMs, &impl->sigMask);
+#endif
+ // Check for shutdown
if (impl->isShutdown) {
PollerHandleDeletionManager.markAllUnusedInThisThread();
return Event(0, SHUTDOWN);
@@ -312,13 +432,27 @@ Poller::Event Poller::wait(Duration timeout) {
QPID_POSIX_CHECK(rc);
} else if (rc > 0) {
assert(rc == 1);
- PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(epe.data.ptr);
+ PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr);
+ PollerHandlePrivate& eh = *handle->impl;
ScopedLock<Mutex> l(eh.lock);
-
+
// the handle could have gone inactive since we left the epoll_wait
if (eh.isActive()) {
- PollerHandle* handle = eh.pollerHandle;
+
+ // Check if this is an interrupt
+ if (handle == &impl->interruptHandle) {
+ PollerHandle* wrappedHandle = impl->interruptHandle.getHandle();
+ // If there is an interrupt queued behind this one we need to arm it
+ // We do it this way so that another thread can pick it up
+ if (impl->interruptHandle.queuedHandles()) {
+ impl->interrupt();
+ eh.setActive();
+ } else {
+ eh.setInactive();
+ }
+ return Event(wrappedHandle, INTERRUPTED);
+ }
// If the connection has been hungup we could still be readable
// (just not writable), allow us to readable until we get here again
@@ -349,10 +483,12 @@ Poller::Event Poller::wait(Duration timeout) {
// The only things we can do here are return a timeout or wait more.
// Obviously if we timed out we return timeout; if the wait was meant to
// be indefinite then we should never return with a time out so we go again.
- // If the wait wasn't indefinite, but we were interrupted then we have to return
- // with a timeout as we don't know how long we've waited so far and so we can't
- // continue the wait.
- if (rc == 0 || timeoutMs != -1) {
+ // If the wait wasn't indefinite, we check whether we are after the target wait
+ // time or not
+ if (timeoutMs == -1) {
+ continue;
+ }
+ if (rc == 0 && now() > targetTimeout) {
PollerHandleDeletionManager.markAllUnusedInThisThread();
return Event(0, TIMEOUT);
}