summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Timer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Timer.cpp')
-rw-r--r--cpp/src/qpid/broker/Timer.cpp83
1 files changed, 83 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/Timer.cpp b/cpp/src/qpid/broker/Timer.cpp
index 14727b3b35..f721ba5ef5 100644
--- a/cpp/src/qpid/broker/Timer.cpp
+++ b/cpp/src/qpid/broker/Timer.cpp
@@ -95,3 +95,86 @@ bool Later::operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_p
{
return a.get() && b.get() && a->time > b->time;
}
+
+bool LaterA::operator()(const TimerTaskA::intrusive_ptr& a, const TimerTaskA::intrusive_ptr& b) const
+{
+ return a.get() && b.get() && a->time > b->time;
+}
+
+TimerA::TimerA(): active(false)
+{
+ start();
+}
+
+TimerA::~TimerA()
+{
+ stop();
+}
+void TimerA::run()
+{
+ Monitor::ScopedLock l(monitor);
+ while(active){
+ if (itasks.empty()) {
+ monitor.wait();
+ } else {
+ TimerTaskA::intrusive_ptr t = itasks.top();
+ if (t->cancelled) {
+ itasks.pop();
+ } else if(t->time < AbsTime::now()) {
+ itasks.pop();
+ t->fire();
+ } else {
+ monitor.wait(t->time);
+ }
+ }
+ }
+// ::run();
+}
+
+TimerTaskA::TimerTaskA(qpid::sys::Duration timeout): TimerTask(timeout), ref_cnt(0) {}
+TimerTaskA::TimerTaskA(qpid::sys::AbsTime time): TimerTask(time), ref_cnt(0) {}
+TimerTaskA::~TimerTaskA() {}
+
+
+void TimerA::add(TimerTaskA::intrusive_ptr& task)
+{
+ Monitor::ScopedLock l(monitor);
+ itasks.push(task);
+ monitor.notify();
+}
+
+void TimerA::start()
+{
+ Monitor::ScopedLock l(monitor);
+ if (!active) {
+ active = true;
+ runner = Thread(this);
+ }
+}
+
+void TimerA::stop()
+{
+ signalStop();
+ runner.join();
+}
+void TimerA::signalStop()
+{
+ Monitor::ScopedLock l(monitor);
+ if (active) {
+ active = false;
+ monitor.notifyAll();
+ }
+}
+
+void qpid::broker::intrusive_ptr_add_ref(TimerTaskA* fe)
+{
+ fe->ref();
+}
+
+void qpid::broker::intrusive_ptr_release(TimerTaskA* fe)
+{
+ fe->unref();
+ if (fe->refcnt() == 0) delete fe;
+}
+
+