summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp17
-rw-r--r--cpp/src/qpid/cluster/Cluster.h2
-rw-r--r--cpp/src/qpid/cluster/PeriodicTimerImpl.cpp14
3 files changed, 21 insertions, 12 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 44b95b75b8..0d0fb7bcee 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -250,9 +250,10 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
updateRetracted(false),
error(*this)
{
- // FIXME aconway 2010-01-26: must be done before management registers with timer.
- broker.setPeriodicTimer(
- std::auto_ptr<sys::PeriodicTimer>(new PeriodicTimerImpl(*this)));
+ // We give ownership of the timer to the broker and keep a plain pointer.
+ // This is OK as it means the timer has the same lifetime as the broker.
+ timer = new PeriodicTimerImpl(*this);
+ broker.setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer>(timer));
mAgent = broker.getManagementAgent();
if (mAgent != 0){
@@ -448,8 +449,8 @@ void Cluster::flagError(
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& efConst) {
- sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
Mutex::ScopedLock l(lock);
+ sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
if (state == LEFT) return;
EventFrame e(efConst);
const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
@@ -961,13 +962,13 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu
error.respondNone(from, type, frameSeq);
}
-void Cluster::periodicTimer(const MemberId&, const std::string& , Lock&) {
- // FIXME aconway 2010-01-26:
+void Cluster::periodicTimer(const MemberId&, const std::string& name, Lock&) {
+ timer->deliver(name);
}
bool Cluster::isElder() const {
- Mutex::ScopedLock l(lock);
- return elders.empty();
+ Monitor::ScopedLock l(lock);
+ return state >= CATCHUP && elders.empty();
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 39c440723f..08911081ea 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -63,6 +63,7 @@ namespace cluster {
class Connection;
class EventFrame;
+class PeriodicTimerImpl;
/**
* Connection to the cluster
@@ -271,6 +272,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
bool updateRetracted;
ErrorCheck error;
UpdateReceiver updateReceiver;
+ PeriodicTimerImpl* timer;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
diff --git a/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp b/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp
index 0d6cac8cc9..ced34b572d 100644
--- a/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp
+++ b/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp
@@ -32,20 +32,24 @@ PeriodicTimerImpl::TaskEntry::TaskEntry(
Cluster& c, const Task& t, sys::Duration d, const std::string& n)
: TimerTask(d), cluster(c), timer(c.getBroker().getTimer()),
task(t), name(n), inFlight(false)
-{}
+{
+ timer.add(this);
+}
void PeriodicTimerImpl::TaskEntry::fire() {
+ setupNextFire();
+ timer.add(this);
+ bool isElder = cluster.isElder(); // Call outside lock to avoid deadlock.
sys::Mutex::ScopedLock l(lock);
// Only the elder mcasts.
// Don't mcast another if we haven't yet received the last one.
- if (cluster.isElder() && !inFlight) {
+ if (isElder && !inFlight) {
+ QPID_LOG(trace, "Sending periodic-timer control for " << name);
inFlight = true;
cluster.getMulticast().mcastControl(
framing::ClusterPeriodicTimerBody(framing::ProtocolVersion(), name),
cluster.getId());
}
- setupNextFire();
- timer.add(this);
}
void PeriodicTimerImpl::TaskEntry::deliver() {
@@ -59,6 +63,7 @@ void PeriodicTimerImpl::add(
const Task& task, sys::Duration period, const std::string& name)
{
sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, "Periodic timer add entry for " << name);
if (map.find(name) != map.end())
throw Exception(QPID_MSG("Cluster timer task name added twice: " << name));
map[name] = new TaskEntry(cluster, task, period, name);
@@ -72,6 +77,7 @@ void PeriodicTimerImpl::deliver(const std::string& name) {
if (i == map.end())
throw Exception(QPID_MSG("Cluster timer unknown task: " << name));
}
+ QPID_LOG(debug, "Periodic timer execute " << name);
i->second->deliver();
}