/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "qpid/log/Logger.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/DeletionManager.h"
#include "qpid/sys/posix/check.h"
#include "qpid/sys/posix/PrivatePosix.h"

#include <port.h>
#include <poll.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <unistd.h>

#include <assert.h>
#include <vector>
#include <exception>
#include <queue>
#include <typeinfo>

//TODO: Remove this
#include "qpid/sys/Dispatcher.h"

namespace qpid {
namespace sys {

// Deletion manager to handle deferring deletion of PollerHandles to when
// they definitely aren't being used
DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager;

// Instantiate (and define) class static for DeletionManager
template <>
DeletionManager<PollerHandlePrivate>::AllThreadsStatuses
DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0);

/**
 * Per-handle bookkeeping shared between PollerHandle and Poller.
 *
 * Holds the file descriptor, the poll event mask last associated with the
 * port, the current monitoring state and the mutex that guards them.
 */
class PollerHandlePrivate {
    friend class Poller;
    friend class PollerHandle;

    enum FDStat {
        ABSENT,            // idle: not registered with the port
        MONITORED,         // active: associated with the port
        INACTIVE,          // an event was delivered; waiting to be re-armed
        HUNGUP,            // a POLLHUP was seen; not currently active
        MONITORED_HUNGUP,  // active again after a POLLHUP was seen
        DELETED            // owning PollerHandle destroyed while active
    };

    int fd;
    uint32_t events;
    FDStat stat;
    Mutex lock;

    PollerHandlePrivate(int f) :
        fd(f),
        events(0),
        stat(ABSENT) {
    }

    bool isActive() const {
        return stat == MONITORED || stat == MONITORED_HUNGUP;
    }

    // Becoming active keeps the "hung up" marker if one was recorded.
    void setActive() {
        stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED;
    }

    bool isInactive() const {
        return stat == INACTIVE || stat == HUNGUP;
    }

    void setInactive() {
        stat = INACTIVE;
    }

    bool isIdle() const {
        return stat == ABSENT;
    }

    void setIdle() {
        stat = ABSENT;
    }

    bool isHungup() const {
        return stat == MONITORED_HUNGUP || stat == HUNGUP;
    }

    // Only valid on a handle that is MONITORED (and not already hung up).
    void setHungup() {
        assert(stat == MONITORED);
        stat = HUNGUP;
    }

    bool isDeleted() const {
        return stat == DELETED;
    }

    void setDeleted() {
        stat = DELETED;
    }
};

PollerHandle::PollerHandle(const IOHandle& h) :
    impl(new PollerHandlePrivate(toFd(h.impl)))
{}

/**
 * Marks an active handle as DELETED and hands the private state to the
 * deletion manager instead of deleting it directly. A handle that is
 * already DELETED is left untouched.
 */
PollerHandle::~PollerHandle() {
    {
        ScopedLock<Mutex> l(impl->lock);
        if (impl->isDeleted()) {
            return;
        }
        if (impl->isActive()) {
            impl->setDeleted();
        }
    }
    // The lock is released before the impl is queued for deletion.
    PollerHandleDeletionManager.markForDeletion(impl);
}

/**
 * Concrete implementation of Poller to use the Solaris Event Completion
 * Framework interface
 */
class PollerPrivate {
    friend class Poller;

    /**
     * Special handle used to deliver interrupts through the port's alert
     * mode. Keeps a FIFO of the handles that asked to be interrupted.
     */
    class InterruptHandle: public PollerHandle {
        std::queue<PollerHandle*> handles;

        void processEvent(Poller::EventType) {
            PollerHandle* handle = handles.front();
            handles.pop();
            assert(handle);

            //Synthesise event
            Poller::Event event(handle, Poller::INTERRUPTED);

            //Process synthesised event
            event.process();
        }

    public:
        InterruptHandle() : PollerHandle(DummyIOHandle) {}

        void addHandle(PollerHandle& h) {
            handles.push(&h);
        }

        // Removes and returns the oldest queued handle.
        // The queue must not be empty when this is called.
        PollerHandle* getHandle() {
            PollerHandle* handle = handles.front();
            handles.pop();
            return handle;
        }

        bool queuedHandles() {
            return handles.size() > 0;
        }
    };

    const int portId;
    bool isShutdown;
    InterruptHandle interruptHandle;

    // Translate a Poller direction into the poll(2) event mask to monitor.
    static uint32_t directionToPollEvent(Poller::Direction dir) {
        switch (dir) {
        case Poller::INPUT:  return POLLIN;
        case Poller::OUTPUT: return POLLOUT;
        case Poller::INOUT:  return POLLIN | POLLOUT;
        default: return 0;
        }
    }

