summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/epoll/EpollPoller.cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-07-12 01:48:13 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-07-12 01:48:13 +0000
commitef1d54d57f354765e6ea1ae73807a4a633c73998 (patch)
tree8ce4fada81e4a844b8bc7b523d657dd84e79857c /cpp/src/qpid/sys/epoll/EpollPoller.cpp
parentfa267e310c233cfc9fc3b67b3c080adb4911f69f (diff)
downloadqpid-python-ef1d54d57f354765e6ea1ae73807a4a633c73998.tar.gz
* Add libuuid to libcommon link (for when apr goes away)
* Latest version of AsynchIO code git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@555455 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/epoll/EpollPoller.cpp')
-rw-r--r--cpp/src/qpid/sys/epoll/EpollPoller.cpp92
1 files changed, 72 insertions, 20 deletions
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index 65b2255023..8c3bdbc7d5 100644
--- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -40,7 +40,9 @@ class PollerHandlePrivate {
enum FDStat {
ABSENT,
MONITORED,
- INACTIVE
+ INACTIVE,
+ HUNGUP,
+ MONITORED_HUNGUP
};
::__uint32_t events;
@@ -51,6 +53,39 @@ class PollerHandlePrivate {
events(0),
stat(ABSENT) {
}
+
+ bool isActive() const {
+ return stat == MONITORED || stat == MONITORED_HUNGUP;
+ }
+
+ void setActive() {
+ stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED;
+ }
+
+ bool isInactive() const {
+ return stat == INACTIVE || stat == HUNGUP;
+ }
+
+ void setInactive() {
+ stat = INACTIVE;
+ }
+
+ bool isIdle() const {
+ return stat == ABSENT;
+ }
+
+ void setIdle() {
+ stat = ABSENT;
+ }
+
+ bool isHungup() const {
+ return stat == MONITORED_HUNGUP || stat == HUNGUP;
+ }
+
+ void setHungup() {
+ assert(stat == MONITORED);
+ stat = HUNGUP;
+ }
};
PollerHandle::PollerHandle(int fd0) :
@@ -108,13 +143,16 @@ class PollerPrivate {
}
}
- static Poller::Direction epollToDirection(::__uint32_t events) {
+ static Poller::EventType epollToDirection(::__uint32_t events) {
+ // POLLOUT & POLLHUP are mutually exclusive really, but at least socketpairs
+ // can give you both!
+ events = (events & ::EPOLLHUP) ? events & ~::EPOLLOUT : events;
::__uint32_t e = events & (::EPOLLIN | ::EPOLLOUT);
switch (e) {
- case ::EPOLLIN: return Poller::IN;
- case ::EPOLLOUT: return Poller::OUT;
- case ::EPOLLIN | ::EPOLLOUT: return Poller::INOUT;
- default: return Poller::NONE;
+ case ::EPOLLIN: return Poller::READABLE;
+ case ::EPOLLOUT: return Poller::WRITABLE;
+ case ::EPOLLIN | ::EPOLLOUT: return Poller::READ_WRITABLE;
+ default: return (events & ::EPOLLHUP) ? Poller::DISCONNECTED : Poller::INVALID;
}
}
@@ -138,11 +176,11 @@ void Poller::addFd(PollerHandle& handle, Direction dir) {
::epoll_event epe;
int op;
- if (eh.stat == PollerHandlePrivate::ABSENT) {
+ if (eh.isIdle()) {
op = EPOLL_CTL_ADD;
epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
} else {
- assert(eh.stat == PollerHandlePrivate::MONITORED);
+ assert(eh.isActive());
op = EPOLL_CTL_MOD;
epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
}
@@ -152,22 +190,27 @@ void Poller::addFd(PollerHandle& handle, Direction dir) {
// Record monitoring state of this fd
eh.events = epe.events;
- eh.stat = PollerHandlePrivate::MONITORED;
+ eh.setActive();
}
void Poller::delFd(PollerHandle& handle) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
- assert(eh.stat != PollerHandlePrivate::ABSENT);
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0));
- eh.stat = PollerHandlePrivate::ABSENT;
+ assert(!eh.isIdle());
+ int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0);
+ // Ignore EBADF since deleting a nonexistent fd has the overall required result!
+ // And allows the case where a sloppy program closes the fd and then does the delFd()
+ if (rc == -1 && errno != EBADF) {
+ QPID_POSIX_CHECK(rc);
+ }
+ eh.setIdle();
}
// modFd is equivalent to delFd followed by addFd
void Poller::modFd(PollerHandle& handle, Direction dir) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
- assert(eh.stat != PollerHandlePrivate::ABSENT);
+ assert(!eh.isIdle());
::epoll_event epe;
epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
@@ -177,13 +220,13 @@ void Poller::modFd(PollerHandle& handle, Direction dir) {
// Record monitoring state of this fd
eh.events = epe.events;
- eh.stat = PollerHandlePrivate::MONITORED;
+ eh.setActive();
}
void Poller::rearmFd(PollerHandle& handle) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
- assert(eh.stat == PollerHandlePrivate::INACTIVE);
+ assert(eh.isInactive());
::epoll_event epe;
epe.events = eh.events;
@@ -191,7 +234,7 @@ void Poller::rearmFd(PollerHandle& handle) {
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe));
- eh.stat = PollerHandlePrivate::MONITORED;
+ eh.setActive();
}
void Poller::shutdown() {
@@ -229,8 +272,17 @@ Poller::Event Poller::wait(Duration timeout) {
ScopedLock<Mutex> l(eh.lock);
// the handle could have gone inactive since we left the epoll_wait
- if (eh.stat == PollerHandlePrivate::MONITORED) {
- eh.stat = PollerHandlePrivate::INACTIVE;
+ if (eh.isActive()) {
+ // If the connection has been hungup we could still be readable
+ // (just not writable), allow us to readable until we get here again
+ if (epe.events & ::EPOLLHUP) {
+ if (eh.isHungup()) {
+ return Event(handle, DISCONNECTED);
+ }
+ eh.setHungup();
+ } else {
+ eh.setInactive();
+ }
return Event(handle, PollerPrivate::epollToDirection(epe.events));
}
}
@@ -245,8 +297,8 @@ Poller::Event Poller::wait(Duration timeout) {
// If the wait wasn't indefinite, but we were interrupted then we have to return
// with a timeout as we don't know how long we've waited so far and so we can't
// continue the wait.
- if (rc == 0 || timeoutMs == -1) {
- return Event(0, NONE);
+ if (rc == 0 || timeoutMs != -1) {
+ return Event(0, TIMEOUT);
}
} while (true);
}