summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/epoll
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-06-18 12:11:32 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-06-18 12:11:32 +0000
commit45b526ce09daee869ec1313808583f7e05bff7bb (patch)
tree297d2b1f02b14e1fdffbc1074b3d23670859f602 /cpp/src/qpid/sys/epoll
parent41c30308ad435c338633b97405fe7350d515f069 (diff)
downloadqpid-python-45b526ce09daee869ec1313808583f7e05bff7bb.tar.gz
Intermediate checkin with preliminary work on epoll based net IO
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@548337 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/epoll')
-rw-r--r--cpp/src/qpid/sys/epoll/EpollPoller.cpp263
1 files changed, 263 insertions, 0 deletions
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
new file mode 100644
index 0000000000..65b2255023
--- /dev/null
+++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/posix/check.h"
+
+#include <sys/epoll.h>
+#include <errno.h>
+
+#include <assert.h>
+#include <vector>
+#include <exception>
+
+namespace qpid {
+namespace sys {
+
+class PollerHandlePrivate {
+ friend class Poller;
+ friend class PollerHandle;
+
+ enum FDStat {
+ ABSENT,
+ MONITORED,
+ INACTIVE
+ };
+
+ ::__uint32_t events;
+ FDStat stat;
+ Mutex lock;
+
+ PollerHandlePrivate() :
+ events(0),
+ stat(ABSENT) {
+ }
+};
+
+PollerHandle::PollerHandle(int fd0) :
+ impl(new PollerHandlePrivate),
+ fd(fd0)
+{}
+
+PollerHandle::~PollerHandle() {
+ delete impl;
+}
+
+/**
+ * Concrete implementation of Poller to use the Linux specific epoll
+ * interface
+ */
+class PollerPrivate {
+ friend class Poller;
+
+ static const int DefaultFds = 256;
+
+ struct ReadablePipe {
+ int fds[2];
+
+ /**
+ * This encapsulates an always readable pipe which we can add
+ * to the epoll set to force epoll_wait to return
+ */
+ ReadablePipe() {
+ QPID_POSIX_CHECK(::pipe(fds));
+ // Just write the pipe's fds to the pipe
+ QPID_POSIX_CHECK(::write(fds[1], fds, 2));
+ }
+
+ ~ReadablePipe() {
+ ::close(fds[0]);
+ ::close(fds[1]);
+ }
+
+ int getFD() {
+ return fds[0];
+ }
+ };
+
+ static ReadablePipe alwaysReadable;
+
+ const int epollFd;
+ bool isShutdown;
+
+ static ::__uint32_t directionToEpollEvent(Poller::Direction dir) {
+ switch (dir) {
+ case Poller::IN: return ::EPOLLIN;
+ case Poller::OUT: return ::EPOLLOUT;
+ case Poller::INOUT: return ::EPOLLIN | ::EPOLLOUT;
+ default: return 0;
+ }
+ }
+
+ static Poller::Direction epollToDirection(::__uint32_t events) {
+ ::__uint32_t e = events & (::EPOLLIN | ::EPOLLOUT);
+ switch (e) {
+ case ::EPOLLIN: return Poller::IN;
+ case ::EPOLLOUT: return Poller::OUT;
+ case ::EPOLLIN | ::EPOLLOUT: return Poller::INOUT;
+ default: return Poller::NONE;
+ }
+ }
+
+ PollerPrivate() :
+ epollFd(::epoll_create(DefaultFds)),
+ isShutdown(false) {
+ QPID_POSIX_CHECK(epollFd);
+ }
+
+ ~PollerPrivate() {
+ // It's probably okay to ignore any errors here as there can't be data loss
+ ::close(epollFd);
+ }
+};
+
+PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable;
+
+void Poller::addFd(PollerHandle& handle, Direction dir) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ ::epoll_event epe;
+ int op;
+
+ if (eh.stat == PollerHandlePrivate::ABSENT) {
+ op = EPOLL_CTL_ADD;
+ epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
+ } else {
+ assert(eh.stat == PollerHandlePrivate::MONITORED);
+ op = EPOLL_CTL_MOD;
+ epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
+ }
+ epe.data.ptr = &handle;
+
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, handle.getFD(), &epe));
+
+ // Record monitoring state of this fd
+ eh.events = epe.events;
+ eh.stat = PollerHandlePrivate::MONITORED;
+}
+
+void Poller::delFd(PollerHandle& handle) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ assert(eh.stat != PollerHandlePrivate::ABSENT);
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0));
+ eh.stat = PollerHandlePrivate::ABSENT;
+}
+
+// modFd is equivalent to delFd followed by addFd
+void Poller::modFd(PollerHandle& handle, Direction dir) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ assert(eh.stat != PollerHandlePrivate::ABSENT);
+
+ ::epoll_event epe;
+ epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
+ epe.data.ptr = &handle;
+
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe));
+
+ // Record monitoring state of this fd
+ eh.events = epe.events;
+ eh.stat = PollerHandlePrivate::MONITORED;
+}
+
+void Poller::rearmFd(PollerHandle& handle) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ assert(eh.stat == PollerHandlePrivate::INACTIVE);
+
+ ::epoll_event epe;
+ epe.events = eh.events;
+ epe.data.ptr = &handle;
+
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe));
+
+ eh.stat = PollerHandlePrivate::MONITORED;
+}
+
+void Poller::shutdown() {
+ // Don't use any locking here - isshutdown will be visible to all
+ // after the epoll_ctl() anyway (it's a memory barrier)
+ impl->isShutdown = true;
+
+ // Add always readable fd to epoll (not EPOLLONESHOT)
+ int fd = impl->alwaysReadable.getFD();
+ ::epoll_event epe;
+ epe.events = ::EPOLLIN;
+ epe.data.ptr = 0;
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, fd, &epe));
+}
+
+Poller::Event Poller::wait(Duration timeout) {
+ epoll_event epe;
+ int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC;
+
+ // Repeat until we weren't interupted
+ do {
+ int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs);
+
+ if (impl->isShutdown) {
+ return Event(0, SHUTDOWN);
+ }
+
+ if (rc ==-1 && errno != EINTR) {
+ QPID_POSIX_CHECK(rc);
+ } else if (rc > 0) {
+ assert(rc == 1);
+ PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr);
+ PollerHandlePrivate& eh = *handle->impl;
+
+ ScopedLock<Mutex> l(eh.lock);
+
+ // the handle could have gone inactive since we left the epoll_wait
+ if (eh.stat == PollerHandlePrivate::MONITORED) {
+ eh.stat = PollerHandlePrivate::INACTIVE;
+ return Event(handle, PollerPrivate::epollToDirection(epe.events));
+ }
+ }
+ // We only get here if one of the following:
+ // * epoll_wait was interrupted by a signal
+ // * epoll_wait timed out
+ // * the state of the handle changed after being returned by epoll_wait
+ //
+ // The only things we can do here are return a timeout or wait more.
+ // Obviously if we timed out we return timeout; if the wait was meant to
+ // be indefinite then we should never return with a time out so we go again.
+ // If the wait wasn't indefinite, but we were interrupted then we have to return
+ // with a timeout as we don't know how long we've waited so far and so we can't
+ // continue the wait.
+ if (rc == 0 || timeoutMs == -1) {
+ return Event(0, NONE);
+ }
+ } while (true);
+}
+
+// Concrete constructors
+Poller::Poller() :
+ impl(new PollerPrivate())
+{}
+
+Poller::~Poller() {
+ delete impl;
+}
+
+}}