    // Translate the poll(2) events reported by the port into an event type.
    // With neither POLLIN nor POLLOUT set, a hangup or error maps to
    // DISCONNECTED and anything else to INVALID.
    static Poller::EventType pollToDirection(uint32_t events) {
        uint32_t e = events & (POLLIN | POLLOUT);
        switch (e) {
        case POLLIN: return Poller::READABLE;
        case POLLOUT: return Poller::WRITABLE;
        case POLLIN | POLLOUT: return Poller::READ_WRITABLE;
        default:
            return (events & (POLLHUP | POLLERR)) ?
                Poller::DISCONNECTED : Poller::INVALID;
        }
    }

    PollerPrivate() :
        portId(::port_create()),
        isShutdown(false) {
        QPID_POSIX_CHECK(portId);
        QPID_LOG(trace, "port_create returned port Id: " << portId);
    }

    ~PollerPrivate() {
        // The port is a file descriptor obtained from port_create();
        // close it so it is not leaked when the poller is destroyed.
        ::close(portId);
    }

    void interrupt() {
        //Send an Alarm to the port
        //We need to send a nonzero event mask, using POLLHUP,
        //nevertheless the wait method will only look for a PORT_ALERT_SET
        QPID_LOG(trace, "Sending a port_alert to " << portId);
        QPID_POSIX_CHECK(::port_alert(portId, PORT_ALERT_SET, POLLHUP,
                                      &static_cast<PollerHandle&>(interruptHandle)));
    }
};

/**
 * Start monitoring the handle in the given direction. If the handle is
 * already active the new direction is added to the events it already
 * monitors.
 */
void Poller::addFd(PollerHandle& handle, Direction dir) {
    PollerHandlePrivate& eh = *handle.impl;
    ScopedLock<Mutex> l(eh.lock);

    uint32_t events = 0;

    if (eh.isIdle()) {
        events = PollerPrivate::directionToPollEvent(dir);
    } else {
        assert(eh.isActive());
        events = eh.events | PollerPrivate::directionToPollEvent(dir);
    }

    //port_associate can be used to add an association or modify an
    //existing one
    QPID_POSIX_CHECK(::port_associate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd, events, &handle));
    eh.events = events;
    eh.setActive();
    QPID_LOG(trace, "Poller::addFd(handle=" << &handle
             << "[" << typeid(&handle).name()
             << "], fd=" << eh.fd << ")");
}

/**
 * Stop monitoring the handle and return it to the idle state.
 */
void Poller::delFd(PollerHandle& handle) {
    PollerHandlePrivate& eh = *handle.impl;
    ScopedLock<Mutex> l(eh.lock);
    assert(!eh.isIdle());
    int rc = ::port_dissociate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd);
    //Allow closing an invalid fd, allowing users to close fd before
    //doing delFd()
    if (rc == -1 && errno != EBADFD) {
        QPID_POSIX_CHECK(rc);
    }
    eh.setIdle();
    QPID_LOG(trace, "Poller::delFd(handle=" << &handle
             << ", fd=" << eh.fd << ")");
}

// modFd is equivalent to delFd followed by addFd
void Poller::modFd(PollerHandle& handle, Direction dir) {
    PollerHandlePrivate& eh = *handle.impl;
    ScopedLock<Mutex> l(eh.lock);
    assert(!eh.isIdle());

    eh.events = PollerPrivate::directionToPollEvent(dir);

    //If fd is already associated, events and user arguments are updated
    //So, no need to check if fd is already associated
    QPID_POSIX_CHECK(::port_associate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd, eh.events, &handle));
    eh.setActive();
    QPID_LOG(trace, "Poller::modFd(handle=" << &handle
             << ", fd=" << eh.fd << ")");
}

/**
 * Re-associate an inactive handle with the port using the event mask that
 * was stored for it, making it active again.
 */
void Poller::rearmFd(PollerHandle& handle) {
    PollerHandlePrivate& eh = *handle.impl;
    ScopedLock<Mutex> l(eh.lock);
    assert(eh.isInactive());

    QPID_POSIX_CHECK(::port_associate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd, eh.events, &handle));
    eh.setActive();
    QPID_LOG(trace, "Poller::rearmdFd(handle=" << &handle
             << ", fd=" << eh.fd << ")");
}

void Poller::shutdown() {
    //Allow sloppy code to shut us down more than once
    if (impl->isShutdown)
        return;

    impl->isShutdown = true;
    impl->interrupt();
}

bool Poller::hasShutdown() {
    return impl->isShutdown;
}

/**
 * Queue the handle on the interrupt handle and raise a port alert so that
 * a thread in wait() delivers an INTERRUPTED event for it.
 * Always returns true.
 */
bool Poller::interrupt(PollerHandle& handle) {
    PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
    PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl;
    ScopedLock<Mutex> l(eh.lock);
    ih.addHandle(handle);
    impl->interrupt();
    eh.setActive();
    return true;
}

/**
 * Event loop: repeatedly waits for an event and processes it, returning
 * when a SHUTDOWN event is received.
 */
