From 1ff0cbe80e155edc2740ee5f5fae6eaf6c611785 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 2 Nov 2007 13:12:14 +0000 Subject: Added Timer class which uses boost intrusive_ptr. This needs to be made into a C++ template at some point. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@591321 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Timer.cpp | 83 +++++++++++++++++++++++++++++++++++++++++++ cpp/src/qpid/broker/Timer.h | 59 ++++++++++++++++++++++++++---- 2 files changed, 136 insertions(+), 6 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/broker/Timer.cpp b/cpp/src/qpid/broker/Timer.cpp index 14727b3b35..f721ba5ef5 100644 --- a/cpp/src/qpid/broker/Timer.cpp +++ b/cpp/src/qpid/broker/Timer.cpp @@ -95,3 +95,86 @@ bool Later::operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_p { return a.get() && b.get() && a->time > b->time; } + +bool LaterA::operator()(const TimerTaskA::intrusive_ptr& a, const TimerTaskA::intrusive_ptr& b) const +{ + return a.get() && b.get() && a->time > b->time; +} + +TimerA::TimerA(): active(false) +{ + start(); +} + +TimerA::~TimerA() +{ + stop(); +} +void TimerA::run() +{ + Monitor::ScopedLock l(monitor); + while(active){ + if (itasks.empty()) { + monitor.wait(); + } else { + TimerTaskA::intrusive_ptr t = itasks.top(); + if (t->cancelled) { + itasks.pop(); + } else if(t->time < AbsTime::now()) { + itasks.pop(); + t->fire(); + } else { + monitor.wait(t->time); + } + } + } +// ::run(); +} + +TimerTaskA::TimerTaskA(qpid::sys::Duration timeout): TimerTask(timeout), ref_cnt(0) {} +TimerTaskA::TimerTaskA(qpid::sys::AbsTime time): TimerTask(time), ref_cnt(0) {} +TimerTaskA::~TimerTaskA() {} + + +void TimerA::add(TimerTaskA::intrusive_ptr& task) +{ + Monitor::ScopedLock l(monitor); + itasks.push(task); + monitor.notify(); +} + +void TimerA::start() +{ + Monitor::ScopedLock l(monitor); + if (!active) { + active = true; + runner = Thread(this); + } +} + +void TimerA::stop() +{ + signalStop(); + runner.join(); +} +void TimerA::signalStop() +{ + Monitor::ScopedLock l(monitor); + if (active) { + active = false; + monitor.notifyAll(); + } +} + +void qpid::broker::intrusive_ptr_add_ref(TimerTaskA* fe) +{ + fe->ref(); +} + +void qpid::broker::intrusive_ptr_release(TimerTaskA* fe) +{ + fe->unref(); + if (fe->refcnt() == 0) delete fe; +} + + diff --git a/cpp/src/qpid/broker/Timer.h b/cpp/src/qpid/broker/Timer.h index e89ae499b7..eaa9192bec 100644 --- a/cpp/src/qpid/broker/Timer.h +++ b/cpp/src/qpid/broker/Timer.h @@ -24,6 +24,7 @@ #include #include #include +#include #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" @@ -44,24 +45,45 @@ struct TimerTask virtual void fire() = 0; }; - struct Later - { - bool operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_ptr& b) const; - }; +struct TimerTaskA : public TimerTask +{ + typedef boost::intrusive_ptr intrusive_ptr; + + TimerTaskA(qpid::sys::Duration timeout); + TimerTaskA(qpid::sys::AbsTime time); + virtual ~TimerTaskA(); + + size_t ref_cnt; + inline size_t refcnt(void) { return ref_cnt;} + inline void ref(void) { ref_cnt++; } + inline void unref(void) { ref_cnt--; } +}; + +struct Later +{ + bool operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_ptr& b) const; +}; + +struct LaterA +{ + bool operator()(const TimerTaskA::intrusive_ptr& a, const TimerTaskA::intrusive_ptr& b) const; +}; + class Timer : private qpid::sys::Runnable { +protected: qpid::sys::Monitor monitor; std::priority_queue, Later> tasks; qpid::sys::Thread runner; bool active; - void run(); + virtual void run(); void signalStop(); public: Timer(); - ~Timer(); + virtual ~Timer(); void add(TimerTask::shared_ptr task); void start(); @@ -69,6 +91,31 @@ public: }; +class TimerA : private qpid::sys::Runnable +{ +protected: + qpid::sys::Monitor monitor; + std::priority_queue, + LaterA> itasks; + qpid::sys::Thread runner; + bool active; + + virtual void run(); + void signalStop(); + +public: + TimerA(); + virtual ~TimerA(); + + void add(TimerTaskA::intrusive_ptr& task); + void start(); + void stop(); +}; + +void intrusive_ptr_add_ref(TimerTaskA* r); +void intrusive_ptr_release(TimerTaskA* r); + + } } -- cgit v1.2.1