diff options
| author | Alan Conway <aconway@apache.org> | 2007-08-21 23:35:23 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-08-21 23:35:23 +0000 |
| commit | 9ef0c3dc8bc5ef4af668a3c19f8e254fb5e01ada (patch) | |
| tree | b8afa05ed63e9d2d05392df406053c0d69d2fc36 /cpp/src/qpid | |
| parent | 9ebcf9839197cafe78beb8dfa14b803bd78f5a5e (diff) | |
| download | qpid-python-9ef0c3dc8bc5ef4af668a3c19f8e254fb5e01ada.tar.gz | |
* src/qpid/sys/Serializer.h, .cpp:
Template Serializer on functor for execute().
Old Serializer equivalent to Serializer<boost::function<void()> >
* src/qpid/broker/BrokerQueue.h, .cpp:
Use hand-written functor for Serializer instead of boost::function.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@568332 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Serializer.cpp | 71 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Serializer.h | 130 |
4 files changed, 119 insertions, 94 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index 706179fb52..5ff9f950eb 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -50,7 +50,7 @@ Queue::Queue(const string& _name, bool _autodelete, exclusive(0), persistenceId(0), serializer(false), - dispatchCallback(boost::bind(&Queue::dispatch, this)) + dispatchCallback(*this) { } diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index 35aa954c1e..962c11d8ee 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -60,6 +60,12 @@ namespace qpid { typedef std::vector<Consumer*> Consumers; typedef std::deque<Message::shared_ptr> Messages; + struct DispatchFunctor { + Queue& queue; + DispatchFunctor(Queue& q) : queue(q) {} + void operator()() { queue.dispatch(); } + }; + const string name; const bool autodelete; MessageStore* const store; @@ -75,8 +81,8 @@ namespace qpid { std::auto_ptr<QueuePolicy> policy; QueueBindings bindings; boost::shared_ptr<Exchange> alternateExchange; - qpid::sys::Serializer serializer; - qpid::sys::Serializer::Task dispatchCallback; + qpid::sys::Serializer<DispatchFunctor> serializer; + DispatchFunctor dispatchCallback; void pop(); void push(Message::shared_ptr& msg); diff --git a/cpp/src/qpid/sys/Serializer.cpp b/cpp/src/qpid/sys/Serializer.cpp index faf94a0f93..76dfaa6f6a 100644 --- a/cpp/src/qpid/sys/Serializer.cpp +++ b/cpp/src/qpid/sys/Serializer.cpp @@ -29,14 +29,14 @@ namespace qpid { namespace sys { -Serializer::Serializer(bool allowImmediate, Task notifyDispatchFn) +SerializerBase::SerializerBase(bool allowImmediate, VoidFn0 notifyDispatchFn) : state(IDLE), immediate(allowImmediate), notifyDispatch(notifyDispatchFn) { if (notifyDispatch.empty()) - notifyDispatch = boost::bind(&Serializer::notifyWorker, this); + notifyDispatch = boost::bind(&SerializerBase::notifyWorker, this); } -Serializer::~Serializer() { +SerializerBase::~SerializerBase() { { Mutex::ScopedLock l(lock); state = SHUTDOWN; @@ -46,75 +46,14 @@ Serializer::~Serializer() { worker.join(); } -void Serializer::dispatch(Task& task) { - Mutex::ScopedUnlock u(lock); - // Preconditions: lock is held, state is EXECUTING or DISPATCHING - assert(state != IDLE); - assert(state != SHUTDOWN); - assert(state == EXECUTING || state == DISPATCHING); - try { - task(); - } catch (const std::exception& e) { - QPID_LOG(critical, "Unexpected exception in Serializer::dispatch" - << e.what()); - assert(0); // Should not happen. - } catch (...) { - QPID_LOG(critical, "Unexpected exception in Serializer::dispatch."); - assert(0); // Should not happen. - } -} - -void Serializer::execute(Task& task) { - bool needNotify = false; - { - Mutex::ScopedLock l(lock); - assert(state != SHUTDOWN); - if (immediate && state == IDLE) { - state = EXECUTING; - dispatch(task); - if (state != SHUTDOWN) { - assert(state == EXECUTING); - state = IDLE; - } - } - else - queue.push_back(task); - - if (!queue.empty() && state == IDLE) { - state = DISPATCHING; - needNotify = true; - } - } - if (needNotify) - notifyDispatch(); // Not my function, call outside lock. -} - -void Serializer::dispatch() { - Mutex::ScopedLock l(lock); - // TODO aconway 2007-07-16: This loop could be unbounded - // if other threads add work while we're in dispatch(Task&). - // If we need to bound it we could dispatch just the elements - // that were enqueued when dispatch() was first called - save - // begin() iterator and pop only up to that. - while (!queue.empty() && state != SHUTDOWN) { - assert(state == DISPATCHING); - dispatch(queue.front()); - queue.pop_front(); - } - if (state != SHUTDOWN) { - assert(state == DISPATCHING); - state = IDLE; - } -} - -void Serializer::notifyWorker() { +void SerializerBase::notifyWorker() { if (!worker.id()) worker = Thread(*this); else lock.notify(); } -void Serializer::run() { +void SerializerBase::run() { Mutex::ScopedLock l(lock); while (state != SHUTDOWN) { dispatch(); diff --git a/cpp/src/qpid/sys/Serializer.h b/cpp/src/qpid/sys/Serializer.h index 337686cca0..085d51d7e2 100644 --- a/cpp/src/qpid/sys/Serializer.h +++ b/cpp/src/qpid/sys/Serializer.h @@ -36,29 +36,69 @@ namespace qpid { namespace sys { +/** Abstract base class for Serializer below. */ +class SerializerBase : private boost::noncopyable, private Runnable +{ + public: + typedef boost::function<void()> VoidFn0; + /** @see Serializer::Serializer */ + SerializerBase(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0()); + + virtual ~SerializerBase(); + + virtual void dispatch() = 0; + protected: + enum State { + IDLE, ///< No threads are active. + EXECUTING, ///< execute() is executing a single task. + DISPATCHING, ///< dispatch() is draining the queue. + SHUTDOWN ///< SerailizerBase is being destroyed. + }; + + void notifyWorker(); + void run(); + virtual bool empty() = 0; + + Monitor lock; + State state; + bool immediate; + Thread worker; + boost::function<void()> notifyDispatch; +}; + + /** * Execute tasks sequentially, queuing tasks when necessary to * ensure only one thread at a time executes a task and tasks * are executed in order. + * + * Task is a void returning 0-arg functor. It must not throw exceptions. + * + * Note we deliberately do not use boost::function as the task type + * because copying a boost::functor allocates the target object on the + * heap. */ -class Serializer : private boost::noncopyable, private Runnable -{ - public: - typedef boost::function<void()> Task; +template <class Task> +class Serializer : public SerializerBase { + + std::deque<Task> queue; + bool empty() { return queue.empty(); } + void dispatch(Task& task); + + public: /** Start a serializer. * * @param notifyDispatch Called when work is pending and there is no * active dispatch thread. Must arrange for dispatch() to be called * in some thread other than the calling thread and return. - * By default the Serializer supplies its own dispatch thread. + * By default the Serailizer supplies its own dispatch thread. * * @param immediate Allow execute() to execute a task immediatly * in the current thread. */ - Serializer(bool immediate=true, Task notifyDispatch=Task()); - - ~Serializer(); + Serializer(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0()) + : SerializerBase(immediate, notifyDispatch) {} /** * Task may be executed immediately in the calling thread if there @@ -68,33 +108,73 @@ class Serializer : private boost::noncopyable, private Runnable */ void execute(Task& task); + /** Execute pending tasks sequentially in calling thread. * Drains the task queue and returns, does not block for more tasks. * * @exception ShutdownException if the serializer is being destroyed. */ void dispatch(); - - private: - enum State { - IDLE, ///< No threads are active. - EXECUTING, ///< execute() is executing a single task. - DISPATCHING, ///< dispatch() is draining the queue. - SHUTDOWN ///< Serializer is being destroyed. }; - void dispatch(Task&); - void notifyWorker(); - void run(); - Monitor lock; +template <class Task> +void Serializer<Task>::execute(Task& task) { + bool needNotify = false; + { + Mutex::ScopedLock l(lock); + assert(state != SHUTDOWN); + if (immediate && state == IDLE) { + state = EXECUTING; + dispatch(task); + if (state != SHUTDOWN) { + assert(state == EXECUTING); + state = IDLE; + } + } + else + queue.push_back(task); + if (!queue.empty() && state == IDLE) { + state = DISPATCHING; + needNotify = true; + } + } + if (needNotify) + notifyDispatch(); // Not my function, call outside lock. +} + +template <class Task> +void Serializer<Task>::dispatch() { + Mutex::ScopedLock l(lock); + // TODO aconway 2007-07-16: This loop could be unbounded + // if other threads add work while we're in dispatch(Task&). + // If we need to bound it we could dispatch just the elements + // that were enqueued when dispatch() was first called - save + // begin() iterator and pop only up to that. + while (!queue.empty() && state != SHUTDOWN) { + assert(state == DISPATCHING); + dispatch(queue.front()); + queue.pop_front(); + } + if (state != SHUTDOWN) { + assert(state == DISPATCHING); + state = IDLE; + } +} + +template <class Task> +void Serializer<Task>::dispatch(Task& task) { + Mutex::ScopedUnlock u(lock); + // Preconditions: lock is held, state is EXECUTING or DISPATCHING + assert(state != IDLE); + assert(state != SHUTDOWN); + assert(state == EXECUTING || state == DISPATCHING); + // No exceptions allowed in task. + try { task(); } catch (...) { assert(0); } +} + + - State state; - bool immediate; - std::deque<Task> queue; - Thread worker; - Task notifyDispatch; -}; }} // namespace qpid::sys |
