summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-11-02 13:12:14 +0000
committerKim van der Riet <kpvdr@apache.org>2007-11-02 13:12:14 +0000
commit1ff0cbe80e155edc2740ee5f5fae6eaf6c611785 (patch)
tree78d99b34dc677aac63b164fd2311604dc5714b9a /cpp
parente834668f813a7024be1677116e9213b315956ef2 (diff)
downloadqpid-python-1ff0cbe80e155edc2740ee5f5fae6eaf6c611785.tar.gz
Added Timer class which uses boost intrusive_ptr. This needs to be made into a C++ template at some point.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@591321 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Timer.cpp83
-rw-r--r--cpp/src/qpid/broker/Timer.h59
2 files changed, 136 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/Timer.cpp b/cpp/src/qpid/broker/Timer.cpp
index 14727b3b35..f721ba5ef5 100644
--- a/cpp/src/qpid/broker/Timer.cpp
+++ b/cpp/src/qpid/broker/Timer.cpp
@@ -95,3 +95,86 @@ bool Later::operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_p
{
return a.get() && b.get() && a->time > b->time;
}
+
+bool LaterA::operator()(const TimerTaskA::intrusive_ptr& a, const TimerTaskA::intrusive_ptr& b) const
+{
+ return a.get() && b.get() && a->time > b->time;
+}
+
+TimerA::TimerA(): active(false)
+{
+ start();
+}
+
+TimerA::~TimerA()
+{
+ stop();
+}
+void TimerA::run()
+{
+ Monitor::ScopedLock l(monitor);
+ while(active){
+ if (itasks.empty()) {
+ monitor.wait();
+ } else {
+ TimerTaskA::intrusive_ptr t = itasks.top();
+ if (t->cancelled) {
+ itasks.pop();
+ } else if(t->time < AbsTime::now()) {
+ itasks.pop();
+ t->fire();
+ } else {
+ monitor.wait(t->time);
+ }
+ }
+ }
+// ::run();
+}
+
+TimerTaskA::TimerTaskA(qpid::sys::Duration timeout): TimerTask(timeout), ref_cnt(0) {}
+TimerTaskA::TimerTaskA(qpid::sys::AbsTime time): TimerTask(time), ref_cnt(0) {}
+TimerTaskA::~TimerTaskA() {}
+
+
+void TimerA::add(TimerTaskA::intrusive_ptr& task)
+{
+ Monitor::ScopedLock l(monitor);
+ itasks.push(task);
+ monitor.notify();
+}
+
+void TimerA::start()
+{
+ Monitor::ScopedLock l(monitor);
+ if (!active) {
+ active = true;
+ runner = Thread(this);
+ }
+}
+
+void TimerA::stop()
+{
+ signalStop();
+ runner.join();
+}
+void TimerA::signalStop()
+{
+ Monitor::ScopedLock l(monitor);
+ if (active) {
+ active = false;
+ monitor.notifyAll();
+ }
+}
+
+void qpid::broker::intrusive_ptr_add_ref(TimerTaskA* fe)
+{
+ fe->ref();
+}
+
+void qpid::broker::intrusive_ptr_release(TimerTaskA* fe)
+{
+ fe->unref();
+ if (fe->refcnt() == 0) delete fe;
+}
+
+
diff --git a/cpp/src/qpid/broker/Timer.h b/cpp/src/qpid/broker/Timer.h
index e89ae499b7..eaa9192bec 100644
--- a/cpp/src/qpid/broker/Timer.h
+++ b/cpp/src/qpid/broker/Timer.h
@@ -24,6 +24,7 @@
#include <memory>
#include <queue>
#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
@@ -44,24 +45,45 @@ struct TimerTask
virtual void fire() = 0;
};
- struct Later
- {
- bool operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_ptr& b) const;
- };
+struct TimerTaskA : public TimerTask
+{
+ typedef boost::intrusive_ptr<TimerTaskA> intrusive_ptr;
+
+ TimerTaskA(qpid::sys::Duration timeout);
+ TimerTaskA(qpid::sys::AbsTime time);
+ virtual ~TimerTaskA();
+
+ size_t ref_cnt;
+ inline size_t refcnt(void) { return ref_cnt;}
+ inline void ref(void) { ref_cnt++; }
+ inline void unref(void) { ref_cnt--; }
+};
+
+struct Later
+{
+ bool operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_ptr& b) const;
+};
+
+struct LaterA
+{
+ bool operator()(const TimerTaskA::intrusive_ptr& a, const TimerTaskA::intrusive_ptr& b) const;
+};
+
class Timer : private qpid::sys::Runnable
{
+protected:
qpid::sys::Monitor monitor;
std::priority_queue<TimerTask::shared_ptr, std::vector<TimerTask::shared_ptr>, Later> tasks;
qpid::sys::Thread runner;
bool active;
- void run();
+ virtual void run();
void signalStop();
public:
Timer();
- ~Timer();
+ virtual ~Timer();
void add(TimerTask::shared_ptr task);
void start();
@@ -69,6 +91,31 @@ public:
};
+class TimerA : private qpid::sys::Runnable
+{
+protected:
+ qpid::sys::Monitor monitor;
+ std::priority_queue<TimerTaskA::intrusive_ptr&, std::vector<TimerTaskA::intrusive_ptr>,
+ LaterA> itasks;
+ qpid::sys::Thread runner;
+ bool active;
+
+ virtual void run();
+ void signalStop();
+
+public:
+ TimerA();
+ virtual ~TimerA();
+
+ void add(TimerTaskA::intrusive_ptr& task);
+ void start();
+ void stop();
+};
+
+void intrusive_ptr_add_ref(TimerTaskA* r);
+void intrusive_ptr_release(TimerTaskA* r);
+
+
}
}