From 45b526ce09daee869ec1313808583f7e05bff7bb Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Mon, 18 Jun 2007 12:11:32 +0000 Subject: Intermediate checkin with preliminary work on epoll based net IO git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@548337 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/Dispatcher.cpp | 90 +++ cpp/src/qpid/sys/Dispatcher.h | 74 +++ cpp/src/qpid/sys/Poller.h | 93 +++ cpp/src/qpid/sys/ScopedIncrement.h | 14 +- cpp/src/qpid/sys/Time.h | 4 + cpp/src/qpid/sys/epoll/EpollPoller.cpp | 263 +++++++++ cpp/src/qpid/sys/posix/EventChannel.cpp | 679 +++++++++++++++------- cpp/src/qpid/sys/posix/EventChannel.h | 200 ++++--- cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp | 8 +- cpp/src/qpid/sys/posix/EventChannelConnection.cpp | 36 +- cpp/src/qpid/sys/posix/EventChannelConnection.h | 4 +- cpp/src/qpid/sys/posix/EventChannelThreads.cpp | 75 +-- cpp/src/qpid/sys/posix/EventChannelThreads.h | 43 +- cpp/src/qpid/sys/posix/check.h | 6 +- 14 files changed, 1234 insertions(+), 355 deletions(-) create mode 100644 cpp/src/qpid/sys/Dispatcher.cpp create mode 100644 cpp/src/qpid/sys/Dispatcher.h create mode 100644 cpp/src/qpid/sys/Poller.h create mode 100644 cpp/src/qpid/sys/epoll/EpollPoller.cpp (limited to 'cpp/src/qpid/sys') diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp new file mode 100644 index 0000000000..4838e5e4cd --- /dev/null +++ b/cpp/src/qpid/sys/Dispatcher.cpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Dispatcher.h" + +#include + +namespace qpid { +namespace sys { + +Dispatcher::Dispatcher(Poller::shared_ptr poller0) : + poller(poller0) { +} + +Dispatcher::~Dispatcher() { +} + +void Dispatcher::run() { + do { + Poller::Event event = poller->wait(); + // Poller::wait guarantees to return an event + DispatchHandle* h = static_cast(event.handle); + switch (event.dir) { + case Poller::IN: + h->readableCallback(*h); + break; + case Poller::OUT: + h->writableCallback(*h); + break; + case Poller::INOUT: + h->readableCallback(*h); + h->writableCallback(*h); + break; + case Poller::SHUTDOWN: + goto dispatcher_shutdown; + default: + ; + } + } while (true); + +dispatcher_shutdown: + ; +} + +void DispatchHandle::watch(Poller::shared_ptr poller0) { + bool r = readableCallback; + bool w = writableCallback; + + // If no callbacks set then do nothing (that is what we were asked to do!) + // TODO: Maybe this should be an assert instead + if (!r && !w) + return; + + Poller::Direction d = r ? + (w ? Poller::INOUT : Poller::IN) : + Poller::OUT; + + poller = poller0; + poller->addFd(*this, d); +} + +void DispatchHandle::rewatch() { + assert(poller); + poller->rearmFd(*this); +} + +void DispatchHandle::unwatch() { + poller->delFd(*this); + poller.reset(); +} + +}} diff --git a/cpp/src/qpid/sys/Dispatcher.h b/cpp/src/qpid/sys/Dispatcher.h new file mode 100644 index 0000000000..c0b010eb39 --- /dev/null +++ b/cpp/src/qpid/sys/Dispatcher.h @@ -0,0 +1,74 @@ +#ifndef _sys_Dispatcher_h +#define _sys_Dispatcher_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include "Poller.h" +#include "Runnable.h" + +#include +#include + +#include + + +namespace qpid { +namespace sys { + +class Dispatcher; +class DispatchHandle : public PollerHandle { + friend class Dispatcher; +public: + typedef boost::function1 Callback; + +private: + Callback readableCallback; + Callback writableCallback; + Poller::shared_ptr poller; + +public: + + DispatchHandle(int fd, Callback rCb, Callback wCb) : + PollerHandle(fd), + readableCallback(rCb), + writableCallback(wCb) + {} + + void watch(Poller::shared_ptr poller); + void rewatch(); + void unwatch(); +}; + +class Dispatcher : public Runnable { + const Poller::shared_ptr poller; + +public: + Dispatcher(Poller::shared_ptr poller); + ~Dispatcher(); + + void run(); +}; + +}} + +#endif // _sys_Dispatcher_h diff --git a/cpp/src/qpid/sys/Poller.h b/cpp/src/qpid/sys/Poller.h new file mode 100644 index 0000000000..6fedd669a0 --- /dev/null +++ b/cpp/src/qpid/sys/Poller.h @@ -0,0 +1,93 @@ +#ifndef _sys_Poller_h +#define _sys_Poller_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Time.h" + +#include + +#include + +namespace qpid { +namespace sys { + +/** + * Handle class to use for polling + */ +class Poller; +class PollerHandlePrivate; +class PollerHandle { + friend class Poller; + + PollerHandlePrivate* impl; + const int fd; + +public: + PollerHandle(int fd0); + virtual ~PollerHandle(); + + int getFD() const { return fd; } +}; + +/** + * Poller: abstract class to encapsulate a file descriptor poll to be used + * by a reactor + */ +class PollerPrivate; +class Poller { + PollerPrivate* impl; + +public: + typedef boost::shared_ptr shared_ptr; + + enum Direction { + NONE, + IN, + OUT, + INOUT, + SHUTDOWN + }; + + struct Event { + PollerHandle* handle; + Direction dir; + + Event(PollerHandle* handle0, Direction dir0) : + handle(handle0), + dir(dir0) { + } + }; + + Poller(); + ~Poller(); + void shutdown(); + + void addFd(PollerHandle& handle, Direction dir); + void delFd(PollerHandle& handle); + void modFd(PollerHandle& handle, Direction dir); + void rearmFd(PollerHandle& handle); + Event wait(Duration timeout = TIME_INFINITE); +}; + +}} +#endif // _sys_Poller_h diff --git a/cpp/src/qpid/sys/ScopedIncrement.h b/cpp/src/qpid/sys/ScopedIncrement.h index f14461ddaf..ba9e89ba5f 100644 --- a/cpp/src/qpid/sys/ScopedIncrement.h +++ b/cpp/src/qpid/sys/ScopedIncrement.h @@ -20,19 +20,27 @@ */ #include +#include namespace qpid { namespace sys { -/** Increment counter in constructor and decrement in destructor. */ +/** + * Increment counter in constructor and decrement in destructor. + * Optionally call a function if the decremented counter value is 0. + * Note the function must not throw, it is called in the destructor. + */ template class ScopedIncrement : boost::noncopyable { public: - ScopedIncrement(T& c) : count(c) { ++count; } - ~ScopedIncrement() { --count; } + ScopedIncrement(T& c, boost::function0 f=0) + : count(c), callback(f) { ++count; } + ~ScopedIncrement() { if (--count == 0 && callback) callback(); } + private: T& count; + boost::function0 callback; }; diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h index 4bb65e9f4a..25b1606844 100644 --- a/cpp/src/qpid/sys/Time.h +++ b/cpp/src/qpid/sys/Time.h @@ -97,8 +97,12 @@ const Duration TIME_USEC = 1000; /** Nanoseconds per nanosecond. */ const Duration TIME_NSEC = 1; +/** Value to represent an infinite timeout */ +const Duration TIME_INFINITE = std::numeric_limits::max(); + /** Time greater than any other time */ const AbsTime FAR_FUTURE = AbsTime::FarFuture(); + }} #endif /*!_sys_Time_h*/ diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp new file mode 100644 index 0000000000..65b2255023 --- /dev/null +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Poller.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/posix/check.h" + +#include +#include + +#include +#include +#include + +namespace qpid { +namespace sys { + +class PollerHandlePrivate { + friend class Poller; + friend class PollerHandle; + + enum FDStat { + ABSENT, + MONITORED, + INACTIVE + }; + + ::__uint32_t events; + FDStat stat; + Mutex lock; + + PollerHandlePrivate() : + events(0), + stat(ABSENT) { + } +}; + +PollerHandle::PollerHandle(int fd0) : + impl(new PollerHandlePrivate), + fd(fd0) +{} + +PollerHandle::~PollerHandle() { + delete impl; +} + +/** + * Concrete implementation of Poller to use the Linux specific epoll + * interface + */ +class PollerPrivate { + friend class Poller; + + static const int DefaultFds = 256; + + struct ReadablePipe { + int fds[2]; + + /** + * This encapsulates an always readable pipe which we can add + * to the epoll set to force epoll_wait to return + */ + ReadablePipe() { + QPID_POSIX_CHECK(::pipe(fds)); + // Just write the pipe's fds to the pipe + QPID_POSIX_CHECK(::write(fds[1], fds, 2)); + } + + ~ReadablePipe() { + ::close(fds[0]); + ::close(fds[1]); + } + + int getFD() { + return fds[0]; + } + }; + + static ReadablePipe alwaysReadable; + + const int epollFd; + bool isShutdown; + + static ::__uint32_t directionToEpollEvent(Poller::Direction dir) { + switch (dir) { + case Poller::IN: return ::EPOLLIN; + case Poller::OUT: return ::EPOLLOUT; + case Poller::INOUT: return ::EPOLLIN | ::EPOLLOUT; + default: return 0; + } + } + + static Poller::Direction epollToDirection(::__uint32_t events) { + ::__uint32_t e = events & (::EPOLLIN | ::EPOLLOUT); + switch (e) { + case ::EPOLLIN: return Poller::IN; + case ::EPOLLOUT: return Poller::OUT; + case ::EPOLLIN | ::EPOLLOUT: return Poller::INOUT; + default: return Poller::NONE; + } + } + + PollerPrivate() : + epollFd(::epoll_create(DefaultFds)), + isShutdown(false) { + QPID_POSIX_CHECK(epollFd); + } + + ~PollerPrivate() { + // It's probably okay to ignore any errors here as there can't be data loss + ::close(epollFd); + } +}; + +PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable; + +void Poller::addFd(PollerHandle& handle, Direction dir) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock l(eh.lock); + ::epoll_event epe; + int op; + + if (eh.stat == PollerHandlePrivate::ABSENT) { + op = EPOLL_CTL_ADD; + epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; + } else { + assert(eh.stat == PollerHandlePrivate::MONITORED); + op = EPOLL_CTL_MOD; + epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir); + } + epe.data.ptr = &handle; + + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, handle.getFD(), &epe)); + + // Record monitoring state of this fd + eh.events = epe.events; + eh.stat = PollerHandlePrivate::MONITORED; +} + +void Poller::delFd(PollerHandle& handle) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock l(eh.lock); + assert(eh.stat != PollerHandlePrivate::ABSENT); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0)); + eh.stat = PollerHandlePrivate::ABSENT; +} + +// modFd is equivalent to delFd followed by addFd +void Poller::modFd(PollerHandle& handle, Direction dir) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock l(eh.lock); + assert(eh.stat != PollerHandlePrivate::ABSENT); + + ::epoll_event epe; + epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; + epe.data.ptr = &handle; + + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe)); + + // Record monitoring state of this fd + eh.events = epe.events; + eh.stat = PollerHandlePrivate::MONITORED; +} + +void Poller::rearmFd(PollerHandle& handle) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock l(eh.lock); + assert(eh.stat == PollerHandlePrivate::INACTIVE); + + ::epoll_event epe; + epe.events = eh.events; + epe.data.ptr = &handle; + + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe)); + + eh.stat = PollerHandlePrivate::MONITORED; +} + +void Poller::shutdown() { + // Don't use any locking here - isshutdown will be visible to all + // after the epoll_ctl() anyway (it's a memory barrier) + impl->isShutdown = true; + + // Add always readable fd to epoll (not EPOLLONESHOT) + int fd = impl->alwaysReadable.getFD(); + ::epoll_event epe; + epe.events = ::EPOLLIN; + epe.data.ptr = 0; + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, fd, &epe)); +} + +Poller::Event Poller::wait(Duration timeout) { + epoll_event epe; + int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC; + + // Repeat until we weren't interupted + do { + int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs); + + if (impl->isShutdown) { + return Event(0, SHUTDOWN); + } + + if (rc ==-1 && errno != EINTR) { + QPID_POSIX_CHECK(rc); + } else if (rc > 0) { + assert(rc == 1); + PollerHandle* handle = static_cast(epe.data.ptr); + PollerHandlePrivate& eh = *handle->impl; + + ScopedLock l(eh.lock); + + // the handle could have gone inactive since we left the epoll_wait + if (eh.stat == PollerHandlePrivate::MONITORED) { + eh.stat = PollerHandlePrivate::INACTIVE; + return Event(handle, PollerPrivate::epollToDirection(epe.events)); + } + } + // We only get here if one of the following: + // * epoll_wait was interrupted by a signal + // * epoll_wait timed out + // * the state of the handle changed after being returned by epoll_wait + // + // The only things we can do here are return a timeout or wait more. + // Obviously if we timed out we return timeout; if the wait was meant to + // be indefinite then we should never return with a time out so we go again. + // If the wait wasn't indefinite, but we were interrupted then we have to return + // with a timeout as we don't know how long we've waited so far and so we can't + // continue the wait. + if (rc == 0 || timeoutMs == -1) { + return Event(0, NONE); + } + } while (true); +} + +// Concrete constructors +Poller::Poller() : + impl(new PollerPrivate()) +{} + +Poller::~Poller() { + delete impl; +} + +}} diff --git a/cpp/src/qpid/sys/posix/EventChannel.cpp b/cpp/src/qpid/sys/posix/EventChannel.cpp index 6db397a165..d35eedf5a5 100644 --- a/cpp/src/qpid/sys/posix/EventChannel.cpp +++ b/cpp/src/qpid/sys/posix/EventChannel.cpp @@ -1,4 +1,4 @@ -/* +/* * * Copyright (c) 2006 The Apache Software Foundation * @@ -16,6 +16,19 @@ * */ +// TODO aconway 2006-12-15: Locking review. + +// TODO aconway 2006-12-15: use Descriptor pointers everywhere, +// get them from channel, pass them to Event constructors. +// Eliminate lookup. + + +#include "EventChannel.h" +#include "check.h" + +#include "qpid/QpidError.h" +#include "qpid/sys/AtomicCount.h" + #include #include #include @@ -29,139 +42,420 @@ #include #include -#include - -#include "qpid/QpidError.h" -#include "qpid/sys/Monitor.h" -#include "qpid/log/Statement.h" - -#include "check.h" -#include "EventChannel.h" +#include +#include using namespace std; -// Convenience template to zero out a struct. -template struct ZeroStruct : public S { - ZeroStruct() { memset(this, 0, sizeof(*this)); } -}; - namespace qpid { namespace sys { +// ================================================================ +// Private class declarations + +namespace { + +typedef enum { IN, OUT } Direction; + +typedef std::pair EventPair; + /** - * EventHandler wraps an epoll file descriptor. Acts as private - * interface between EventChannel and subclasses. - * - * Also implements Event interface for events that are not associated - * with a file descriptor and are passed via the message queue. - */ -class EventHandler : public Event, private Monitor + * Template to zero out a C-struct on construction. Avoids uninitialized memory + * warnings from valgrind or other mem checking tool. + */ +template struct CleanStruct : public T { + CleanStruct() { memset(this, 0, sizeof(*this)); } +}; + +} // namespace + +/** + * Queue of events corresponding to one IO direction (IN or OUT). + * Each Descriptor contains two Queues. + */ +class EventChannel::Queue : private boost::noncopyable { public: - EventHandler(int epollSize = 256); - ~EventHandler(); + Queue(Descriptor& container, Direction dir); - int getEpollFd() { return epollFd; } - void epollAdd(int fd, uint32_t epollEvents, Event* event); - void epollMod(int fd, uint32_t epollEvents, Event* event); - void epollDel(int fd); + /** Called by Event classes in prepare() */ + void push(Event* e); - void mqPut(Event* event); - Event* mqGet(); - - protected: - // Should never be called, only complete. - void prepare(EventHandler&) { assert(0); } - Event* complete(EventHandler& eh); + /** Called when epoll wakes. + *@return The next completed event or 0. + */ + Event* wake(uint32_t epollFlags); + + Event* pop() { Event* e = queue.front(); queue.pop_front(); return e; } + + bool empty() { return queue.empty(); } + + void setBit(uint32_t &epollFlags); + + void shutdown(); private: + typedef std::deque EventQ; + + inline bool isMyEvent(uint32_t flags) { return flags | myEvent; } + + Mutex& lock; // Shared with Descriptor. + Descriptor& descriptor; + uint32_t myEvent; // Epoll event flag. + EventQ queue; +}; + + +/** + * Manages a file descriptor in an epoll set. + * + * Can be shutdown and re-activated for the same file descriptor. + */ +class EventChannel::Descriptor : private boost::noncopyable { + public: + explicit Descriptor(int fd) : epollFd(-1), myFd(fd), + inQueue(*this, IN), outQueue(*this, OUT) {} + + void activate(int epollFd_); + + /** Epoll woke up for this descriptor. */ + Event* wake(uint32_t epollEvents); + + /** Shut down: close and remove file descriptor. + * May be re-activated if fd is reused. + */ + void shutdown(); + + // TODO aconway 2006-12-18: Nasty. Need to clean up interaction. + void shutdownUnsafe(); + + bool isShutdown() { return epollFd == -1; } + + Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; } + int getFD() const { return myFd; } + + private: + void update(); + void epollCtl(int op, uint32_t events); + Queue* pick(); + + Mutex lock; int epollFd; - std::string mqName; - int mqFd; - std::queue mqEvents; + int myFd; + Queue inQueue, outQueue; + bool preferIn; + + friend class Queue; }; -EventHandler::EventHandler(int epollSize) -{ - epollFd = epoll_create(epollSize); - if (epollFd < 0) throw QPID_POSIX_ERROR(errno); + +/** + * Holds a map of Descriptors, which do most of the work. + */ +class EventChannel::Impl { + public: + Impl(int size = 256); + + ~Impl(); + + /** + * Activate descriptor + */ + void activate(Descriptor& d) { + d.activate(epollFd); + } + + /** Wait for an event, return 0 on timeout */ + Event* wait(Duration timeout); - // Create a POSIX message queue for non-fd events. - // We write one byte and never read it is always ready for read - // when we add it to epoll. + void shutdown(); + + private: + + Monitor monitor; + int epollFd; + int shutdownPipe[2]; + AtomicCount nWaiters; + bool isShutdown; +}; + + +// ================================================================ +// EventChannel::Queue::implementation. + +static const char* shutdownMsg = "Event queue shut down."; + +EventChannel::Queue::Queue(Descriptor& d, Direction dir) : + lock(d.lock), descriptor(d), + myEvent(dir==IN ? EPOLLIN : EPOLLOUT) +{} + +void EventChannel::Queue::push(Event* e) { + Mutex::ScopedLock l(lock); + if (descriptor.isShutdown()) + THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg); + queue.push_back(e); + descriptor.update(); +} + +void EventChannel::Queue::setBit(uint32_t &epollFlags) { + if (queue.empty()) + epollFlags &= ~myEvent; + else + epollFlags |= myEvent; +} + +// TODO aconway 2006-12-20: REMOVE +Event* EventChannel::Queue::wake(uint32_t epollFlags) { + // Called with lock held. + if (!queue.empty() && (isMyEvent(epollFlags))) { + assert(!queue.empty()); + Event* e = queue.front(); + assert(e); + if (!e->getException()) { + // TODO aconway 2006-12-20: Can/should we move event completion + // out into dispatch() so it doesn't happen in Descriptor locks? + e->complete(descriptor); + } + queue.pop_front(); + return e; + } + return 0; +} + +void EventChannel::Queue::shutdown() { + // Mark all pending events with a shutdown exception. + // The server threads will remove and dispatch the events. // - ZeroStruct attr; - attr.mq_maxmsg = 1; - attr.mq_msgsize = 1; - do { - char tmpnam[L_tmpnam]; - tmpnam_r(tmpnam); - mqName = tmpnam + 4; // Skip "tmp/" - mqFd = mq_open( - mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr); - if (mqFd < 0) throw QPID_POSIX_ERROR(errno); - } while (mqFd == EEXIST); // Name already taken, try again. + qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, SRCLINE); + for_each(queue.begin(), queue.end(), + boost::bind(&Event::setException, _1, ex)); +} - static char zero = '\0'; - mq_send(mqFd, &zero, 1, 0); - epollAdd(mqFd, 0, this); + +// ================================================================ +// Descriptor + + +void EventChannel::Descriptor::activate(int epollFd_) { + Mutex::ScopedLock l(lock); + if (isShutdown()) { + epollFd = epollFd_; // We're back in business. + epollCtl(EPOLL_CTL_ADD, 0); + } } -EventHandler::~EventHandler() { - mq_close(mqFd); - mq_unlink(mqName.c_str()); +void EventChannel::Descriptor::shutdown() { + Mutex::ScopedLock l(lock); + shutdownUnsafe(); } -void EventHandler::mqPut(Event* event) { - ScopedLock l(*this); - assert(event != 0); - mqEvents.push(event); - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); +void EventChannel::Descriptor::shutdownUnsafe() { + // Caller holds lock. + ::close(myFd); + epollFd = -1; // Mark myself as shutdown. + inQueue.shutdown(); + outQueue.shutdown(); +} + +// TODO aconway 2006-12-20: Inline into wake(). +void EventChannel::Descriptor::update() { + // Caller holds lock. + if (isShutdown()) // Nothing to do + return; + uint32_t events = EPOLLONESHOT | EPOLLERR | EPOLLHUP; + inQueue.setBit(events); + outQueue.setBit(events); + epollCtl(EPOLL_CTL_MOD, events); +} + +void EventChannel::Descriptor::epollCtl(int op, uint32_t events) { + // Caller holds lock + assert(!isShutdown()); + CleanStruct ee; + ee.data.ptr = this; + ee.events = events; + int status = ::epoll_ctl(epollFd, op, myFd, &ee); + if (status < 0) { + if (errno == EEXIST) // It's okay to add an existing fd + return; + else if (errno == EBADF) // FD was closed externally. + shutdownUnsafe(); + else + throw QPID_POSIX_ERROR(errno); + } } + -Event* EventHandler::mqGet() { - ScopedLock l(*this); - if (mqEvents.empty()) +EventChannel::Queue* EventChannel::Descriptor::pick() { + if (inQueue.empty() && outQueue.empty()) return 0; - Event* event = mqEvents.front(); - mqEvents.pop(); - if(!mqEvents.empty()) - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); - return event; + if (inQueue.empty() || outQueue.empty()) + return !inQueue.empty() ? &inQueue : &outQueue; + // Neither is empty, pick fairly. + preferIn = !preferIn; + return preferIn ? &inQueue : &outQueue; } -void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event) -{ - ZeroStruct ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) - throw QPID_POSIX_ERROR(errno); +Event* EventChannel::Descriptor::wake(uint32_t epollEvents) { + Mutex::ScopedLock l(lock); + // On error, shut down the Descriptor and both queues. + if (epollEvents & (EPOLLERR | EPOLLHUP)) { + shutdownUnsafe(); + // TODO aconway 2006-12-20: This error handling models means + // that any error reported by epoll will result in a shutdown + // exception on the events. Can we get more accurate error + // reporting somehow? + } + Queue*q = 0; + bool in = (epollEvents & EPOLLIN); + bool out = (epollEvents & EPOLLOUT); + if ((in && out) || isShutdown()) + q = pick(); // Choose fairly, either non-empty queue. + else if (in) + q = &inQueue; + else if (out) + q = &outQueue; + Event* e = (q && !q->empty()) ? q->pop() : 0; + update(); + if (e) + e->complete(*this); + return e; } -void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event) + + +// ================================================================ +// EventChannel::Impl + + +EventChannel::Impl::Impl(int epollSize): + epollFd(-1), isShutdown(false) { - ZeroStruct ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) - throw QPID_POSIX_ERROR(errno); + // Create the epoll file descriptor. + epollFd = epoll_create(epollSize); + QPID_POSIX_CHECK(epollFd); + + // Create a pipe and write a single byte. The byte is never + // read so the pipes read fd is always ready for read. + // We activate the FD when there are messages in the queue. + QPID_POSIX_CHECK(::pipe(shutdownPipe)); + static char zero = '\0'; + QPID_POSIX_CHECK(::write(shutdownPipe[1], &zero, 1)); } -void EventHandler::epollDel(int fd) { - if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0) - throw QPID_POSIX_ERROR(errno); +EventChannel::Impl::~Impl() { + shutdown(); + ::close(epollFd); + ::close(shutdownPipe[0]); + ::close(shutdownPipe[1]); } -Event* EventHandler::complete(EventHandler& eh) -{ - assert(&eh == this); - Event* event = mqGet(); - return event==0 ? 0 : event->complete(eh); + +void EventChannel::Impl::shutdown() { + Monitor::ScopedLock l(monitor); + if (!isShutdown) { // I'm starting shutdown. + isShutdown = true; + if (nWaiters == 0) + return; + + // TODO aconway 2006-12-20: If I just close the epollFd will + // that wake all threads? If so with what? Would be simpler than: + + CleanStruct ee; + ee.data.ptr = 0; + ee.events = EPOLLIN; + QPID_POSIX_CHECK( + epoll_ctl(epollFd, EPOLL_CTL_ADD, shutdownPipe[0], &ee)); + } + // Wait for nWaiters to get out. + while (nWaiters > 0) { + monitor.wait(); + } +} + +// TODO aconway 2006-12-20: DEBUG remove +struct epoll { + epoll(uint32_t e) : events(e) { } + uint32_t events; +}; + +#define BIT(X) out << ((e.events & X) ? __STRING(X) "." : "") +ostream& operator << (ostream& out, epoll e) { + out << "epoll_event.events: "; + BIT(EPOLLIN); + BIT(EPOLLPRI); + BIT(EPOLLOUT); + BIT(EPOLLRDNORM); + BIT(EPOLLRDBAND); + BIT(EPOLLWRNORM); + BIT(EPOLLWRBAND); + BIT(EPOLLMSG); + BIT(EPOLLERR); + BIT(EPOLLHUP); + BIT(EPOLLONESHOT); + BIT(EPOLLET); + return out; } + + +/** + * Wait for epoll to wake up, return the descriptor or 0 on timeout. + */ +Event* EventChannel::Impl::wait(Duration timeoutNs) +{ + { + Monitor::ScopedLock l(monitor); + if (isShutdown) + throw ShutdownException(); + } + + // Increase nWaiters for the duration, notify the monitor if I'm + // the last one out. + // + AtomicCount::ScopedIncrement si( + nWaiters, boost::bind(&Monitor::notifyAll, &monitor)); + + // No lock, all thread safe calls or local variables: + // + const long timeoutMs = + (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC; + CleanStruct ee; + Event* event = 0; + + // Loop till we get a completed event. Some events may repost + // themselves and return 0, e.g. incomplete read or write events. + //TODO aconway 2006-12-20: FIX THIS! + while (!event) { + int n = ::epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe. + if (n == 0) // Timeout + return 0; + if (n < 0 && errno == EINTR) // Interrupt, ignore it. + continue; + if (n < 0) + throw QPID_POSIX_ERROR(errno); + assert(n == 1); + Descriptor* ed = + reinterpret_cast(ee.data.ptr); + if (ed == 0) // We're being shut-down. + throw ShutdownException(); + assert(ed != 0); + event = ed->wake(ee.events); + } + return event; +} + +//EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) { +// Mutex::ScopedLock l(monitor); +// Descriptor& ed = descriptors[fd]; +// ed.activate(epollFd, fd); +// return ed; +//} + + // ================================================================ // EventChannel @@ -169,157 +463,134 @@ EventChannel::shared_ptr EventChannel::create() { return shared_ptr(new EventChannel()); } -EventChannel::EventChannel() : handler(new EventHandler()) {} +EventChannel::EventChannel() : impl(new EventChannel::Impl()) {} EventChannel::~EventChannel() {} -void EventChannel::postEvent(Event& e) +void EventChannel::post(Event& e) { - e.prepare(*handler); + e.prepare(*impl); } -Event* EventChannel::getEvent() +Event* EventChannel::wait(Duration timeoutNs) { - static const int infiniteTimeout = -1; - ZeroStruct epollEvent; + return impl->wait(timeoutNs); +} - // Loop until we can complete the event. Some events may re-post - // themselves and return 0 from complete, e.g. partial reads. // - Event* event = 0; - while (event == 0) { - int eventCount = epoll_wait(handler->getEpollFd(), - &epollEvent, 1, infiniteTimeout); - if (eventCount < 0) { - if (errno != EINTR) { - QPID_LOG(warn, "Ignoring error: " - << PosixError::getMessage(errno)); - assert(0); - } - } - else if (eventCount == 1) { - event = reinterpret_cast(epollEvent.data.ptr); - assert(event != 0); - try { - event = event->complete(*handler); - } - catch (const Exception& e) { - if (event) - event->setError(e); - } - catch (const std::exception& e) { - if (event) - event->setError(e); - } - } - } - return event; +void EventChannel::shutdown() { + impl->shutdown(); } + +// ================================================================ +// Event and subclasses. + Event::~Event() {} -void Event::prepare(EventHandler& handler) -{ - handler.mqPut(this); +Exception::shared_ptr_const Event::getException() const { + return exception; } -bool Event::hasError() const { - return error; +void Event::throwIfException() { + if (getException()) + exception->throwSelf(); } -void Event::throwIfError() throw (Exception) { - if (hasError()) - error.throwSelf(); -} - -Event* Event::complete(EventHandler&) +void Event::dispatch() { - return this; + if (!callback.empty()) + callback(); } -void Event::dispatch() -{ +void Event::setException(const std::exception& e) { + const Exception* ex = dynamic_cast(&e); + if (ex) + exception.reset(ex->clone().release()); + else + exception.reset(new Exception(e)); +#ifndef NDEBUG + // Throw and re-catch the exception. Has no effect on the + // program but it triggers debuggers watching for throw. The + // context that sets the exception is more informative for + // debugging purposes than the one that ultimately throws it. + // try { - if (!callback.empty()) - callback(); - } catch (const std::exception&) { - throw; - } catch (...) { - throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception."); + throwIfException(); } + catch (...) { } // Ignored. +#endif } -void Event::setError(const ExceptionHolder& e) { - error = e; +int FDEvent::getFDescriptor() const { + return descriptor.getFD(); } -void ReadEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +// TODO: AMS 21/12/06 Don't like the inline new, probably cause a memory leak +ReadEvent::ReadEvent(int fd, void* buf, size_t sz,Callback cb, bool noWait) : + IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesRead(0) { } -ssize_t ReadEvent::doRead() { - ssize_t n = ::read(descriptor, static_cast(buffer) + received, - size - received); - if (n > 0) received += n; - return n; +void ReadEvent::prepare(EventChannel::Impl& impl) { + EventChannel::Descriptor& d = getDescriptor(); + impl.activate(d); + d.getQueue(IN).push(this); } -Event* ReadEvent::complete(EventHandler& handler) +void ReadEvent::complete(EventChannel::Descriptor& ed) { - // Read as much as possible without blocking. - ssize_t n = doRead(); - while (n > 0 && received < size) doRead(); - - if (received == size) { - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - return this; - } - else if (n <0 && (errno == EAGAIN)) { - // Keep polling for more. - handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this); - return 0; - } - else { - // Unexpected EOF or error. Throw ENODATA for EOF. - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA); + ssize_t n = ::read(getFDescriptor(), + static_cast(buffer) + bytesRead, + size - bytesRead); + if (n > 0) + bytesRead += n; + if (n == 0 || (n < 0 && errno != EAGAIN)) { + // Use ENODATA for file closed. + setException(QPID_POSIX_ERROR(n == 0 ? ENODATA : errno)); + ed.shutdownUnsafe(); } } -void WriteEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this); +WriteEvent::WriteEvent(int fd, const void* buf, size_t sz, Callback cb) : + IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesWritten(0) { +} + +void WriteEvent::prepare(EventChannel::Impl& impl) { + EventChannel::Descriptor& d = getDescriptor(); + impl.activate(d); + d.getQueue(OUT).push(this); } -Event* WriteEvent::complete(EventHandler& handler) + +void WriteEvent::complete(EventChannel::Descriptor& ed) { - ssize_t n = write(descriptor, static_cast(buffer) + written, - size - written); - if (n < 0) throw QPID_POSIX_ERROR(errno); - written += n; - if(written < size) { - // Keep polling. - handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this); - return 0; + ssize_t n = ::write(getFDescriptor(), + static_cast(buffer) + bytesWritten, + size - bytesWritten); + if (n > 0) + bytesWritten += n; + if(n < 0 && errno != EAGAIN) { + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. } - written = 0; // Reset for re-use. - handler.epollDel(descriptor); - return this; } -void AcceptEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +AcceptEvent::AcceptEvent(int fd, Callback cb) : + FDEvent(cb, *(new EventChannel::Descriptor(fd))), accepted(0) { +} + +void AcceptEvent::prepare(EventChannel::Impl& impl) { + EventChannel::Descriptor& d = getDescriptor(); + impl.activate(d); + d.getQueue(IN).push(this); } -Event* AcceptEvent::complete(EventHandler& handler) +void AcceptEvent::complete(EventChannel::Descriptor& ed) { - handler.epollDel(descriptor); - accepted = ::accept(descriptor, 0, 0); - if (accepted < 0) throw QPID_POSIX_ERROR(errno); - return this; + accepted = ::accept(getFDescriptor(), 0, 0); + if (accepted < 0) { + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. + } } }} diff --git a/cpp/src/qpid/sys/posix/EventChannel.h b/cpp/src/qpid/sys/posix/EventChannel.h index f465580996..85e121379a 100644 --- a/cpp/src/qpid/sys/posix/EventChannel.h +++ b/cpp/src/qpid/sys/posix/EventChannel.h @@ -20,7 +20,10 @@ */ #include "qpid/SharedObject.h" -#include "qpid/ExceptionHolder.h" +#include "qpid/Exception.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Time.h" + #include #include @@ -28,11 +31,57 @@ namespace qpid { namespace sys { class Event; -class EventHandler; -class EventChannel; + +/** + * Channel to post and wait for events. + */ +class EventChannel : public qpid::SharedObject +{ + public: + static shared_ptr create(); + + /** Exception throw from wait() if channel is shut down. */ + class ShutdownException : public qpid::Exception {}; + + ~EventChannel(); + + /** Post an event to the channel. */ + void post(Event& event); + + /** + * Wait for the next complete event, up to timeout. + *@return Pointer to event or 0 if timeout elapses. + *@exception ShutdownException if the channel is shut down. + */ + Event* wait(Duration timeout = TIME_INFINITE); + + /** + * Shut down the event channel. + * Blocks till all threads have exited wait() + */ + void shutdown(); + + + // Internal classes. + class Impl; + class Queue; + class Descriptor; + + private: + + EventChannel(); + + Mutex lock; + boost::shared_ptr impl; +}; /** * Base class for all Events. + * + * Derived classes define events representing various async IO operations. + * When an event is complete, it is returned by the EventChannel to + * a thread calling wait. The thread will call Event::dispatch() to + * execute code associated with event completion. */ class Event { @@ -40,135 +89,124 @@ class Event /** Type for callback when event is dispatched */ typedef boost::function0 Callback; - /** - * Create an event with optional callback. - * Instances of Event are sent directly through the channel. - * Derived classes define additional waiting behaviour. - *@param cb A callback functor that is invoked when dispatch() is called. - */ - Event(Callback cb = 0) : callback(cb) {} - virtual ~Event(); /** Call the callback provided to the constructor, if any. */ void dispatch(); - /** True if there was an error processing this event */ - bool hasError() const; + /** + *If there was an exception processing this Event, return it. + *@return 0 if there was no exception. + */ + qpid::Exception::shared_ptr_const getException() const; + + /** If getException() throw the corresponding exception. */ + void throwIfException(); - /** If hasError() throw the corresponding exception. */ - void throwIfError() throw(Exception); + /** Set the dispatch callback. */ + void setCallback(Callback cb) { callback = cb; } + + /** Set the exception. */ + void setException(const std::exception& e); protected: - virtual void prepare(EventHandler&); - virtual Event* complete(EventHandler&); - void setError(const ExceptionHolder& e); + Event(Callback cb=0) : callback(cb) {} + + virtual void prepare(EventChannel::Impl&) = 0; + virtual void complete(EventChannel::Descriptor&) = 0; Callback callback; - ExceptionHolder error; + Exception::shared_ptr_const exception; friend class EventChannel; - friend class EventHandler; + friend class EventChannel::Queue; }; -template -class IOEvent : public Event { +/** Base class for events related to a file descriptor */ +class FDEvent : public Event { + public: + EventChannel::Descriptor& getDescriptor() const { return descriptor; } + int getFDescriptor() const; + + protected: + FDEvent(Callback cb, EventChannel::Descriptor& fd) + : Event(cb), descriptor(fd) {} + // TODO AMS: 1/6/07 I really don't think this is correct, but + // the descriptor is immutable + FDEvent& operator=(const FDEvent& rhs) { Event::operator=(rhs); return *this; } + + private: + EventChannel::Descriptor& descriptor; +}; + +/** Base class for read or write events. */ +class IOEvent : public FDEvent { public: - void getDescriptor() const { return descriptor; } size_t getSize() const { return size; } - BufT getBuffer() const { return buffer; } - + protected: - IOEvent(int fd, Callback cb, size_t sz, BufT buf) : - Event(cb), descriptor(fd), buffer(buf), size(sz) {} + IOEvent(Callback cb, EventChannel::Descriptor& fd, size_t sz, bool noWait_) : + FDEvent(cb, fd), size(sz), noWait(noWait_) {} - int descriptor; - BufT buffer; size_t size; + bool noWait; }; /** Asynchronous read event */ -class ReadEvent : public IOEvent +class ReadEvent : public IOEvent { public: - explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) : - IOEvent(fd, cb, sz, buf), received(0) {} + explicit ReadEvent(int fd, void* buf=0, size_t sz=0,Callback cb=0, bool noWait=false); + void* getBuffer() const { return buffer; } + size_t getBytesRead() const { return bytesRead; } + private: - void prepare(EventHandler&); - Event* complete(EventHandler&); + void prepare(EventChannel::Impl&); + void complete(EventChannel::Descriptor&); ssize_t doRead(); - size_t received; + void* buffer; + size_t bytesRead; }; /** Asynchronous write event */ -class WriteEvent : public IOEvent +class WriteEvent : public IOEvent { public: - explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0, - Callback cb=0) : - IOEvent(fd, cb, sz, buf), written(0) {} + explicit WriteEvent(int fd, const void* buf=0, size_t sz=0, Callback cb=0); - protected: - void prepare(EventHandler&); - Event* complete(EventHandler&); + const void* getBuffer() const { return buffer; } + size_t getBytesWritten() const { return bytesWritten; } private: + void prepare(EventChannel::Impl&); + void complete(EventChannel::Descriptor&); ssize_t doWrite(); - size_t written; + + const void* buffer; + size_t bytesWritten; }; + /** Asynchronous socket accept event */ -class AcceptEvent : public Event +class AcceptEvent : public FDEvent { public: /** Accept a connection on fd. */ - explicit AcceptEvent(int fd=-1, Callback cb=0) : - Event(cb), descriptor(fd), accepted(0) {} - - /** Get descriptor for server socket */ + explicit AcceptEvent(int fd, Callback cb=0); + + /** Get descriptor for accepted server socket */ int getAcceptedDesscriptor() const { return accepted; } private: - void prepare(EventHandler&); - Event* complete(EventHandler&); + void prepare(EventChannel::Impl&); + void complete(EventChannel::Descriptor&); - int descriptor; int accepted; }; -class QueueSet; - -/** - * Channel to post and wait for events. - */ -class EventChannel : public qpid::SharedObject -{ - public: - static shared_ptr create(); - - ~EventChannel(); - - /** Post an event to the channel. */ - void postEvent(Event& event); - - /** Post an event to the channel. Must not be 0. */ - void postEvent(Event* event) { postEvent(*event); } - - /** - * Wait for the next complete event. - *@return Pointer to event. Will never return 0. - */ - Event* getEvent(); - - private: - EventChannel(); - boost::shared_ptr handler; -}; - - }} diff --git a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp index d5a2c238d9..1a5fceb56e 100644 --- a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp @@ -106,7 +106,7 @@ void EventChannelAcceptor::run(ConnectionInputHandlerFactory* f) { if (!isRunning && !isShutdown) { isRunning = true; factory = f; - threads->postEvent(acceptEvent); + threads->post(acceptEvent); } } threads->join(); // Wait for shutdown. @@ -120,7 +120,7 @@ void EventChannelAcceptor::shutdown() { isShutdown = true; } if (doShutdown) { - ::close(acceptEvent.getDescriptor()); + ::close(acceptEvent.getFDescriptor()); threads->shutdown(); for_each(connections.begin(), connections.end(), boost::bind(&EventChannelConnection::close, _1)); @@ -139,11 +139,11 @@ void EventChannelAcceptor::accept() shutdown(); return; } - // TODO aconway 2006-11-29: Need to reap closed connections also. int fd = acceptEvent.getAcceptedDesscriptor(); + threads->post(acceptEvent); // Keep accepting. + // TODO aconway 2006-11-29: Need to reap closed connections also. connections.push_back( new EventChannelConnection(threads, *factory, fd, fd, isTrace)); - threads->postEvent(acceptEvent); // Keep accepting. } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp index 0c1c81b6fe..a36f096a4d 100644 --- a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp @@ -24,7 +24,6 @@ #include "EventChannelConnection.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/QpidError.h" -#include "qpid/log/Statement.h" using namespace std; using namespace qpid; @@ -44,6 +43,8 @@ EventChannelConnection::EventChannelConnection( ) : readFd(rfd), writeFd(wfd ? wfd : rfd), + readEvent(readFd), + writeEvent(writeFd), readCallback(boost::bind(&EventChannelConnection::closeOnException, this, &EventChannelConnection::endInitRead)), @@ -55,8 +56,8 @@ EventChannelConnection::EventChannelConnection( out(bufferSize), isTrace(isTrace_) { - BOOST_ASSERT(readFd > 0); - BOOST_ASSERT(writeFd > 0); + assert(readFd > 0); + assert(writeFd > 0); closeOnException(&EventChannelConnection::startRead); } @@ -133,14 +134,17 @@ void EventChannelConnection::startWrite() { } // No need to lock here - only one thread can be writing at a time. out.clear(); - QPID_LOG(trace, "Send on socket " << writeFd << ": " << *frame); + if (isTrace) + cout << "Send on socket " << writeFd << ": " << *frame << endl; frame->encode(out); out.flip(); + // TODO: AMS 1/6/07 This only works because we already have the correct fd + // in the descriptor - change not to use assigment writeEvent = WriteEvent( writeFd, out.start(), out.available(), boost::bind(&EventChannelConnection::closeOnException, this, &EventChannelConnection::endWrite)); - threads->postEvent(writeEvent); + threads->post(writeEvent); } // ScopedBusy ctor increments busyThreads. @@ -161,12 +165,18 @@ void EventChannelConnection::endWrite() { ScopedBusy(*this); { Monitor::ScopedLock lock(monitor); + assert(isWriting); isWriting = false; - if (isClosed) + if (isClosed) return; writeEvent.throwIfException(); + if (writeEvent.getBytesWritten() < writeEvent.getSize()) { + // Keep writing the current event till done. + isWriting = true; + threads->post(writeEvent); + } } - // Check if there's more in to write in the write queue. + // Continue writing from writeFrames queue. startWrite(); } @@ -179,8 +189,8 @@ void EventChannelConnection::endWrite() { void EventChannelConnection::startRead() { // Non blocking read, as much as we can swallow. readEvent = ReadEvent( - readFd, in.start(), in.available(), readCallback,true); - threads->postEvent(readEvent); + readFd, in.start(), in.available(), readCallback); + threads->post(readEvent); } // Completion of initial read, expect protocolInit. @@ -194,7 +204,7 @@ void EventChannelConnection::endInitRead() { in.flip(); ProtocolInitiation protocolInit; if(protocolInit.decode(in)){ - handler->initiated(&protocolInit); + handler->initiated(protocolInit); readCallback = boost::bind( &EventChannelConnection::closeOnException, this, &EventChannelConnection::endRead); @@ -215,8 +225,10 @@ void EventChannelConnection::endRead() { in.flip(); AMQFrame frame; while (frame.decode(in)) { - QPID_LOG(trace, "Received on socket " << readFd - << ": " << frame); + // TODO aconway 2006-11-30: received should take Frame& + if (isTrace) + cout << "Received on socket " << readFd + << ": " << frame << endl; handler->received(&frame); } in.compact(); diff --git a/cpp/src/qpid/sys/posix/EventChannelConnection.h b/cpp/src/qpid/sys/posix/EventChannelConnection.h index bd010a4240..394df55fd9 100644 --- a/cpp/src/qpid/sys/posix/EventChannelConnection.h +++ b/cpp/src/qpid/sys/posix/EventChannelConnection.h @@ -34,7 +34,7 @@ namespace sys { class ConnectionInputHandlerFactory; /** - * Implements ConnectionOutputHandler and delegates to a ConnectionInputHandler + * Implements SessionContext and delegates to a SessionHandler * for a connection via the EventChannel. *@param readDescriptor file descriptor for reading. *@param writeDescriptor file descriptor for writing, @@ -50,7 +50,7 @@ class EventChannelConnection : public ConnectionOutputHandler { bool isTrace = false ); - // TODO aconway 2006-11-30: ConnectionOutputHandler::send should take auto_ptr + // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr virtual void send(qpid::framing::AMQFrame* frame) { send(std::auto_ptr(frame)); } diff --git a/cpp/src/qpid/sys/posix/EventChannelThreads.cpp b/cpp/src/qpid/sys/posix/EventChannelThreads.cpp index 68c57405d5..70954d0c16 100644 --- a/cpp/src/qpid/sys/posix/EventChannelThreads.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelThreads.cpp @@ -16,27 +16,40 @@ * */ -#include "EventChannelThreads.h" -#include "qpid/sys/Runnable.h" -#include "qpid/log/Statement.h" #include -using namespace std; +#include + #include +#include "qpid/sys/Runnable.h" + +#include "EventChannelThreads.h" + namespace qpid { namespace sys { +const size_t EventChannelThreads::unlimited = + std::numeric_limits::max(); + EventChannelThreads::shared_ptr EventChannelThreads::create( - EventChannel::shared_ptr ec) + EventChannel::shared_ptr ec, size_t min, size_t max +) { - return EventChannelThreads::shared_ptr(new EventChannelThreads(ec)); + return EventChannelThreads::shared_ptr( + new EventChannelThreads(ec, min, max)); } -EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) : - channel(ec), nWaiting(0), state(RUNNING) +EventChannelThreads::EventChannelThreads( + EventChannel::shared_ptr ec, size_t min, size_t max) : + minThreads(std::max(size_t(1), min)), + maxThreads(std::min(min, max)), + channel(ec), + nWaiting(0), + state(RUNNING) { - // TODO aconway 2006-11-15: Estimate initial threads based on CPUs. - addThread(); + Monitor::ScopedLock l(monitor); + while (workers.size() < minThreads) + workers.push_back(Thread(*this)); } EventChannelThreads::~EventChannelThreads() { @@ -46,32 +59,30 @@ EventChannelThreads::~EventChannelThreads() { void EventChannelThreads::shutdown() { - ScopedLock lock(*this); + Monitor::ScopedLock lock(monitor); if (state != RUNNING) // Already shutting down. return; - for (size_t i = 0; i < workers.size(); ++i) { - channel->postEvent(terminate); - } - state = TERMINATE_SENT; - notify(); // Wake up one join() thread. + state = TERMINATING; + channel->shutdown(); + monitor.notify(); // Wake up one join() thread. } void EventChannelThreads::join() { { - ScopedLock lock(*this); + Monitor::ScopedLock lock(monitor); while (state == RUNNING) // Wait for shutdown to start. - wait(); + monitor.wait(); if (state == SHUTDOWN) // Shutdown is complete return; if (state == JOINING) { // Someone else is doing the join. while (state != SHUTDOWN) - wait(); + monitor.wait(); return; } // I'm the joining thread - assert(state == TERMINATE_SENT); + assert(state == TERMINATING); state = JOINING; } // Drop the lock. @@ -80,12 +91,13 @@ void EventChannelThreads::join() workers[i].join(); } state = SHUTDOWN; - notifyAll(); // Notify other join() threaeds. + monitor.notifyAll(); // Notify any other join() threads. } void EventChannelThreads::addThread() { - ScopedLock l(*this); - workers.push_back(Thread(*this)); + Monitor::ScopedLock l(monitor); + if (workers.size() < maxThreads) + workers.push_back(Thread(*this)); } void EventChannelThreads::run() @@ -94,23 +106,20 @@ void EventChannelThreads::run() AtomicCount::ScopedIncrement inc(nWaiting); try { while (true) { - Event* e = channel->getEvent(); + Event* e = channel->wait(); assert(e != 0); - if (e == &terminate) { - return; - } AtomicCount::ScopedDecrement dec(nWaiting); - // I'm no longer waiting, make sure someone is. - if (dec == 0) + // Make sure there's at least one waiting thread. + if (dec == 0 && state == RUNNING) addThread(); e->dispatch(); } } - catch (const std::exception& e) { - QPID_LOG(error, e.what()); + catch (const EventChannel::ShutdownException& e) { + return; } - catch (...) { - QPID_LOG(error, "unknown exception"); + catch (const std::exception& e) { + Exception::log(e, "Exception in EventChannelThreads::run()"); } } diff --git a/cpp/src/qpid/sys/posix/EventChannelThreads.h b/cpp/src/qpid/sys/posix/EventChannelThreads.h index 245cefe585..19112cf4db 100644 --- a/cpp/src/qpid/sys/posix/EventChannelThreads.h +++ b/cpp/src/qpid/sys/posix/EventChannelThreads.h @@ -18,14 +18,16 @@ * limitations under the License. * */ -#include +#include "EventChannel.h" #include "qpid/Exception.h" -#include "qpid/sys/Time.h" +#include "qpid/sys/AtomicCount.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" -#include "qpid/sys/AtomicCount.h" -#include "EventChannel.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Runnable.h" + +#include namespace qpid { namespace sys { @@ -33,26 +35,33 @@ namespace sys { /** Dynamic thread pool serving an EventChannel. - Threads run a loop { e = getEvent(); e->dispatch(); } + Threads run a loop { e = wait(); e->dispatch(); } The size of the thread pool is automatically adjusted to optimal size. */ class EventChannelThreads : public qpid::SharedObject, - public sys::Monitor, private sys::Runnable + private sys::Runnable { public: - /** Create the thread pool and start initial threads. */ + /** Constant to represent an unlimited number of threads */ + static const size_t unlimited; + + /** + * Create the thread pool and start initial threads. + * @param minThreads Pool will initialy contain minThreads threads and + * will never shrink to less until shutdown. + * @param maxThreads Pool will never grow to more than maxThreads. + */ static EventChannelThreads::shared_ptr create( - EventChannel::shared_ptr channel + EventChannel::shared_ptr channel = EventChannel::create(), + size_t minThreads = 1, + size_t maxThreads = unlimited ); ~EventChannelThreads(); /** Post event to the underlying channel */ - void postEvent(Event& event) { channel->postEvent(event); } - - /** Post event to the underlying channel Must not be 0. */ - void postEvent(Event* event) { channel->postEvent(event); } + void post(Event& event) { channel->post(event); } /** * Terminate all threads. @@ -68,21 +77,25 @@ class EventChannelThreads : private: typedef std::vector Threads; typedef enum { - RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN + RUNNING, TERMINATING, JOINING, SHUTDOWN } State; - EventChannelThreads(EventChannel::shared_ptr underlyingChannel); + EventChannelThreads( + EventChannel::shared_ptr channel, size_t min, size_t max); + void addThread(); void run(); bool keepRunning(); void adjustThreads(); + Monitor monitor; + size_t minThreads; + size_t maxThreads; EventChannel::shared_ptr channel; Threads workers; sys::AtomicCount nWaiting; State state; - Event terminate; }; diff --git a/cpp/src/qpid/sys/posix/check.h b/cpp/src/qpid/sys/posix/check.h index fe53321e27..7fa7b69d3b 100644 --- a/cpp/src/qpid/sys/posix/check.h +++ b/cpp/src/qpid/sys/posix/check.h @@ -43,7 +43,7 @@ class PosixError : public qpid::QpidError int getErrNo() { return errNo; } - Exception* clone() const throw() { return new PosixError(*this); } + Exception::auto_ptr clone() const throw() { return Exception::auto_ptr(new PosixError(*this)); } void throwSelf() const { throw *this; } @@ -56,6 +56,10 @@ class PosixError : public qpid::QpidError /** Create a PosixError for the current file/line and errno. */ #define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE) +/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */ +#define QPID_POSIX_CHECK(RESULT) \ + if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno)) + /** Throw a posix error if errNo is non-zero */ #define QPID_POSIX_THROW_IF(ERRNO) \ if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO)) -- cgit v1.2.1