summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/EventChannel.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/posix/EventChannel.h')
-rw-r--r--cpp/src/qpid/sys/posix/EventChannel.h200
1 files changed, 119 insertions, 81 deletions
diff --git a/cpp/src/qpid/sys/posix/EventChannel.h b/cpp/src/qpid/sys/posix/EventChannel.h
index f465580996..85e121379a 100644
--- a/cpp/src/qpid/sys/posix/EventChannel.h
+++ b/cpp/src/qpid/sys/posix/EventChannel.h
@@ -20,7 +20,10 @@
*/
#include "qpid/SharedObject.h"
-#include "qpid/ExceptionHolder.h"
+#include "qpid/Exception.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Time.h"
+
#include <boost/function.hpp>
#include <memory>
@@ -28,11 +31,57 @@ namespace qpid {
namespace sys {
class Event;
-class EventHandler;
-class EventChannel;
+
+/**
+ * Channel to post and wait for events.
+ */
+class EventChannel : public qpid::SharedObject<EventChannel>
+{
+ public:
+ static shared_ptr create();
+
+ /** Exception throw from wait() if channel is shut down. */
+ class ShutdownException : public qpid::Exception {};
+
+ ~EventChannel();
+
+ /** Post an event to the channel. */
+ void post(Event& event);
+
+ /**
+ * Wait for the next complete event, up to timeout.
+ *@return Pointer to event or 0 if timeout elapses.
+ *@exception ShutdownException if the channel is shut down.
+ */
+ Event* wait(Duration timeout = TIME_INFINITE);
+
+ /**
+ * Shut down the event channel.
+ * Blocks till all threads have exited wait()
+ */
+ void shutdown();
+
+
+ // Internal classes.
+ class Impl;
+ class Queue;
+ class Descriptor;
+
+ private:
+
+ EventChannel();
+
+ Mutex lock;
+ boost::shared_ptr<Impl> impl;
+};
/**
* Base class for all Events.
+ *
+ * Derived classes define events representing various async IO operations.
+ * When an event is complete, it is returned by the EventChannel to
+ * a thread calling wait. The thread will call Event::dispatch() to
+ * execute code associated with event completion.
*/
class Event
{
@@ -40,135 +89,124 @@ class Event
/** Type for callback when event is dispatched */
typedef boost::function0<void> Callback;
- /**
- * Create an event with optional callback.
- * Instances of Event are sent directly through the channel.
- * Derived classes define additional waiting behaviour.
- *@param cb A callback functor that is invoked when dispatch() is called.
- */
- Event(Callback cb = 0) : callback(cb) {}
-
virtual ~Event();
/** Call the callback provided to the constructor, if any. */
void dispatch();
- /** True if there was an error processing this event */
- bool hasError() const;
+ /**
+ *If there was an exception processing this Event, return it.
+ *@return 0 if there was no exception.
+ */
+ qpid::Exception::shared_ptr_const getException() const;
+
+ /** If getException() throw the corresponding exception. */
+ void throwIfException();
- /** If hasError() throw the corresponding exception. */
- void throwIfError() throw(Exception);
+ /** Set the dispatch callback. */
+ void setCallback(Callback cb) { callback = cb; }
+
+ /** Set the exception. */
+ void setException(const std::exception& e);
protected:
- virtual void prepare(EventHandler&);
- virtual Event* complete(EventHandler&);
- void setError(const ExceptionHolder& e);
+ Event(Callback cb=0) : callback(cb) {}
+
+ virtual void prepare(EventChannel::Impl&) = 0;
+ virtual void complete(EventChannel::Descriptor&) = 0;
Callback callback;
- ExceptionHolder error;
+ Exception::shared_ptr_const exception;
friend class EventChannel;
- friend class EventHandler;
+ friend class EventChannel::Queue;
};
-template <class BufT>
-class IOEvent : public Event {
+/** Base class for events related to a file descriptor */
+class FDEvent : public Event {
+ public:
+ EventChannel::Descriptor& getDescriptor() const { return descriptor; }
+ int getFDescriptor() const;
+
+ protected:
+ FDEvent(Callback cb, EventChannel::Descriptor& fd)
+ : Event(cb), descriptor(fd) {}
+ // TODO AMS: 1/6/07 I really don't think this is correct, but
+ // the descriptor is immutable
+ FDEvent& operator=(const FDEvent& rhs) { Event::operator=(rhs); return *this; }
+
+ private:
+ EventChannel::Descriptor& descriptor;
+};
+
+/** Base class for read or write events. */
+class IOEvent : public FDEvent {
public:
- void getDescriptor() const { return descriptor; }
size_t getSize() const { return size; }
- BufT getBuffer() const { return buffer; }
-
+
protected:
- IOEvent(int fd, Callback cb, size_t sz, BufT buf) :
- Event(cb), descriptor(fd), buffer(buf), size(sz) {}
+ IOEvent(Callback cb, EventChannel::Descriptor& fd, size_t sz, bool noWait_) :
+ FDEvent(cb, fd), size(sz), noWait(noWait_) {}
- int descriptor;
- BufT buffer;
size_t size;
+ bool noWait;
};
/** Asynchronous read event */
-class ReadEvent : public IOEvent<void*>
+class ReadEvent : public IOEvent
{
public:
- explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) :
- IOEvent<void*>(fd, cb, sz, buf), received(0) {}
+ explicit ReadEvent(int fd, void* buf=0, size_t sz=0,Callback cb=0, bool noWait=false);
+ void* getBuffer() const { return buffer; }
+ size_t getBytesRead() const { return bytesRead; }
+
private:
- void prepare(EventHandler&);
- Event* complete(EventHandler&);
+ void prepare(EventChannel::Impl&);
+ void complete(EventChannel::Descriptor&);
ssize_t doRead();
- size_t received;
+ void* buffer;
+ size_t bytesRead;
};
/** Asynchronous write event */
-class WriteEvent : public IOEvent<const void*>
+class WriteEvent : public IOEvent
{
public:
- explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0,
- Callback cb=0) :
- IOEvent<const void*>(fd, cb, sz, buf), written(0) {}
+ explicit WriteEvent(int fd, const void* buf=0, size_t sz=0, Callback cb=0);
- protected:
- void prepare(EventHandler&);
- Event* complete(EventHandler&);
+ const void* getBuffer() const { return buffer; }
+ size_t getBytesWritten() const { return bytesWritten; }
private:
+ void prepare(EventChannel::Impl&);
+ void complete(EventChannel::Descriptor&);
ssize_t doWrite();
- size_t written;
+
+ const void* buffer;
+ size_t bytesWritten;
};
+
/** Asynchronous socket accept event */
-class AcceptEvent : public Event
+class AcceptEvent : public FDEvent
{
public:
/** Accept a connection on fd. */
- explicit AcceptEvent(int fd=-1, Callback cb=0) :
- Event(cb), descriptor(fd), accepted(0) {}
-
- /** Get descriptor for server socket */
+ explicit AcceptEvent(int fd, Callback cb=0);
+
+ /** Get descriptor for accepted server socket */
int getAcceptedDesscriptor() const { return accepted; }
private:
- void prepare(EventHandler&);
- Event* complete(EventHandler&);
+ void prepare(EventChannel::Impl&);
+ void complete(EventChannel::Descriptor&);
- int descriptor;
int accepted;
};
-class QueueSet;
-
-/**
- * Channel to post and wait for events.
- */
-class EventChannel : public qpid::SharedObject<EventChannel>
-{
- public:
- static shared_ptr create();
-
- ~EventChannel();
-
- /** Post an event to the channel. */
- void postEvent(Event& event);
-
- /** Post an event to the channel. Must not be 0. */
- void postEvent(Event* event) { postEvent(*event); }
-
- /**
- * Wait for the next complete event.
- *@return Pointer to event. Will never return 0.
- */
- Event* getEvent();
-
- private:
- EventChannel();
- boost::shared_ptr<EventHandler> handler;
-};
-
-
}}