summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/epoll/EpollPoller.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/epoll/EpollPoller.cpp')
-rw-r--r--cpp/src/qpid/sys/epoll/EpollPoller.cpp148
1 files changed, 74 insertions, 74 deletions
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index d0623b86f4..42a2b2bbab 100644
--- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -138,7 +138,7 @@ PollerHandle::~PollerHandle() {
{
ScopedLock<Mutex> l(impl->lock);
if (impl->isDeleted()) {
- return;
+ return;
}
impl->pollerHandle = 0;
if (impl->isInterrupted()) {
@@ -187,40 +187,40 @@ class PollerPrivate {
static int alwaysReadableFd;
class InterruptHandle: public PollerHandle {
- std::queue<PollerHandle*> handles;
-
- void processEvent(Poller::EventType) {
- PollerHandle* handle = handles.front();
- handles.pop();
- assert(handle);
-
- // Synthesise event
- Poller::Event event(handle, Poller::INTERRUPTED);
-
- // Process synthesised event
- event.process();
- }
+ std::queue<PollerHandle*> handles;
+
+ void processEvent(Poller::EventType) {
+ PollerHandle* handle = handles.front();
+ handles.pop();
+ assert(handle);
+
+ // Synthesise event
+ Poller::Event event(handle, Poller::INTERRUPTED);
+
+ // Process synthesised event
+ event.process();
+ }
public:
- InterruptHandle() :
- PollerHandle(DummyIOHandle)
- {}
-
- void addHandle(PollerHandle& h) {
- handles.push(&h);
- }
-
- PollerHandle* getHandle() {
- PollerHandle* handle = handles.front();
- handles.pop();
- return handle;
- }
-
- bool queuedHandles() {
- return handles.size() > 0;
- }
+ InterruptHandle() :
+ PollerHandle(DummyIOHandle)
+ {}
+
+ void addHandle(PollerHandle& h) {
+ handles.push(&h);
+ }
+
+ PollerHandle* getHandle() {
+ PollerHandle* handle = handles.front();
+ handles.pop();
+ return handle;
+ }
+
+ bool queuedHandles() {
+ return handles.size() > 0;
+ }
};
-
+
const int epollFd;
bool isShutdown;
InterruptHandle interruptHandle;
@@ -259,13 +259,13 @@ class PollerPrivate {
::epoll_event epe;
epe.events = 0;
epe.data.u64 = 0;
- QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_ADD, alwaysReadableFd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_ADD, alwaysReadableFd, &epe));
}
~PollerPrivate() {
// It's probably okay to ignore any errors here as there can't be data loss
::close(epollFd);
-
+
// Need to put the interruptHandle in idle state to delete it
static_cast<PollerHandle&>(interruptHandle).impl->setIdle();
}
@@ -273,14 +273,14 @@ class PollerPrivate {
void resetMode(PollerHandlePrivate& handle);
void interrupt() {
- ::epoll_event epe;
- // Use EPOLLONESHOT so we only wake a single thread
- epe.events = ::EPOLLIN | ::EPOLLONESHOT;
- epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle);
- QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe));
- }
-
+ ::epoll_event epe;
+ // Use EPOLLONESHOT so we only wake a single thread
+ epe.events = ::EPOLLIN | ::EPOLLONESHOT;
+ epe.data.u64 = 0; // Keep valgrind happy
+ epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle);
+ QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe));
+ }
+
void interruptAll() {
::epoll_event epe;
// Not EPOLLONESHOT, so we eventually get all threads
@@ -317,7 +317,7 @@ void Poller::unregisterHandle(PollerHandle& handle) {
// Ignore EBADF since deleting a nonexistent fd has the overall required result!
// And allows the case where a sloppy program closes the fd and then does the delFd()
if (rc == -1 && errno != EBADF) {
- QPID_POSIX_CHECK(rc);
+ QPID_POSIX_CHECK(rc);
}
eh.setIdle();
@@ -332,10 +332,10 @@ void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
if (eh.isIdle() || eh.isDeleted()) {
return;
}
-
+
if (eh.events==0) {
eh.setActive();
- return;
+ return;
}
if (!eh.isInterrupted()) {
@@ -343,11 +343,11 @@ void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
epe.events = eh.events | ::EPOLLONESHOT;
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
-
+
QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
-
+
eh.setActive();
- return;
+ return;
}
ph = eh.pollerHandle;
}
@@ -366,7 +366,7 @@ void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
::__uint32_t oldEvents = eh.events;
eh.events |= PollerPrivate::directionToEpollEvent(dir);
-
+
// If no change nothing more to do - avoid unnecessary system call
if (oldEvents==eh.events) {
return;
@@ -376,7 +376,7 @@ void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
if (!eh.isActive()) {
return;
}
-
+
::epoll_event epe;
epe.events = eh.events | ::EPOLLONESHOT;
epe.data.u64 = 0; // Keep valgrind happy
@@ -397,12 +397,12 @@ void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
if (oldEvents==eh.events) {
return;
}
-
+
// If we're not actually listening wait till we are to perform change
if (!eh.isActive()) {
return;
}
-
+
::epoll_event epe;
epe.events = eh.events | ::EPOLLONESHOT;
epe.data.u64 = 0; // Keep valgrind happy
@@ -427,50 +427,50 @@ void Poller::shutdown() {
}
bool Poller::interrupt(PollerHandle& handle) {
- {
- PollerHandlePrivate& eh = *handle.impl;
- ScopedLock<Mutex> l(eh.lock);
- if (eh.isIdle() || eh.isDeleted()) {
- return false;
- }
-
+ {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ if (eh.isIdle() || eh.isDeleted()) {
+ return false;
+ }
+
if (eh.isInterrupted()) {
return true;
}
-
+
// Stop monitoring handle for read or write
- ::epoll_event epe;
- epe.events = 0;
- epe.data.u64 = 0; // Keep valgrind happy
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ ::epoll_event epe;
+ epe.events = 0;
+ epe.data.u64 = 0; // Keep valgrind happy
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
if (eh.isInactive()) {
eh.setInterrupted();
return true;
}
eh.setInterrupted();
- }
+ }
- PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
+ PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl;
ScopedLock<Mutex> l(eh.lock);
- ih.addHandle(handle);
-
- impl->interrupt();
+ ih.addHandle(handle);
+
+ impl->interrupt();
eh.setActive();
return true;
}
void Poller::run() {
- // Make sure we can't be interrupted by signals at a bad time
- ::sigset_t ss;
- ::sigfillset(&ss);
+ // Make sure we can't be interrupted by signals at a bad time
+ ::sigset_t ss;
+ ::sigfillset(&ss);
::pthread_sigmask(SIG_SETMASK, &ss, 0);
do {
Event event = wait();
- // If can read/write then dispatch appropriate callbacks
+ // If can read/write then dispatch appropriate callbacks
if (event.handle) {
event.process();
} else {
@@ -564,7 +564,7 @@ Poller::Event Poller::wait(Duration timeout) {
PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(dataPtr);
ScopedLock<Mutex> l(eh.lock);
-
+
// the handle could have gone inactive since we left the epoll_wait
if (eh.isActive()) {
PollerHandle* handle = eh.pollerHandle;