diff options
Diffstat (limited to 'cpp/src/qpid/sys/posix/EventChannel.h')
| -rw-r--r-- | cpp/src/qpid/sys/posix/EventChannel.h | 200 |
1 files changed, 119 insertions, 81 deletions
diff --git a/cpp/src/qpid/sys/posix/EventChannel.h b/cpp/src/qpid/sys/posix/EventChannel.h index f465580996..85e121379a 100644 --- a/cpp/src/qpid/sys/posix/EventChannel.h +++ b/cpp/src/qpid/sys/posix/EventChannel.h @@ -20,7 +20,10 @@ */ #include "qpid/SharedObject.h" -#include "qpid/ExceptionHolder.h" +#include "qpid/Exception.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Time.h" + #include <boost/function.hpp> #include <memory> @@ -28,11 +31,57 @@ namespace qpid { namespace sys { class Event; -class EventHandler; -class EventChannel; + +/** + * Channel to post and wait for events. + */ +class EventChannel : public qpid::SharedObject<EventChannel> +{ + public: + static shared_ptr create(); + + /** Exception throw from wait() if channel is shut down. */ + class ShutdownException : public qpid::Exception {}; + + ~EventChannel(); + + /** Post an event to the channel. */ + void post(Event& event); + + /** + * Wait for the next complete event, up to timeout. + *@return Pointer to event or 0 if timeout elapses. + *@exception ShutdownException if the channel is shut down. + */ + Event* wait(Duration timeout = TIME_INFINITE); + + /** + * Shut down the event channel. + * Blocks till all threads have exited wait() + */ + void shutdown(); + + + // Internal classes. + class Impl; + class Queue; + class Descriptor; + + private: + + EventChannel(); + + Mutex lock; + boost::shared_ptr<Impl> impl; +}; /** * Base class for all Events. + * + * Derived classes define events representing various async IO operations. + * When an event is complete, it is returned by the EventChannel to + * a thread calling wait. The thread will call Event::dispatch() to + * execute code associated with event completion. */ class Event { @@ -40,135 +89,124 @@ class Event /** Type for callback when event is dispatched */ typedef boost::function0<void> Callback; - /** - * Create an event with optional callback. - * Instances of Event are sent directly through the channel. - * Derived classes define additional waiting behaviour. - *@param cb A callback functor that is invoked when dispatch() is called. - */ - Event(Callback cb = 0) : callback(cb) {} - virtual ~Event(); /** Call the callback provided to the constructor, if any. */ void dispatch(); - /** True if there was an error processing this event */ - bool hasError() const; + /** + *If there was an exception processing this Event, return it. + *@return 0 if there was no exception. + */ + qpid::Exception::shared_ptr_const getException() const; + + /** If getException() throw the corresponding exception. */ + void throwIfException(); - /** If hasError() throw the corresponding exception. */ - void throwIfError() throw(Exception); + /** Set the dispatch callback. */ + void setCallback(Callback cb) { callback = cb; } + + /** Set the exception. */ + void setException(const std::exception& e); protected: - virtual void prepare(EventHandler&); - virtual Event* complete(EventHandler&); - void setError(const ExceptionHolder& e); + Event(Callback cb=0) : callback(cb) {} + + virtual void prepare(EventChannel::Impl&) = 0; + virtual void complete(EventChannel::Descriptor&) = 0; Callback callback; - ExceptionHolder error; + Exception::shared_ptr_const exception; friend class EventChannel; - friend class EventHandler; + friend class EventChannel::Queue; }; -template <class BufT> -class IOEvent : public Event { +/** Base class for events related to a file descriptor */ +class FDEvent : public Event { + public: + EventChannel::Descriptor& getDescriptor() const { return descriptor; } + int getFDescriptor() const; + + protected: + FDEvent(Callback cb, EventChannel::Descriptor& fd) + : Event(cb), descriptor(fd) {} + // TODO AMS: 1/6/07 I really don't think this is correct, but + // the descriptor is immutable + FDEvent& operator=(const FDEvent& rhs) { Event::operator=(rhs); return *this; } + + private: + EventChannel::Descriptor& descriptor; +}; + +/** Base class for read or write events. */ +class IOEvent : public FDEvent { public: - void getDescriptor() const { return descriptor; } size_t getSize() const { return size; } - BufT getBuffer() const { return buffer; } - + protected: - IOEvent(int fd, Callback cb, size_t sz, BufT buf) : - Event(cb), descriptor(fd), buffer(buf), size(sz) {} + IOEvent(Callback cb, EventChannel::Descriptor& fd, size_t sz, bool noWait_) : + FDEvent(cb, fd), size(sz), noWait(noWait_) {} - int descriptor; - BufT buffer; size_t size; + bool noWait; }; /** Asynchronous read event */ -class ReadEvent : public IOEvent<void*> +class ReadEvent : public IOEvent { public: - explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) : - IOEvent<void*>(fd, cb, sz, buf), received(0) {} + explicit ReadEvent(int fd, void* buf=0, size_t sz=0,Callback cb=0, bool noWait=false); + void* getBuffer() const { return buffer; } + size_t getBytesRead() const { return bytesRead; } + private: - void prepare(EventHandler&); - Event* complete(EventHandler&); + void prepare(EventChannel::Impl&); + void complete(EventChannel::Descriptor&); ssize_t doRead(); - size_t received; + void* buffer; + size_t bytesRead; }; /** Asynchronous write event */ -class WriteEvent : public IOEvent<const void*> +class WriteEvent : public IOEvent { public: - explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0, - Callback cb=0) : - IOEvent<const void*>(fd, cb, sz, buf), written(0) {} + explicit WriteEvent(int fd, const void* buf=0, size_t sz=0, Callback cb=0); - protected: - void prepare(EventHandler&); - Event* complete(EventHandler&); + const void* getBuffer() const { return buffer; } + size_t getBytesWritten() const { return bytesWritten; } private: + void prepare(EventChannel::Impl&); + void complete(EventChannel::Descriptor&); ssize_t doWrite(); - size_t written; + + const void* buffer; + size_t bytesWritten; }; + /** Asynchronous socket accept event */ -class AcceptEvent : public Event +class AcceptEvent : public FDEvent { public: /** Accept a connection on fd. */ - explicit AcceptEvent(int fd=-1, Callback cb=0) : - Event(cb), descriptor(fd), accepted(0) {} - - /** Get descriptor for server socket */ + explicit AcceptEvent(int fd, Callback cb=0); + + /** Get descriptor for accepted server socket */ int getAcceptedDesscriptor() const { return accepted; } private: - void prepare(EventHandler&); - Event* complete(EventHandler&); + void prepare(EventChannel::Impl&); + void complete(EventChannel::Descriptor&); - int descriptor; int accepted; }; -class QueueSet; - -/** - * Channel to post and wait for events. - */ -class EventChannel : public qpid::SharedObject<EventChannel> -{ - public: - static shared_ptr create(); - - ~EventChannel(); - - /** Post an event to the channel. */ - void postEvent(Event& event); - - /** Post an event to the channel. Must not be 0. */ - void postEvent(Event* event) { postEvent(*event); } - - /** - * Wait for the next complete event. - *@return Pointer to event. Will never return 0. - */ - Event* getEvent(); - - private: - EventChannel(); - boost::shared_ptr<EventHandler> handler; -}; - - }} |
