diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/PeriodicTimerImpl.cpp | 14 |
3 files changed, 21 insertions, 12 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 44b95b75b8..0d0fb7bcee 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -250,9 +250,10 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : updateRetracted(false), error(*this) { - // FIXME aconway 2010-01-26: must be done before management registers with timer. - broker.setPeriodicTimer( - std::auto_ptr<sys::PeriodicTimer>(new PeriodicTimerImpl(*this))); + // We give ownership of the timer to the broker and keep a plain pointer. + // This is OK as it means the timer has the same lifetime as the broker. + timer = new PeriodicTimerImpl(*this); + broker.setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer>(timer)); mAgent = broker.getManagementAgent(); if (mAgent != 0){ @@ -448,8 +449,8 @@ void Cluster::flagError( // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& efConst) { - sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. Mutex::ScopedLock l(lock); + sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. if (state == LEFT) return; EventFrame e(efConst); const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody()); @@ -961,13 +962,13 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu error.respondNone(from, type, frameSeq); } -void Cluster::periodicTimer(const MemberId&, const std::string& , Lock&) { - // FIXME aconway 2010-01-26: +void Cluster::periodicTimer(const MemberId&, const std::string& name, Lock&) { + timer->deliver(name); } bool Cluster::isElder() const { - Mutex::ScopedLock l(lock); - return elders.empty(); + Monitor::ScopedLock l(lock); + return state >= CATCHUP && elders.empty(); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 39c440723f..08911081ea 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -63,6 +63,7 @@ namespace cluster { class Connection; class EventFrame; +class PeriodicTimerImpl; /** * Connection to the cluster @@ -271,6 +272,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool updateRetracted; ErrorCheck error; UpdateReceiver updateReceiver; + PeriodicTimerImpl* timer; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; diff --git a/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp b/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp index 0d6cac8cc9..ced34b572d 100644 --- a/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp +++ b/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp @@ -32,20 +32,24 @@ PeriodicTimerImpl::TaskEntry::TaskEntry( Cluster& c, const Task& t, sys::Duration d, const std::string& n) : TimerTask(d), cluster(c), timer(c.getBroker().getTimer()), task(t), name(n), inFlight(false) -{} +{ + timer.add(this); +} void PeriodicTimerImpl::TaskEntry::fire() { + setupNextFire(); + timer.add(this); + bool isElder = cluster.isElder(); // Call outside lock to avoid deadlock. sys::Mutex::ScopedLock l(lock); // Only the elder mcasts. // Don't mcast another if we haven't yet received the last one. - if (cluster.isElder() && !inFlight) { + if (isElder && !inFlight) { + QPID_LOG(trace, "Sending periodic-timer control for " << name); inFlight = true; cluster.getMulticast().mcastControl( framing::ClusterPeriodicTimerBody(framing::ProtocolVersion(), name), cluster.getId()); } - setupNextFire(); - timer.add(this); } void PeriodicTimerImpl::TaskEntry::deliver() { @@ -59,6 +63,7 @@ void PeriodicTimerImpl::add( const Task& task, sys::Duration period, const std::string& name) { sys::Mutex::ScopedLock l(lock); + QPID_LOG(debug, "Periodic timer add entry for " << name); if (map.find(name) != map.end()) throw Exception(QPID_MSG("Cluster timer task name added twice: " << name)); map[name] = new TaskEntry(cluster, task, period, name); @@ -72,6 +77,7 @@ void PeriodicTimerImpl::deliver(const std::string& name) { if (i == map.end()) throw Exception(QPID_MSG("Cluster timer unknown task: " << name)); } + QPID_LOG(debug, "Periodic timer execute " << name); i->second->deliver(); } |
