summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/EventChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/posix/EventChannel.cpp')
-rw-r--r--cpp/src/qpid/sys/posix/EventChannel.cpp596
1 files changed, 0 insertions, 596 deletions
diff --git a/cpp/src/qpid/sys/posix/EventChannel.cpp b/cpp/src/qpid/sys/posix/EventChannel.cpp
deleted file mode 100644
index d35eedf5a5..0000000000
--- a/cpp/src/qpid/sys/posix/EventChannel.cpp
+++ /dev/null
@@ -1,596 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-// TODO aconway 2006-12-15: Locking review.
-
-// TODO aconway 2006-12-15: use Descriptor pointers everywhere,
-// get them from channel, pass them to Event constructors.
-// Eliminate lookup.
-
-
-#include "EventChannel.h"
-#include "check.h"
-
-#include "qpid/QpidError.h"
-#include "qpid/sys/AtomicCount.h"
-
-#include <mqueue.h>
-#include <string.h>
-#include <iostream>
-
-#include <sys/errno.h>
-#include <sys/socket.h>
-#include <sys/epoll.h>
-
-#include <typeinfo>
-#include <iostream>
-#include <queue>
-
-#include <boost/ptr_container/ptr_map.hpp>
-#include <boost/noncopyable.hpp>
-#include <boost/bind.hpp>
-
-using namespace std;
-
-
-namespace qpid {
-namespace sys {
-
-
-// ================================================================
-// Private class declarations
-
-namespace {
-
-typedef enum { IN, OUT } Direction;
-
-typedef std::pair<Event*, Event*> EventPair;
-
-/**
- * Template to zero out a C-struct on construction. Avoids uninitialized memory
- * warnings from valgrind or other mem checking tool.
- */
-template <class T> struct CleanStruct : public T {
- CleanStruct() { memset(this, 0, sizeof(*this)); }
-};
-
-} // namespace
-
-/**
- * Queue of events corresponding to one IO direction (IN or OUT).
- * Each Descriptor contains two Queues.
- */
-class EventChannel::Queue : private boost::noncopyable
-{
- public:
- Queue(Descriptor& container, Direction dir);
-
- /** Called by Event classes in prepare() */
- void push(Event* e);
-
- /** Called when epoll wakes.
- *@return The next completed event or 0.
- */
- Event* wake(uint32_t epollFlags);
-
- Event* pop() { Event* e = queue.front(); queue.pop_front(); return e; }
-
- bool empty() { return queue.empty(); }
-
- void setBit(uint32_t &epollFlags);
-
- void shutdown();
-
- private:
- typedef std::deque<Event*> EventQ;
-
- inline bool isMyEvent(uint32_t flags) { return flags | myEvent; }
-
- Mutex& lock; // Shared with Descriptor.
- Descriptor& descriptor;
- uint32_t myEvent; // Epoll event flag.
- EventQ queue;
-};
-
-
-/**
- * Manages a file descriptor in an epoll set.
- *
- * Can be shutdown and re-activated for the same file descriptor.
- */
-class EventChannel::Descriptor : private boost::noncopyable {
- public:
- explicit Descriptor(int fd) : epollFd(-1), myFd(fd),
- inQueue(*this, IN), outQueue(*this, OUT) {}
-
- void activate(int epollFd_);
-
- /** Epoll woke up for this descriptor. */
- Event* wake(uint32_t epollEvents);
-
- /** Shut down: close and remove file descriptor.
- * May be re-activated if fd is reused.
- */
- void shutdown();
-
- // TODO aconway 2006-12-18: Nasty. Need to clean up interaction.
- void shutdownUnsafe();
-
- bool isShutdown() { return epollFd == -1; }
-
- Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; }
- int getFD() const { return myFd; }
-
- private:
- void update();
- void epollCtl(int op, uint32_t events);
- Queue* pick();
-
- Mutex lock;
- int epollFd;
- int myFd;
- Queue inQueue, outQueue;
- bool preferIn;
-
- friend class Queue;
-};
-
-
-/**
- * Holds a map of Descriptors, which do most of the work.
- */
-class EventChannel::Impl {
- public:
- Impl(int size = 256);
-
- ~Impl();
-
- /**
- * Activate descriptor
- */
- void activate(Descriptor& d) {
- d.activate(epollFd);
- }
-
- /** Wait for an event, return 0 on timeout */
- Event* wait(Duration timeout);
-
- void shutdown();
-
- private:
-
- Monitor monitor;
- int epollFd;
- int shutdownPipe[2];
- AtomicCount nWaiters;
- bool isShutdown;
-};
-
-
-// ================================================================
-// EventChannel::Queue::implementation.
-
-static const char* shutdownMsg = "Event queue shut down.";
-
-EventChannel::Queue::Queue(Descriptor& d, Direction dir) :
- lock(d.lock), descriptor(d),
- myEvent(dir==IN ? EPOLLIN : EPOLLOUT)
-{}
-
-void EventChannel::Queue::push(Event* e) {
- Mutex::ScopedLock l(lock);
- if (descriptor.isShutdown())
- THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg);
- queue.push_back(e);
- descriptor.update();
-}
-
-void EventChannel::Queue::setBit(uint32_t &epollFlags) {
- if (queue.empty())
- epollFlags &= ~myEvent;
- else
- epollFlags |= myEvent;
-}
-
-// TODO aconway 2006-12-20: REMOVE
-Event* EventChannel::Queue::wake(uint32_t epollFlags) {
- // Called with lock held.
- if (!queue.empty() && (isMyEvent(epollFlags))) {
- assert(!queue.empty());
- Event* e = queue.front();
- assert(e);
- if (!e->getException()) {
- // TODO aconway 2006-12-20: Can/should we move event completion
- // out into dispatch() so it doesn't happen in Descriptor locks?
- e->complete(descriptor);
- }
- queue.pop_front();
- return e;
- }
- return 0;
-}
-
-void EventChannel::Queue::shutdown() {
- // Mark all pending events with a shutdown exception.
- // The server threads will remove and dispatch the events.
- //
- qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, SRCLINE);
- for_each(queue.begin(), queue.end(),
- boost::bind(&Event::setException, _1, ex));
-}
-
-
-// ================================================================
-// Descriptor
-
-
-void EventChannel::Descriptor::activate(int epollFd_) {
- Mutex::ScopedLock l(lock);
- if (isShutdown()) {
- epollFd = epollFd_; // We're back in business.
- epollCtl(EPOLL_CTL_ADD, 0);
- }
-}
-
-void EventChannel::Descriptor::shutdown() {
- Mutex::ScopedLock l(lock);
- shutdownUnsafe();
-}
-
-void EventChannel::Descriptor::shutdownUnsafe() {
- // Caller holds lock.
- ::close(myFd);
- epollFd = -1; // Mark myself as shutdown.
- inQueue.shutdown();
- outQueue.shutdown();
-}
-
-// TODO aconway 2006-12-20: Inline into wake().
-void EventChannel::Descriptor::update() {
- // Caller holds lock.
- if (isShutdown()) // Nothing to do
- return;
- uint32_t events = EPOLLONESHOT | EPOLLERR | EPOLLHUP;
- inQueue.setBit(events);
- outQueue.setBit(events);
- epollCtl(EPOLL_CTL_MOD, events);
-}
-
-void EventChannel::Descriptor::epollCtl(int op, uint32_t events) {
- // Caller holds lock
- assert(!isShutdown());
- CleanStruct<epoll_event> ee;
- ee.data.ptr = this;
- ee.events = events;
- int status = ::epoll_ctl(epollFd, op, myFd, &ee);
- if (status < 0) {
- if (errno == EEXIST) // It's okay to add an existing fd
- return;
- else if (errno == EBADF) // FD was closed externally.
- shutdownUnsafe();
- else
- throw QPID_POSIX_ERROR(errno);
- }
-}
-
-
-EventChannel::Queue* EventChannel::Descriptor::pick() {
- if (inQueue.empty() && outQueue.empty())
- return 0;
- if (inQueue.empty() || outQueue.empty())
- return !inQueue.empty() ? &inQueue : &outQueue;
- // Neither is empty, pick fairly.
- preferIn = !preferIn;
- return preferIn ? &inQueue : &outQueue;
-}
-
-Event* EventChannel::Descriptor::wake(uint32_t epollEvents) {
- Mutex::ScopedLock l(lock);
- // On error, shut down the Descriptor and both queues.
- if (epollEvents & (EPOLLERR | EPOLLHUP)) {
- shutdownUnsafe();
- // TODO aconway 2006-12-20: This error handling models means
- // that any error reported by epoll will result in a shutdown
- // exception on the events. Can we get more accurate error
- // reporting somehow?
- }
- Queue*q = 0;
- bool in = (epollEvents & EPOLLIN);
- bool out = (epollEvents & EPOLLOUT);
- if ((in && out) || isShutdown())
- q = pick(); // Choose fairly, either non-empty queue.
- else if (in)
- q = &inQueue;
- else if (out)
- q = &outQueue;
- Event* e = (q && !q->empty()) ? q->pop() : 0;
- update();
- if (e)
- e->complete(*this);
- return e;
-}
-
-
-
-// ================================================================
-// EventChannel::Impl
-
-
-EventChannel::Impl::Impl(int epollSize):
- epollFd(-1), isShutdown(false)
-{
- // Create the epoll file descriptor.
- epollFd = epoll_create(epollSize);
- QPID_POSIX_CHECK(epollFd);
-
- // Create a pipe and write a single byte. The byte is never
- // read so the pipes read fd is always ready for read.
- // We activate the FD when there are messages in the queue.
- QPID_POSIX_CHECK(::pipe(shutdownPipe));
- static char zero = '\0';
- QPID_POSIX_CHECK(::write(shutdownPipe[1], &zero, 1));
-}
-
-EventChannel::Impl::~Impl() {
- shutdown();
- ::close(epollFd);
- ::close(shutdownPipe[0]);
- ::close(shutdownPipe[1]);
-}
-
-
-void EventChannel::Impl::shutdown() {
- Monitor::ScopedLock l(monitor);
- if (!isShutdown) { // I'm starting shutdown.
- isShutdown = true;
- if (nWaiters == 0)
- return;
-
- // TODO aconway 2006-12-20: If I just close the epollFd will
- // that wake all threads? If so with what? Would be simpler than:
-
- CleanStruct<epoll_event> ee;
- ee.data.ptr = 0;
- ee.events = EPOLLIN;
- QPID_POSIX_CHECK(
- epoll_ctl(epollFd, EPOLL_CTL_ADD, shutdownPipe[0], &ee));
- }
- // Wait for nWaiters to get out.
- while (nWaiters > 0) {
- monitor.wait();
- }
-}
-
-// TODO aconway 2006-12-20: DEBUG remove
-struct epoll {
- epoll(uint32_t e) : events(e) { }
- uint32_t events;
-};
-
-#define BIT(X) out << ((e.events & X) ? __STRING(X) "." : "")
-ostream& operator << (ostream& out, epoll e) {
- out << "epoll_event.events: ";
- BIT(EPOLLIN);
- BIT(EPOLLPRI);
- BIT(EPOLLOUT);
- BIT(EPOLLRDNORM);
- BIT(EPOLLRDBAND);
- BIT(EPOLLWRNORM);
- BIT(EPOLLWRBAND);
- BIT(EPOLLMSG);
- BIT(EPOLLERR);
- BIT(EPOLLHUP);
- BIT(EPOLLONESHOT);
- BIT(EPOLLET);
- return out;
-}
-
-
-
-/**
- * Wait for epoll to wake up, return the descriptor or 0 on timeout.
- */
-Event* EventChannel::Impl::wait(Duration timeoutNs)
-{
- {
- Monitor::ScopedLock l(monitor);
- if (isShutdown)
- throw ShutdownException();
- }
-
- // Increase nWaiters for the duration, notify the monitor if I'm
- // the last one out.
- //
- AtomicCount::ScopedIncrement si(
- nWaiters, boost::bind(&Monitor::notifyAll, &monitor));
-
- // No lock, all thread safe calls or local variables:
- //
- const long timeoutMs =
- (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC;
- CleanStruct<epoll_event> ee;
- Event* event = 0;
-
- // Loop till we get a completed event. Some events may repost
- // themselves and return 0, e.g. incomplete read or write events.
- //TODO aconway 2006-12-20: FIX THIS!
- while (!event) {
- int n = ::epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe.
- if (n == 0) // Timeout
- return 0;
- if (n < 0 && errno == EINTR) // Interrupt, ignore it.
- continue;
- if (n < 0)
- throw QPID_POSIX_ERROR(errno);
- assert(n == 1);
- Descriptor* ed =
- reinterpret_cast<Descriptor*>(ee.data.ptr);
- if (ed == 0) // We're being shut-down.
- throw ShutdownException();
- assert(ed != 0);
- event = ed->wake(ee.events);
- }
- return event;
-}
-
-//EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) {
-// Mutex::ScopedLock l(monitor);
-// Descriptor& ed = descriptors[fd];
-// ed.activate(epollFd, fd);
-// return ed;
-//}
-
-
-// ================================================================
-// EventChannel
-
-EventChannel::shared_ptr EventChannel::create() {
- return shared_ptr(new EventChannel());
-}
-
-EventChannel::EventChannel() : impl(new EventChannel::Impl()) {}
-
-EventChannel::~EventChannel() {}
-
-void EventChannel::post(Event& e)
-{
- e.prepare(*impl);
-}
-
-Event* EventChannel::wait(Duration timeoutNs)
-{
- return impl->wait(timeoutNs);
-}
-
-void EventChannel::shutdown() {
- impl->shutdown();
-}
-
-
-// ================================================================
-// Event and subclasses.
-
-Event::~Event() {}
-
-Exception::shared_ptr_const Event::getException() const {
- return exception;
-}
-
-void Event::throwIfException() {
- if (getException())
- exception->throwSelf();
-}
-
-void Event::dispatch()
-{
- if (!callback.empty())
- callback();
-}
-
-void Event::setException(const std::exception& e) {
- const Exception* ex = dynamic_cast<const Exception*>(&e);
- if (ex)
- exception.reset(ex->clone().release());
- else
- exception.reset(new Exception(e));
-#ifndef NDEBUG
- // Throw and re-catch the exception. Has no effect on the
- // program but it triggers debuggers watching for throw. The
- // context that sets the exception is more informative for
- // debugging purposes than the one that ultimately throws it.
- //
- try {
- throwIfException();
- }
- catch (...) { } // Ignored.
-#endif
-}
-
-int FDEvent::getFDescriptor() const {
- return descriptor.getFD();
-}
-
-// TODO: AMS 21/12/06 Don't like the inline new, probably cause a memory leak
-ReadEvent::ReadEvent(int fd, void* buf, size_t sz,Callback cb, bool noWait) :
- IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesRead(0) {
-}
-
-void ReadEvent::prepare(EventChannel::Impl& impl) {
- EventChannel::Descriptor& d = getDescriptor();
- impl.activate(d);
- d.getQueue(IN).push(this);
-}
-
-void ReadEvent::complete(EventChannel::Descriptor& ed)
-{
- ssize_t n = ::read(getFDescriptor(),
- static_cast<char*>(buffer) + bytesRead,
- size - bytesRead);
- if (n > 0)
- bytesRead += n;
- if (n == 0 || (n < 0 && errno != EAGAIN)) {
- // Use ENODATA for file closed.
- setException(QPID_POSIX_ERROR(n == 0 ? ENODATA : errno));
- ed.shutdownUnsafe();
- }
-}
-
-WriteEvent::WriteEvent(int fd, const void* buf, size_t sz, Callback cb) :
- IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesWritten(0) {
-}
-
-void WriteEvent::prepare(EventChannel::Impl& impl) {
- EventChannel::Descriptor& d = getDescriptor();
- impl.activate(d);
- d.getQueue(OUT).push(this);
-}
-
-
-void WriteEvent::complete(EventChannel::Descriptor& ed)
-{
- ssize_t n = ::write(getFDescriptor(),
- static_cast<const char*>(buffer) + bytesWritten,
- size - bytesWritten);
- if (n > 0)
- bytesWritten += n;
- if(n < 0 && errno != EAGAIN) {
- setException(QPID_POSIX_ERROR(errno));
- ed.shutdownUnsafe(); // Called with lock held.
- }
-}
-
-AcceptEvent::AcceptEvent(int fd, Callback cb) :
- FDEvent(cb, *(new EventChannel::Descriptor(fd))), accepted(0) {
-}
-
-void AcceptEvent::prepare(EventChannel::Impl& impl) {
- EventChannel::Descriptor& d = getDescriptor();
- impl.activate(d);
- d.getQueue(IN).push(this);
-}
-
-void AcceptEvent::complete(EventChannel::Descriptor& ed)
-{
- accepted = ::accept(getFDescriptor(), 0, 0);
- if (accepted < 0) {
- setException(QPID_POSIX_ERROR(errno));
- ed.shutdownUnsafe(); // Called with lock held.
- }
-}
-
-}}