void Poller::run() {
    // Make sure we can't be interrupted by signals at a bad time
    ::sigset_t ss;
    ::sigfillset(&ss);
    ::pthread_sigmask(SIG_SETMASK, &ss, 0);

    do {
        Event event = wait();

        // If can read/write then dispatch appropriate callbacks
        if (event.handle) {
            event.process();
        } else {
            // Handle shutdown
            switch (event.type) {
            case SHUTDOWN:
                return;
            default:
                // This should be impossible
                assert(false);
            }
        }
    } while (true);
}

/**
 * Wait for the next event on the port.
 *
 * @param timeout how long to wait; TIME_INFINITE blocks without a timeout.
 * @return an Event for the handle that became ready or was interrupted,
 *         Event(0, SHUTDOWN) once the poller has been shut down, or
 *         Event(0, TIMEOUT) when the timeout expires.
 */
Poller::Event Poller::wait(Duration timeout) {
    timespec_t tout;
    timespec_t* ptout = NULL;
    port_event_t pe;

    AbsTime targetTimeout = (timeout == TIME_INFINITE) ?
        FAR_FUTURE :
        AbsTime(now(), timeout);

    if (timeout != TIME_INFINITE) {
        // The timeout is a nanosecond count (it was previously stored
        // directly in tv_nsec). tv_nsec must stay below one second, so
        // split the value into whole seconds and the remainder; otherwise
        // any timeout of one second or more is an invalid timespec.
        const int64_t nanosPerSec = 1000000000LL;
        const int64_t timeoutNanos = timeout;
        tout.tv_sec = static_cast<time_t>(timeoutNanos / nanosPerSec);
        tout.tv_nsec = static_cast<long>(timeoutNanos % nanosPerSec);
        ptout = &tout;
    }

    do {
        PollerHandleDeletionManager.markAllUnusedInThisThread();
        QPID_LOG(trace, "About to enter port_get on " << impl->portId
                 << ". Thread " << pthread_self()
                 << ", timeout=" << timeout);

        int rc = ::port_get(impl->portId, &pe, ptout);
        // Capture errno straight away: the logging below may modify it
        // before the error is examined.
        const int portGetErrno = errno;

        QPID_LOG(trace, "port_get on " << impl->portId
                 << " returned " << rc);

        if (impl->isShutdown) {
            PollerHandleDeletionManager.markAllUnusedInThisThread();
            return Event(0, SHUTDOWN);
        }

        if (rc < 0) {
            switch (portGetErrno) {
            case EINTR:
                continue;
            case ETIME:
                return Event(0, TIMEOUT);
            default:
                // Restore errno in case QPID_POSIX_CHECK consults it
                // when building its error.
                errno = portGetErrno;
                QPID_POSIX_CHECK(rc);
            }
        } else {
            PollerHandle* handle = static_cast<PollerHandle*>(pe.portev_user);
            PollerHandlePrivate& eh = *handle->impl;
            ScopedLock<Mutex> l(eh.lock);

            if (eh.isActive()) {
                QPID_LOG(trace, "Handle is active");
                //We use alert mode to notify interrupts
                if (pe.portev_source == PORT_SOURCE_ALERT &&
                    handle == &impl->interruptHandle) {
                    QPID_LOG(trace, "Interrupt notified");

                    PollerHandle* wrappedHandle = impl->interruptHandle.getHandle();

                    // Keep the interrupt handle active and re-raise the
                    // alert while more interrupted handles are queued.
                    if (impl->interruptHandle.queuedHandles()) {
                        impl->interrupt();
                        eh.setActive();
                    } else {
                        eh.setInactive();
                    }
                    return Event(wrappedHandle, INTERRUPTED);
                }

                if (pe.portev_source == PORT_SOURCE_FD) {
                    QPID_LOG(trace, "About to send handle: " << handle);
                    if (pe.portev_events & POLLHUP) {
                        // A second hangup on the same handle is reported
                        // as a disconnect; the first one is only recorded.
                        if (eh.isHungup()) {
                            return Event(handle, DISCONNECTED);
                        }
                        eh.setHungup();
                    } else {
                        eh.setInactive();
                    }
                    QPID_LOG(trace, "Sending event (thread: "
                             << pthread_self() << ") for handle " << handle
                             << ", direction= "
                             << PollerPrivate::pollToDirection(pe.portev_events));
                    return Event(handle, PollerPrivate::pollToDirection(pe.portev_events));
                }
            } else if (eh.isDeleted()) {
                //Remove the handle from the poller
                int dissociateRc = ::port_dissociate(impl->portId, PORT_SOURCE_FD,
                                                     (uintptr_t) eh.fd);
                if (dissociateRc == -1 && errno != EBADFD) {
                    QPID_POSIX_CHECK(dissociateRc);
                }
            }
        }

        if (timeout == TIME_INFINITE) {
            continue;
        }
        if (rc == 0 && now() > targetTimeout) {
            PollerHandleDeletionManager.markAllUnusedInThisThread();
            return Event(0, TIMEOUT);
        }
    } while (true);
}

// Concrete constructors
Poller::Poller() :
    impl(new PollerPrivate())
{}

Poller::~Poller() {
    delete impl;
}

}}