From 45b526ce09daee869ec1313808583f7e05bff7bb Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Mon, 18 Jun 2007 12:11:32 +0000 Subject: Intermediate checkin with preliminary work on epoll based net IO git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@548337 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/posix/EventChannel.cpp | 679 +++++++++++++++------- cpp/src/qpid/sys/posix/EventChannel.h | 200 ++++--- cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp | 8 +- cpp/src/qpid/sys/posix/EventChannelConnection.cpp | 36 +- cpp/src/qpid/sys/posix/EventChannelConnection.h | 4 +- cpp/src/qpid/sys/posix/EventChannelThreads.cpp | 75 +-- cpp/src/qpid/sys/posix/EventChannelThreads.h | 43 +- cpp/src/qpid/sys/posix/check.h | 6 +- 8 files changed, 699 insertions(+), 352 deletions(-) (limited to 'cpp/src/qpid/sys/posix') diff --git a/cpp/src/qpid/sys/posix/EventChannel.cpp b/cpp/src/qpid/sys/posix/EventChannel.cpp index 6db397a165..d35eedf5a5 100644 --- a/cpp/src/qpid/sys/posix/EventChannel.cpp +++ b/cpp/src/qpid/sys/posix/EventChannel.cpp @@ -1,4 +1,4 @@ -/* +/* * * Copyright (c) 2006 The Apache Software Foundation * @@ -16,6 +16,19 @@ * */ +// TODO aconway 2006-12-15: Locking review. + +// TODO aconway 2006-12-15: use Descriptor pointers everywhere, +// get them from channel, pass them to Event constructors. +// Eliminate lookup. + + +#include "EventChannel.h" +#include "check.h" + +#include "qpid/QpidError.h" +#include "qpid/sys/AtomicCount.h" + #include #include #include @@ -29,139 +42,420 @@ #include #include -#include - -#include "qpid/QpidError.h" -#include "qpid/sys/Monitor.h" -#include "qpid/log/Statement.h" - -#include "check.h" -#include "EventChannel.h" +#include +#include using namespace std; -// Convenience template to zero out a struct. -template struct ZeroStruct : public S { - ZeroStruct() { memset(this, 0, sizeof(*this)); } -}; - namespace qpid { namespace sys { +// ================================================================ +// Private class declarations + +namespace { + +typedef enum { IN, OUT } Direction; + +typedef std::pair EventPair; + /** - * EventHandler wraps an epoll file descriptor. Acts as private - * interface between EventChannel and subclasses. - * - * Also implements Event interface for events that are not associated - * with a file descriptor and are passed via the message queue. - */ -class EventHandler : public Event, private Monitor + * Template to zero out a C-struct on construction. Avoids uninitialized memory + * warnings from valgrind or other mem checking tool. + */ +template struct CleanStruct : public T { + CleanStruct() { memset(this, 0, sizeof(*this)); } +}; + +} // namespace + +/** + * Queue of events corresponding to one IO direction (IN or OUT). + * Each Descriptor contains two Queues. + */ +class EventChannel::Queue : private boost::noncopyable { public: - EventHandler(int epollSize = 256); - ~EventHandler(); + Queue(Descriptor& container, Direction dir); - int getEpollFd() { return epollFd; } - void epollAdd(int fd, uint32_t epollEvents, Event* event); - void epollMod(int fd, uint32_t epollEvents, Event* event); - void epollDel(int fd); + /** Called by Event classes in prepare() */ + void push(Event* e); - void mqPut(Event* event); - Event* mqGet(); - - protected: - // Should never be called, only complete. - void prepare(EventHandler&) { assert(0); } - Event* complete(EventHandler& eh); + /** Called when epoll wakes. + *@return The next completed event or 0. + */ + Event* wake(uint32_t epollFlags); + + Event* pop() { Event* e = queue.front(); queue.pop_front(); return e; } + + bool empty() { return queue.empty(); } + + void setBit(uint32_t &epollFlags); + + void shutdown(); private: + typedef std::deque EventQ; + + inline bool isMyEvent(uint32_t flags) { return flags | myEvent; } + + Mutex& lock; // Shared with Descriptor. + Descriptor& descriptor; + uint32_t myEvent; // Epoll event flag. + EventQ queue; +}; + + +/** + * Manages a file descriptor in an epoll set. + * + * Can be shutdown and re-activated for the same file descriptor. + */ +class EventChannel::Descriptor : private boost::noncopyable { + public: + explicit Descriptor(int fd) : epollFd(-1), myFd(fd), + inQueue(*this, IN), outQueue(*this, OUT) {} + + void activate(int epollFd_); + + /** Epoll woke up for this descriptor. */ + Event* wake(uint32_t epollEvents); + + /** Shut down: close and remove file descriptor. + * May be re-activated if fd is reused. + */ + void shutdown(); + + // TODO aconway 2006-12-18: Nasty. Need to clean up interaction. + void shutdownUnsafe(); + + bool isShutdown() { return epollFd == -1; } + + Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; } + int getFD() const { return myFd; } + + private: + void update(); + void epollCtl(int op, uint32_t events); + Queue* pick(); + + Mutex lock; int epollFd; - std::string mqName; - int mqFd; - std::queue mqEvents; + int myFd; + Queue inQueue, outQueue; + bool preferIn; + + friend class Queue; }; -EventHandler::EventHandler(int epollSize) -{ - epollFd = epoll_create(epollSize); - if (epollFd < 0) throw QPID_POSIX_ERROR(errno); + +/** + * Holds a map of Descriptors, which do most of the work. + */ +class EventChannel::Impl { + public: + Impl(int size = 256); + + ~Impl(); + + /** + * Activate descriptor + */ + void activate(Descriptor& d) { + d.activate(epollFd); + } + + /** Wait for an event, return 0 on timeout */ + Event* wait(Duration timeout); - // Create a POSIX message queue for non-fd events. - // We write one byte and never read it is always ready for read - // when we add it to epoll. + void shutdown(); + + private: + + Monitor monitor; + int epollFd; + int shutdownPipe[2]; + AtomicCount nWaiters; + bool isShutdown; +}; + + +// ================================================================ +// EventChannel::Queue::implementation. + +static const char* shutdownMsg = "Event queue shut down."; + +EventChannel::Queue::Queue(Descriptor& d, Direction dir) : + lock(d.lock), descriptor(d), + myEvent(dir==IN ? EPOLLIN : EPOLLOUT) +{} + +void EventChannel::Queue::push(Event* e) { + Mutex::ScopedLock l(lock); + if (descriptor.isShutdown()) + THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg); + queue.push_back(e); + descriptor.update(); +} + +void EventChannel::Queue::setBit(uint32_t &epollFlags) { + if (queue.empty()) + epollFlags &= ~myEvent; + else + epollFlags |= myEvent; +} + +// TODO aconway 2006-12-20: REMOVE +Event* EventChannel::Queue::wake(uint32_t epollFlags) { + // Called with lock held. + if (!queue.empty() && (isMyEvent(epollFlags))) { + assert(!queue.empty()); + Event* e = queue.front(); + assert(e); + if (!e->getException()) { + // TODO aconway 2006-12-20: Can/should we move event completion + // out into dispatch() so it doesn't happen in Descriptor locks? + e->complete(descriptor); + } + queue.pop_front(); + return e; + } + return 0; +} + +void EventChannel::Queue::shutdown() { + // Mark all pending events with a shutdown exception. + // The server threads will remove and dispatch the events. // - ZeroStruct attr; - attr.mq_maxmsg = 1; - attr.mq_msgsize = 1; - do { - char tmpnam[L_tmpnam]; - tmpnam_r(tmpnam); - mqName = tmpnam + 4; // Skip "tmp/" - mqFd = mq_open( - mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr); - if (mqFd < 0) throw QPID_POSIX_ERROR(errno); - } while (mqFd == EEXIST); // Name already taken, try again. + qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, SRCLINE); + for_each(queue.begin(), queue.end(), + boost::bind(&Event::setException, _1, ex)); +} - static char zero = '\0'; - mq_send(mqFd, &zero, 1, 0); - epollAdd(mqFd, 0, this); + +// ================================================================ +// Descriptor + + +void EventChannel::Descriptor::activate(int epollFd_) { + Mutex::ScopedLock l(lock); + if (isShutdown()) { + epollFd = epollFd_; // We're back in business. + epollCtl(EPOLL_CTL_ADD, 0); + } } -EventHandler::~EventHandler() { - mq_close(mqFd); - mq_unlink(mqName.c_str()); +void EventChannel::Descriptor::shutdown() { + Mutex::ScopedLock l(lock); + shutdownUnsafe(); } -void EventHandler::mqPut(Event* event) { - ScopedLock l(*this); - assert(event != 0); - mqEvents.push(event); - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); +void EventChannel::Descriptor::shutdownUnsafe() { + // Caller holds lock. + ::close(myFd); + epollFd = -1; // Mark myself as shutdown. + inQueue.shutdown(); + outQueue.shutdown(); +} + +// TODO aconway 2006-12-20: Inline into wake(). +void EventChannel::Descriptor::update() { + // Caller holds lock. + if (isShutdown()) // Nothing to do + return; + uint32_t events = EPOLLONESHOT | EPOLLERR | EPOLLHUP; + inQueue.setBit(events); + outQueue.setBit(events); + epollCtl(EPOLL_CTL_MOD, events); +} + +void EventChannel::Descriptor::epollCtl(int op, uint32_t events) { + // Caller holds lock + assert(!isShutdown()); + CleanStruct ee; + ee.data.ptr = this; + ee.events = events; + int status = ::epoll_ctl(epollFd, op, myFd, &ee); + if (status < 0) { + if (errno == EEXIST) // It's okay to add an existing fd + return; + else if (errno == EBADF) // FD was closed externally. + shutdownUnsafe(); + else + throw QPID_POSIX_ERROR(errno); + } } + -Event* EventHandler::mqGet() { - ScopedLock l(*this); - if (mqEvents.empty()) +EventChannel::Queue* EventChannel::Descriptor::pick() { + if (inQueue.empty() && outQueue.empty()) return 0; - Event* event = mqEvents.front(); - mqEvents.pop(); - if(!mqEvents.empty()) - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); - return event; + if (inQueue.empty() || outQueue.empty()) + return !inQueue.empty() ? &inQueue : &outQueue; + // Neither is empty, pick fairly. + preferIn = !preferIn; + return preferIn ? &inQueue : &outQueue; } -void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event) -{ - ZeroStruct ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) - throw QPID_POSIX_ERROR(errno); +Event* EventChannel::Descriptor::wake(uint32_t epollEvents) { + Mutex::ScopedLock l(lock); + // On error, shut down the Descriptor and both queues. + if (epollEvents & (EPOLLERR | EPOLLHUP)) { + shutdownUnsafe(); + // TODO aconway 2006-12-20: This error handling models means + // that any error reported by epoll will result in a shutdown + // exception on the events. Can we get more accurate error + // reporting somehow? + } + Queue*q = 0; + bool in = (epollEvents & EPOLLIN); + bool out = (epollEvents & EPOLLOUT); + if ((in && out) || isShutdown()) + q = pick(); // Choose fairly, either non-empty queue. + else if (in) + q = &inQueue; + else if (out) + q = &outQueue; + Event* e = (q && !q->empty()) ? q->pop() : 0; + update(); + if (e) + e->complete(*this); + return e; } -void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event) + + +// ================================================================ +// EventChannel::Impl + + +EventChannel::Impl::Impl(int epollSize): + epollFd(-1), isShutdown(false) { - ZeroStruct ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) - throw QPID_POSIX_ERROR(errno); + // Create the epoll file descriptor. + epollFd = epoll_create(epollSize); + QPID_POSIX_CHECK(epollFd); + + // Create a pipe and write a single byte. The byte is never + // read so the pipes read fd is always ready for read. + // We activate the FD when there are messages in the queue. + QPID_POSIX_CHECK(::pipe(shutdownPipe)); + static char zero = '\0'; + QPID_POSIX_CHECK(::write(shutdownPipe[1], &zero, 1)); } -void EventHandler::epollDel(int fd) { - if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0) - throw QPID_POSIX_ERROR(errno); +EventChannel::Impl::~Impl() { + shutdown(); + ::close(epollFd); + ::close(shutdownPipe[0]); + ::close(shutdownPipe[1]); } -Event* EventHandler::complete(EventHandler& eh) -{ - assert(&eh == this); - Event* event = mqGet(); - return event==0 ? 0 : event->complete(eh); + +void EventChannel::Impl::shutdown() { + Monitor::ScopedLock l(monitor); + if (!isShutdown) { // I'm starting shutdown. + isShutdown = true; + if (nWaiters == 0) + return; + + // TODO aconway 2006-12-20: If I just close the epollFd will + // that wake all threads? If so with what? Would be simpler than: + + CleanStruct ee; + ee.data.ptr = 0; + ee.events = EPOLLIN; + QPID_POSIX_CHECK( + epoll_ctl(epollFd, EPOLL_CTL_ADD, shutdownPipe[0], &ee)); + } + // Wait for nWaiters to get out. + while (nWaiters > 0) { + monitor.wait(); + } +} + +// TODO aconway 2006-12-20: DEBUG remove +struct epoll { + epoll(uint32_t e) : events(e) { } + uint32_t events; +}; + +#define BIT(X) out << ((e.events & X) ? __STRING(X) "." : "") +ostream& operator << (ostream& out, epoll e) { + out << "epoll_event.events: "; + BIT(EPOLLIN); + BIT(EPOLLPRI); + BIT(EPOLLOUT); + BIT(EPOLLRDNORM); + BIT(EPOLLRDBAND); + BIT(EPOLLWRNORM); + BIT(EPOLLWRBAND); + BIT(EPOLLMSG); + BIT(EPOLLERR); + BIT(EPOLLHUP); + BIT(EPOLLONESHOT); + BIT(EPOLLET); + return out; } + + +/** + * Wait for epoll to wake up, return the descriptor or 0 on timeout. + */ +Event* EventChannel::Impl::wait(Duration timeoutNs) +{ + { + Monitor::ScopedLock l(monitor); + if (isShutdown) + throw ShutdownException(); + } + + // Increase nWaiters for the duration, notify the monitor if I'm + // the last one out. + // + AtomicCount::ScopedIncrement si( + nWaiters, boost::bind(&Monitor::notifyAll, &monitor)); + + // No lock, all thread safe calls or local variables: + // + const long timeoutMs = + (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC; + CleanStruct ee; + Event* event = 0; + + // Loop till we get a completed event. Some events may repost + // themselves and return 0, e.g. incomplete read or write events. + //TODO aconway 2006-12-20: FIX THIS! + while (!event) { + int n = ::epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe. + if (n == 0) // Timeout + return 0; + if (n < 0 && errno == EINTR) // Interrupt, ignore it. + continue; + if (n < 0) + throw QPID_POSIX_ERROR(errno); + assert(n == 1); + Descriptor* ed = + reinterpret_cast(ee.data.ptr); + if (ed == 0) // We're being shut-down. + throw ShutdownException(); + assert(ed != 0); + event = ed->wake(ee.events); + } + return event; +} + +//EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) { +// Mutex::ScopedLock l(monitor); +// Descriptor& ed = descriptors[fd]; +// ed.activate(epollFd, fd); +// return ed; +//} + + // ================================================================ // EventChannel @@ -169,157 +463,134 @@ EventChannel::shared_ptr EventChannel::create() { return shared_ptr(new EventChannel()); } -EventChannel::EventChannel() : handler(new EventHandler()) {} +EventChannel::EventChannel() : impl(new EventChannel::Impl()) {} EventChannel::~EventChannel() {} -void EventChannel::postEvent(Event& e) +void EventChannel::post(Event& e) { - e.prepare(*handler); + e.prepare(*impl); } -Event* EventChannel::getEvent() +Event* EventChannel::wait(Duration timeoutNs) { - static const int infiniteTimeout = -1; - ZeroStruct epollEvent; + return impl->wait(timeoutNs); +} - // Loop until we can complete the event. Some events may re-post - // themselves and return 0 from complete, e.g. partial reads. // - Event* event = 0; - while (event == 0) { - int eventCount = epoll_wait(handler->getEpollFd(), - &epollEvent, 1, infiniteTimeout); - if (eventCount < 0) { - if (errno != EINTR) { - QPID_LOG(warn, "Ignoring error: " - << PosixError::getMessage(errno)); - assert(0); - } - } - else if (eventCount == 1) { - event = reinterpret_cast(epollEvent.data.ptr); - assert(event != 0); - try { - event = event->complete(*handler); - } - catch (const Exception& e) { - if (event) - event->setError(e); - } - catch (const std::exception& e) { - if (event) - event->setError(e); - } - } - } - return event; +void EventChannel::shutdown() { + impl->shutdown(); } + +// ================================================================ +// Event and subclasses. + Event::~Event() {} -void Event::prepare(EventHandler& handler) -{ - handler.mqPut(this); +Exception::shared_ptr_const Event::getException() const { + return exception; } -bool Event::hasError() const { - return error; +void Event::throwIfException() { + if (getException()) + exception->throwSelf(); } -void Event::throwIfError() throw (Exception) { - if (hasError()) - error.throwSelf(); -} - -Event* Event::complete(EventHandler&) +void Event::dispatch() { - return this; + if (!callback.empty()) + callback(); } -void Event::dispatch() -{ +void Event::setException(const std::exception& e) { + const Exception* ex = dynamic_cast(&e); + if (ex) + exception.reset(ex->clone().release()); + else + exception.reset(new Exception(e)); +#ifndef NDEBUG + // Throw and re-catch the exception. Has no effect on the + // program but it triggers debuggers watching for throw. The + // context that sets the exception is more informative for + // debugging purposes than the one that ultimately throws it. + // try { - if (!callback.empty()) - callback(); - } catch (const std::exception&) { - throw; - } catch (...) { - throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception."); + throwIfException(); } + catch (...) { } // Ignored. +#endif } -void Event::setError(const ExceptionHolder& e) { - error = e; +int FDEvent::getFDescriptor() const { + return descriptor.getFD(); } -void ReadEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +// TODO: AMS 21/12/06 Don't like the inline new, probably cause a memory leak +ReadEvent::ReadEvent(int fd, void* buf, size_t sz,Callback cb, bool noWait) : + IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesRead(0) { } -ssize_t ReadEvent::doRead() { - ssize_t n = ::read(descriptor, static_cast(buffer) + received, - size - received); - if (n > 0) received += n; - return n; +void ReadEvent::prepare(EventChannel::Impl& impl) { + EventChannel::Descriptor& d = getDescriptor(); + impl.activate(d); + d.getQueue(IN).push(this); } -Event* ReadEvent::complete(EventHandler& handler) +void ReadEvent::complete(EventChannel::Descriptor& ed) { - // Read as much as possible without blocking. - ssize_t n = doRead(); - while (n > 0 && received < size) doRead(); - - if (received == size) { - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - return this; - } - else if (n <0 && (errno == EAGAIN)) { - // Keep polling for more. - handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this); - return 0; - } - else { - // Unexpected EOF or error. Throw ENODATA for EOF. - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA); + ssize_t n = ::read(getFDescriptor(), + static_cast(buffer) + bytesRead, + size - bytesRead); + if (n > 0) + bytesRead += n; + if (n == 0 || (n < 0 && errno != EAGAIN)) { + // Use ENODATA for file closed. + setException(QPID_POSIX_ERROR(n == 0 ? ENODATA : errno)); + ed.shutdownUnsafe(); } } -void WriteEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this); +WriteEvent::WriteEvent(int fd, const void* buf, size_t sz, Callback cb) : + IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesWritten(0) { +} + +void WriteEvent::prepare(EventChannel::Impl& impl) { + EventChannel::Descriptor& d = getDescriptor(); + impl.activate(d); + d.getQueue(OUT).push(this); } -Event* WriteEvent::complete(EventHandler& handler) + +void WriteEvent::complete(EventChannel::Descriptor& ed) { - ssize_t n = write(descriptor, static_cast(buffer) + written, - size - written); - if (n < 0) throw QPID_POSIX_ERROR(errno); - written += n; - if(written < size) { - // Keep polling. - handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this); - return 0; + ssize_t n = ::write(getFDescriptor(), + static_cast(buffer) + bytesWritten, + size - bytesWritten); + if (n > 0) + bytesWritten += n; + if(n < 0 && errno != EAGAIN) { + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. } - written = 0; // Reset for re-use. - handler.epollDel(descriptor); - return this; } -void AcceptEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +AcceptEvent::AcceptEvent(int fd, Callback cb) : + FDEvent(cb, *(new EventChannel::Descriptor(fd))), accepted(0) { +} + +void AcceptEvent::prepare(EventChannel::Impl& impl) { + EventChannel::Descriptor& d = getDescriptor(); + impl.activate(d); + d.getQueue(IN).push(this); } -Event* AcceptEvent::complete(EventHandler& handler) +void AcceptEvent::complete(EventChannel::Descriptor& ed) { - handler.epollDel(descriptor); - accepted = ::accept(descriptor, 0, 0); - if (accepted < 0) throw QPID_POSIX_ERROR(errno); - return this; + accepted = ::accept(getFDescriptor(), 0, 0); + if (accepted < 0) { + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. + } } }} diff --git a/cpp/src/qpid/sys/posix/EventChannel.h b/cpp/src/qpid/sys/posix/EventChannel.h index f465580996..85e121379a 100644 --- a/cpp/src/qpid/sys/posix/EventChannel.h +++ b/cpp/src/qpid/sys/posix/EventChannel.h @@ -20,7 +20,10 @@ */ #include "qpid/SharedObject.h" -#include "qpid/ExceptionHolder.h" +#include "qpid/Exception.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Time.h" + #include #include @@ -28,11 +31,57 @@ namespace qpid { namespace sys { class Event; -class EventHandler; -class EventChannel; + +/** + * Channel to post and wait for events. + */ +class EventChannel : public qpid::SharedObject +{ + public: + static shared_ptr create(); + + /** Exception throw from wait() if channel is shut down. */ + class ShutdownException : public qpid::Exception {}; + + ~EventChannel(); + + /** Post an event to the channel. */ + void post(Event& event); + + /** + * Wait for the next complete event, up to timeout. + *@return Pointer to event or 0 if timeout elapses. + *@exception ShutdownException if the channel is shut down. + */ + Event* wait(Duration timeout = TIME_INFINITE); + + /** + * Shut down the event channel. + * Blocks till all threads have exited wait() + */ + void shutdown(); + + + // Internal classes. + class Impl; + class Queue; + class Descriptor; + + private: + + EventChannel(); + + Mutex lock; + boost::shared_ptr impl; +}; /** * Base class for all Events. + * + * Derived classes define events representing various async IO operations. + * When an event is complete, it is returned by the EventChannel to + * a thread calling wait. The thread will call Event::dispatch() to + * execute code associated with event completion. */ class Event { @@ -40,135 +89,124 @@ class Event /** Type for callback when event is dispatched */ typedef boost::function0 Callback; - /** - * Create an event with optional callback. - * Instances of Event are sent directly through the channel. - * Derived classes define additional waiting behaviour. - *@param cb A callback functor that is invoked when dispatch() is called. - */ - Event(Callback cb = 0) : callback(cb) {} - virtual ~Event(); /** Call the callback provided to the constructor, if any. */ void dispatch(); - /** True if there was an error processing this event */ - bool hasError() const; + /** + *If there was an exception processing this Event, return it. + *@return 0 if there was no exception. + */ + qpid::Exception::shared_ptr_const getException() const; + + /** If getException() throw the corresponding exception. */ + void throwIfException(); - /** If hasError() throw the corresponding exception. */ - void throwIfError() throw(Exception); + /** Set the dispatch callback. */ + void setCallback(Callback cb) { callback = cb; } + + /** Set the exception. */ + void setException(const std::exception& e); protected: - virtual void prepare(EventHandler&); - virtual Event* complete(EventHandler&); - void setError(const ExceptionHolder& e); + Event(Callback cb=0) : callback(cb) {} + + virtual void prepare(EventChannel::Impl&) = 0; + virtual void complete(EventChannel::Descriptor&) = 0; Callback callback; - ExceptionHolder error; + Exception::shared_ptr_const exception; friend class EventChannel; - friend class EventHandler; + friend class EventChannel::Queue; }; -template -class IOEvent : public Event { +/** Base class for events related to a file descriptor */ +class FDEvent : public Event { + public: + EventChannel::Descriptor& getDescriptor() const { return descriptor; } + int getFDescriptor() const; + + protected: + FDEvent(Callback cb, EventChannel::Descriptor& fd) + : Event(cb), descriptor(fd) {} + // TODO AMS: 1/6/07 I really don't think this is correct, but + // the descriptor is immutable + FDEvent& operator=(const FDEvent& rhs) { Event::operator=(rhs); return *this; } + + private: + EventChannel::Descriptor& descriptor; +}; + +/** Base class for read or write events. */ +class IOEvent : public FDEvent { public: - void getDescriptor() const { return descriptor; } size_t getSize() const { return size; } - BufT getBuffer() const { return buffer; } - + protected: - IOEvent(int fd, Callback cb, size_t sz, BufT buf) : - Event(cb), descriptor(fd), buffer(buf), size(sz) {} + IOEvent(Callback cb, EventChannel::Descriptor& fd, size_t sz, bool noWait_) : + FDEvent(cb, fd), size(sz), noWait(noWait_) {} - int descriptor; - BufT buffer; size_t size; + bool noWait; }; /** Asynchronous read event */ -class ReadEvent : public IOEvent +class ReadEvent : public IOEvent { public: - explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) : - IOEvent(fd, cb, sz, buf), received(0) {} + explicit ReadEvent(int fd, void* buf=0, size_t sz=0,Callback cb=0, bool noWait=false); + void* getBuffer() const { return buffer; } + size_t getBytesRead() const { return bytesRead; } + private: - void prepare(EventHandler&); - Event* complete(EventHandler&); + void prepare(EventChannel::Impl&); + void complete(EventChannel::Descriptor&); ssize_t doRead(); - size_t received; + void* buffer; + size_t bytesRead; }; /** Asynchronous write event */ -class WriteEvent : public IOEvent +class WriteEvent : public IOEvent { public: - explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0, - Callback cb=0) : - IOEvent(fd, cb, sz, buf), written(0) {} + explicit WriteEvent(int fd, const void* buf=0, size_t sz=0, Callback cb=0); - protected: - void prepare(EventHandler&); - Event* complete(EventHandler&); + const void* getBuffer() const { return buffer; } + size_t getBytesWritten() const { return bytesWritten; } private: + void prepare(EventChannel::Impl&); + void complete(EventChannel::Descriptor&); ssize_t doWrite(); - size_t written; + + const void* buffer; + size_t bytesWritten; }; + /** Asynchronous socket accept event */ -class AcceptEvent : public Event +class AcceptEvent : public FDEvent { public: /** Accept a connection on fd. */ - explicit AcceptEvent(int fd=-1, Callback cb=0) : - Event(cb), descriptor(fd), accepted(0) {} - - /** Get descriptor for server socket */ + explicit AcceptEvent(int fd, Callback cb=0); + + /** Get descriptor for accepted server socket */ int getAcceptedDesscriptor() const { return accepted; } private: - void prepare(EventHandler&); - Event* complete(EventHandler&); + void prepare(EventChannel::Impl&); + void complete(EventChannel::Descriptor&); - int descriptor; int accepted; }; -class QueueSet; - -/** - * Channel to post and wait for events. - */ -class EventChannel : public qpid::SharedObject -{ - public: - static shared_ptr create(); - - ~EventChannel(); - - /** Post an event to the channel. */ - void postEvent(Event& event); - - /** Post an event to the channel. Must not be 0. */ - void postEvent(Event* event) { postEvent(*event); } - - /** - * Wait for the next complete event. - *@return Pointer to event. Will never return 0. - */ - Event* getEvent(); - - private: - EventChannel(); - boost::shared_ptr handler; -}; - - }} diff --git a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp index d5a2c238d9..1a5fceb56e 100644 --- a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp @@ -106,7 +106,7 @@ void EventChannelAcceptor::run(ConnectionInputHandlerFactory* f) { if (!isRunning && !isShutdown) { isRunning = true; factory = f; - threads->postEvent(acceptEvent); + threads->post(acceptEvent); } } threads->join(); // Wait for shutdown. @@ -120,7 +120,7 @@ void EventChannelAcceptor::shutdown() { isShutdown = true; } if (doShutdown) { - ::close(acceptEvent.getDescriptor()); + ::close(acceptEvent.getFDescriptor()); threads->shutdown(); for_each(connections.begin(), connections.end(), boost::bind(&EventChannelConnection::close, _1)); @@ -139,11 +139,11 @@ void EventChannelAcceptor::accept() shutdown(); return; } - // TODO aconway 2006-11-29: Need to reap closed connections also. int fd = acceptEvent.getAcceptedDesscriptor(); + threads->post(acceptEvent); // Keep accepting. + // TODO aconway 2006-11-29: Need to reap closed connections also. connections.push_back( new EventChannelConnection(threads, *factory, fd, fd, isTrace)); - threads->postEvent(acceptEvent); // Keep accepting. } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp index 0c1c81b6fe..a36f096a4d 100644 --- a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp @@ -24,7 +24,6 @@ #include "EventChannelConnection.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/QpidError.h" -#include "qpid/log/Statement.h" using namespace std; using namespace qpid; @@ -44,6 +43,8 @@ EventChannelConnection::EventChannelConnection( ) : readFd(rfd), writeFd(wfd ? wfd : rfd), + readEvent(readFd), + writeEvent(writeFd), readCallback(boost::bind(&EventChannelConnection::closeOnException, this, &EventChannelConnection::endInitRead)), @@ -55,8 +56,8 @@ EventChannelConnection::EventChannelConnection( out(bufferSize), isTrace(isTrace_) { - BOOST_ASSERT(readFd > 0); - BOOST_ASSERT(writeFd > 0); + assert(readFd > 0); + assert(writeFd > 0); closeOnException(&EventChannelConnection::startRead); } @@ -133,14 +134,17 @@ void EventChannelConnection::startWrite() { } // No need to lock here - only one thread can be writing at a time. out.clear(); - QPID_LOG(trace, "Send on socket " << writeFd << ": " << *frame); + if (isTrace) + cout << "Send on socket " << writeFd << ": " << *frame << endl; frame->encode(out); out.flip(); + // TODO: AMS 1/6/07 This only works because we already have the correct fd + // in the descriptor - change not to use assigment writeEvent = WriteEvent( writeFd, out.start(), out.available(), boost::bind(&EventChannelConnection::closeOnException, this, &EventChannelConnection::endWrite)); - threads->postEvent(writeEvent); + threads->post(writeEvent); } // ScopedBusy ctor increments busyThreads. @@ -161,12 +165,18 @@ void EventChannelConnection::endWrite() { ScopedBusy(*this); { Monitor::ScopedLock lock(monitor); + assert(isWriting); isWriting = false; - if (isClosed) + if (isClosed) return; writeEvent.throwIfException(); + if (writeEvent.getBytesWritten() < writeEvent.getSize()) { + // Keep writing the current event till done. + isWriting = true; + threads->post(writeEvent); + } } - // Check if there's more in to write in the write queue. + // Continue writing from writeFrames queue. startWrite(); } @@ -179,8 +189,8 @@ void EventChannelConnection::endWrite() { void EventChannelConnection::startRead() { // Non blocking read, as much as we can swallow. readEvent = ReadEvent( - readFd, in.start(), in.available(), readCallback,true); - threads->postEvent(readEvent); + readFd, in.start(), in.available(), readCallback); + threads->post(readEvent); } // Completion of initial read, expect protocolInit. @@ -194,7 +204,7 @@ void EventChannelConnection::endInitRead() { in.flip(); ProtocolInitiation protocolInit; if(protocolInit.decode(in)){ - handler->initiated(&protocolInit); + handler->initiated(protocolInit); readCallback = boost::bind( &EventChannelConnection::closeOnException, this, &EventChannelConnection::endRead); @@ -215,8 +225,10 @@ void EventChannelConnection::endRead() { in.flip(); AMQFrame frame; while (frame.decode(in)) { - QPID_LOG(trace, "Received on socket " << readFd - << ": " << frame); + // TODO aconway 2006-11-30: received should take Frame& + if (isTrace) + cout << "Received on socket " << readFd + << ": " << frame << endl; handler->received(&frame); } in.compact(); diff --git a/cpp/src/qpid/sys/posix/EventChannelConnection.h b/cpp/src/qpid/sys/posix/EventChannelConnection.h index bd010a4240..394df55fd9 100644 --- a/cpp/src/qpid/sys/posix/EventChannelConnection.h +++ b/cpp/src/qpid/sys/posix/EventChannelConnection.h @@ -34,7 +34,7 @@ namespace sys { class ConnectionInputHandlerFactory; /** - * Implements ConnectionOutputHandler and delegates to a ConnectionInputHandler + * Implements SessionContext and delegates to a SessionHandler * for a connection via the EventChannel. *@param readDescriptor file descriptor for reading. *@param writeDescriptor file descriptor for writing, @@ -50,7 +50,7 @@ class EventChannelConnection : public ConnectionOutputHandler { bool isTrace = false ); - // TODO aconway 2006-11-30: ConnectionOutputHandler::send should take auto_ptr + // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr virtual void send(qpid::framing::AMQFrame* frame) { send(std::auto_ptr(frame)); } diff --git a/cpp/src/qpid/sys/posix/EventChannelThreads.cpp b/cpp/src/qpid/sys/posix/EventChannelThreads.cpp index 68c57405d5..70954d0c16 100644 --- a/cpp/src/qpid/sys/posix/EventChannelThreads.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelThreads.cpp @@ -16,27 +16,40 @@ * */ -#include "EventChannelThreads.h" -#include "qpid/sys/Runnable.h" -#include "qpid/log/Statement.h" #include -using namespace std; +#include + #include +#include "qpid/sys/Runnable.h" + +#include "EventChannelThreads.h" + namespace qpid { namespace sys { +const size_t EventChannelThreads::unlimited = + std::numeric_limits::max(); + EventChannelThreads::shared_ptr EventChannelThreads::create( - EventChannel::shared_ptr ec) + EventChannel::shared_ptr ec, size_t min, size_t max +) { - return EventChannelThreads::shared_ptr(new EventChannelThreads(ec)); + return EventChannelThreads::shared_ptr( + new EventChannelThreads(ec, min, max)); } -EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) : - channel(ec), nWaiting(0), state(RUNNING) +EventChannelThreads::EventChannelThreads( + EventChannel::shared_ptr ec, size_t min, size_t max) : + minThreads(std::max(size_t(1), min)), + maxThreads(std::min(min, max)), + channel(ec), + nWaiting(0), + state(RUNNING) { - // TODO aconway 2006-11-15: Estimate initial threads based on CPUs. - addThread(); + Monitor::ScopedLock l(monitor); + while (workers.size() < minThreads) + workers.push_back(Thread(*this)); } EventChannelThreads::~EventChannelThreads() { @@ -46,32 +59,30 @@ EventChannelThreads::~EventChannelThreads() { void EventChannelThreads::shutdown() { - ScopedLock lock(*this); + Monitor::ScopedLock lock(monitor); if (state != RUNNING) // Already shutting down. return; - for (size_t i = 0; i < workers.size(); ++i) { - channel->postEvent(terminate); - } - state = TERMINATE_SENT; - notify(); // Wake up one join() thread. + state = TERMINATING; + channel->shutdown(); + monitor.notify(); // Wake up one join() thread. } void EventChannelThreads::join() { { - ScopedLock lock(*this); + Monitor::ScopedLock lock(monitor); while (state == RUNNING) // Wait for shutdown to start. - wait(); + monitor.wait(); if (state == SHUTDOWN) // Shutdown is complete return; if (state == JOINING) { // Someone else is doing the join. while (state != SHUTDOWN) - wait(); + monitor.wait(); return; } // I'm the joining thread - assert(state == TERMINATE_SENT); + assert(state == TERMINATING); state = JOINING; } // Drop the lock. @@ -80,12 +91,13 @@ void EventChannelThreads::join() workers[i].join(); } state = SHUTDOWN; - notifyAll(); // Notify other join() threaeds. + monitor.notifyAll(); // Notify any other join() threads. } void EventChannelThreads::addThread() { - ScopedLock l(*this); - workers.push_back(Thread(*this)); + Monitor::ScopedLock l(monitor); + if (workers.size() < maxThreads) + workers.push_back(Thread(*this)); } void EventChannelThreads::run() @@ -94,23 +106,20 @@ void EventChannelThreads::run() AtomicCount::ScopedIncrement inc(nWaiting); try { while (true) { - Event* e = channel->getEvent(); + Event* e = channel->wait(); assert(e != 0); - if (e == &terminate) { - return; - } AtomicCount::ScopedDecrement dec(nWaiting); - // I'm no longer waiting, make sure someone is. - if (dec == 0) + // Make sure there's at least one waiting thread. + if (dec == 0 && state == RUNNING) addThread(); e->dispatch(); } } - catch (const std::exception& e) { - QPID_LOG(error, e.what()); + catch (const EventChannel::ShutdownException& e) { + return; } - catch (...) { - QPID_LOG(error, "unknown exception"); + catch (const std::exception& e) { + Exception::log(e, "Exception in EventChannelThreads::run()"); } } diff --git a/cpp/src/qpid/sys/posix/EventChannelThreads.h b/cpp/src/qpid/sys/posix/EventChannelThreads.h index 245cefe585..19112cf4db 100644 --- a/cpp/src/qpid/sys/posix/EventChannelThreads.h +++ b/cpp/src/qpid/sys/posix/EventChannelThreads.h @@ -18,14 +18,16 @@ * limitations under the License. * */ -#include +#include "EventChannel.h" #include "qpid/Exception.h" -#include "qpid/sys/Time.h" +#include "qpid/sys/AtomicCount.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" -#include "qpid/sys/AtomicCount.h" -#include "EventChannel.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Runnable.h" + +#include namespace qpid { namespace sys { @@ -33,26 +35,33 @@ namespace sys { /** Dynamic thread pool serving an EventChannel. - Threads run a loop { e = getEvent(); e->dispatch(); } + Threads run a loop { e = wait(); e->dispatch(); } The size of the thread pool is automatically adjusted to optimal size. */ class EventChannelThreads : public qpid::SharedObject, - public sys::Monitor, private sys::Runnable + private sys::Runnable { public: - /** Create the thread pool and start initial threads. */ + /** Constant to represent an unlimited number of threads */ + static const size_t unlimited; + + /** + * Create the thread pool and start initial threads. + * @param minThreads Pool will initialy contain minThreads threads and + * will never shrink to less until shutdown. + * @param maxThreads Pool will never grow to more than maxThreads. + */ static EventChannelThreads::shared_ptr create( - EventChannel::shared_ptr channel + EventChannel::shared_ptr channel = EventChannel::create(), + size_t minThreads = 1, + size_t maxThreads = unlimited ); ~EventChannelThreads(); /** Post event to the underlying channel */ - void postEvent(Event& event) { channel->postEvent(event); } - - /** Post event to the underlying channel Must not be 0. */ - void postEvent(Event* event) { channel->postEvent(event); } + void post(Event& event) { channel->post(event); } /** * Terminate all threads. @@ -68,21 +77,25 @@ class EventChannelThreads : private: typedef std::vector Threads; typedef enum { - RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN + RUNNING, TERMINATING, JOINING, SHUTDOWN } State; - EventChannelThreads(EventChannel::shared_ptr underlyingChannel); + EventChannelThreads( + EventChannel::shared_ptr channel, size_t min, size_t max); + void addThread(); void run(); bool keepRunning(); void adjustThreads(); + Monitor monitor; + size_t minThreads; + size_t maxThreads; EventChannel::shared_ptr channel; Threads workers; sys::AtomicCount nWaiting; State state; - Event terminate; }; diff --git a/cpp/src/qpid/sys/posix/check.h b/cpp/src/qpid/sys/posix/check.h index fe53321e27..7fa7b69d3b 100644 --- a/cpp/src/qpid/sys/posix/check.h +++ b/cpp/src/qpid/sys/posix/check.h @@ -43,7 +43,7 @@ class PosixError : public qpid::QpidError int getErrNo() { return errNo; } - Exception* clone() const throw() { return new PosixError(*this); } + Exception::auto_ptr clone() const throw() { return Exception::auto_ptr(new PosixError(*this)); } void throwSelf() const { throw *this; } @@ -56,6 +56,10 @@ class PosixError : public qpid::QpidError /** Create a PosixError for the current file/line and errno. */ #define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE) +/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */ +#define QPID_POSIX_CHECK(RESULT) \ + if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno)) + /** Throw a posix error if errNo is non-zero */ #define QPID_POSIX_THROW_IF(ERRNO) \ if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO)) -- cgit v1.2.1