diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2007-07-12 01:48:13 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2007-07-12 01:48:13 +0000 |
| commit | ef1d54d57f354765e6ea1ae73807a4a633c73998 (patch) | |
| tree | 8ce4fada81e4a844b8bc7b523d657dd84e79857c /cpp/src/qpid/sys/epoll/EpollPoller.cpp | |
| parent | fa267e310c233cfc9fc3b67b3c080adb4911f69f (diff) | |
| download | qpid-python-ef1d54d57f354765e6ea1ae73807a4a633c73998.tar.gz | |
* Add libuuid to libcommon link (for when apr goes away)
* Latest version of AsynchIO code
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@555455 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/epoll/EpollPoller.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/epoll/EpollPoller.cpp | 92 |
1 files changed, 72 insertions, 20 deletions
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index 65b2255023..8c3bdbc7d5 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -40,7 +40,9 @@ class PollerHandlePrivate { enum FDStat { ABSENT, MONITORED, - INACTIVE + INACTIVE, + HUNGUP, + MONITORED_HUNGUP }; ::__uint32_t events; @@ -51,6 +53,39 @@ class PollerHandlePrivate { events(0), stat(ABSENT) { } + + bool isActive() const { + return stat == MONITORED || stat == MONITORED_HUNGUP; + } + + void setActive() { + stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED; + } + + bool isInactive() const { + return stat == INACTIVE || stat == HUNGUP; + } + + void setInactive() { + stat = INACTIVE; + } + + bool isIdle() const { + return stat == ABSENT; + } + + void setIdle() { + stat = ABSENT; + } + + bool isHungup() const { + return stat == MONITORED_HUNGUP || stat == HUNGUP; + } + + void setHungup() { + assert(stat == MONITORED); + stat = HUNGUP; + } }; PollerHandle::PollerHandle(int fd0) : @@ -108,13 +143,16 @@ class PollerPrivate { } } - static Poller::Direction epollToDirection(::__uint32_t events) { + static Poller::EventType epollToDirection(::__uint32_t events) { + // POLLOUT & POLLHUP are mutually exclusive really, but at least socketpairs + // can give you both! + events = (events & ::EPOLLHUP) ? events & ~::EPOLLOUT : events; ::__uint32_t e = events & (::EPOLLIN | ::EPOLLOUT); switch (e) { - case ::EPOLLIN: return Poller::IN; - case ::EPOLLOUT: return Poller::OUT; - case ::EPOLLIN | ::EPOLLOUT: return Poller::INOUT; - default: return Poller::NONE; + case ::EPOLLIN: return Poller::READABLE; + case ::EPOLLOUT: return Poller::WRITABLE; + case ::EPOLLIN | ::EPOLLOUT: return Poller::READ_WRITABLE; + default: return (events & ::EPOLLHUP) ? Poller::DISCONNECTED : Poller::INVALID; } } @@ -138,11 +176,11 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { ::epoll_event epe; int op; - if (eh.stat == PollerHandlePrivate::ABSENT) { + if (eh.isIdle()) { op = EPOLL_CTL_ADD; epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; } else { - assert(eh.stat == PollerHandlePrivate::MONITORED); + assert(eh.isActive()); op = EPOLL_CTL_MOD; epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir); } @@ -152,22 +190,27 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { // Record monitoring state of this fd eh.events = epe.events; - eh.stat = PollerHandlePrivate::MONITORED; + eh.setActive(); } void Poller::delFd(PollerHandle& handle) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); - assert(eh.stat != PollerHandlePrivate::ABSENT); - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0)); - eh.stat = PollerHandlePrivate::ABSENT; + assert(!eh.isIdle()); + int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0); + // Ignore EBADF since deleting a nonexistent fd has the overall required result! + // And allows the case where a sloppy program closes the fd and then does the delFd() + if (rc == -1 && errno != EBADF) { + QPID_POSIX_CHECK(rc); + } + eh.setIdle(); } // modFd is equivalent to delFd followed by addFd void Poller::modFd(PollerHandle& handle, Direction dir) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); - assert(eh.stat != PollerHandlePrivate::ABSENT); + assert(!eh.isIdle()); ::epoll_event epe; epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; @@ -177,13 +220,13 @@ void Poller::modFd(PollerHandle& handle, Direction dir) { // Record monitoring state of this fd eh.events = epe.events; - eh.stat = PollerHandlePrivate::MONITORED; + eh.setActive(); } void Poller::rearmFd(PollerHandle& handle) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); - assert(eh.stat == PollerHandlePrivate::INACTIVE); + assert(eh.isInactive()); ::epoll_event epe; epe.events = eh.events; @@ -191,7 +234,7 @@ void Poller::rearmFd(PollerHandle& handle) { QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe)); - eh.stat = PollerHandlePrivate::MONITORED; + eh.setActive(); } void Poller::shutdown() { @@ -229,8 +272,17 @@ Poller::Event Poller::wait(Duration timeout) { ScopedLock<Mutex> l(eh.lock); // the handle could have gone inactive since we left the epoll_wait - if (eh.stat == PollerHandlePrivate::MONITORED) { - eh.stat = PollerHandlePrivate::INACTIVE; + if (eh.isActive()) { + // If the connection has been hungup we could still be readable + // (just not writable), allow us to readable until we get here again + if (epe.events & ::EPOLLHUP) { + if (eh.isHungup()) { + return Event(handle, DISCONNECTED); + } + eh.setHungup(); + } else { + eh.setInactive(); + } return Event(handle, PollerPrivate::epollToDirection(epe.events)); } } @@ -245,8 +297,8 @@ Poller::Event Poller::wait(Duration timeout) { // If the wait wasn't indefinite, but we were interrupted then we have to return // with a timeout as we don't know how long we've waited so far and so we can't // continue the wait. - if (rc == 0 || timeoutMs == -1) { - return Event(0, NONE); + if (rc == 0 || timeoutMs != -1) { + return Event(0, TIMEOUT); } } while (true); } |
