summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-01-29 22:59:09 +0000
committerAlan Conway <aconway@apache.org>2010-01-29 22:59:09 +0000
commita78bf7b9144ed3db8e798124595f48fc75231cce (patch)
treeb7284043fe639a2c6a880fc33836ce0b51d21b7e /cpp/src/qpid/cluster
parent726b23f43478a85b961365e4de3a9302a261f6b3 (diff)
downloadqpid-python-a78bf7b9144ed3db8e798124595f48fc75231cce.tar.gz
Replace PeriodicTimer with ClusterTimer, which inherits from Timer.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@904656 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp48
-rw-r--r--cpp/src/qpid/cluster/Cluster.h10
-rw-r--r--cpp/src/qpid/cluster/ClusterTimer.cpp114
-rw-r--r--cpp/src/qpid/cluster/ClusterTimer.h58
4 files changed, 207 insertions, 23 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 0d0fb7bcee..7dd8c7e62c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -112,7 +112,7 @@
#include "qpid/cluster/RetractClient.h"
#include "qpid/cluster/FailoverExchange.h"
#include "qpid/cluster/UpdateExchange.h"
-#include "qpid/cluster/PeriodicTimerImpl.h"
+#include "qpid/cluster/ClusterTimer.h"
#include "qpid/assert.h"
#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
@@ -137,7 +137,7 @@
#include "qpid/framing/ClusterUpdateRequestBody.h"
#include "qpid/framing/ClusterConnectionAnnounceBody.h"
#include "qpid/framing/ClusterErrorCheckBody.h"
-#include "qpid/framing/ClusterPeriodicTimerBody.h"
+#include "qpid/framing/ClusterTimerWakeupBody.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
@@ -179,7 +179,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 903171;
+const uint32_t Cluster::CLUSTER_VERSION = 904565;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -209,9 +209,8 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
cluster.errorCheck(member, type, frameSeq, l);
}
- void periodicTimer(const std::string& name) {
- cluster.periodicTimer(member, name, l);
- }
+ void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); }
+ void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); }
void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
@@ -245,6 +244,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
state(INIT),
initMap(self, settings.size),
store(broker.getDataDir().getPath()),
+ elder(false),
lastSize(0),
lastBroker(false),
updateRetracted(false),
@@ -252,8 +252,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
{
// We give ownership of the timer to the broker and keep a plain pointer.
// This is OK as it means the timer has the same lifetime as the broker.
- timer = new PeriodicTimerImpl(*this);
- broker.setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer>(timer));
+ timer = new ClusterTimer(*this);
+ broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer));
mAgent = broker.getManagementAgent();
if (mAgent != 0){
@@ -577,14 +577,13 @@ void Cluster::initMapCompleted(Lock& l) {
initMap.checkConsistent();
elders = initMap.getElders();
QPID_LOG(debug, *this << " elders: " << elders);
- if (!elders.empty()) { // I'm not the elder, I don't handle links & replication.
+ if (elders.empty())
+ becomeElder();
+ else {
broker.getLinks().setPassive(true);
broker.getQueueEvents().disable();
QPID_LOG(info, *this << " not active for links.");
}
- else {
- QPID_LOG(info, this << " active for links.");
- }
setClusterId(initMap.getClusterId(), l);
if (store.hasStore()) store.dirty(clusterId);
@@ -636,14 +635,19 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
if (state >= CATCHUP && memberChange) {
memberUpdate(l);
- if (elders.empty()) {
- // We are the oldest, reactive links if necessary
- QPID_LOG(info, this << " becoming active for links.");
- broker.getLinks().setPassive(false);
- }
+ if (elders.empty()) becomeElder();
}
}
+void Cluster::becomeElder() {
+ if (elder) return; // We were already the elder.
+ // We are the oldest, reactive links if necessary
+ QPID_LOG(info, *this << " became the elder, active for links.");
+ elder = true;
+ broker.getLinks().setPassive(false);
+ timer->becomeElder();
+}
+
void Cluster::makeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
@@ -962,13 +966,17 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu
error.respondNone(from, type, frameSeq);
}
-void Cluster::periodicTimer(const MemberId&, const std::string& name, Lock&) {
- timer->deliver(name);
+void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
+ timer->deliverWakeup(name);
+}
+
+void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
+ timer->deliverDrop(name);
}
bool Cluster::isElder() const {
Monitor::ScopedLock l(lock);
- return state >= CATCHUP && elders.empty();
+ return elder;
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 08911081ea..977c873e29 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -63,7 +63,7 @@ namespace cluster {
class Connection;
class EventFrame;
-class PeriodicTimerImpl;
+class ClusterTimer;
/**
* Connection to the cluster
@@ -164,7 +164,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
- void periodicTimer(const MemberId&, const std::string& name, Lock&);
+ void timerWakeup(const MemberId&, const std::string& name, Lock&);
+ void timerDrop(const MemberId&, const std::string& name, Lock&);
void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
@@ -201,6 +202,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
const struct cpg_address */*joined*/, int /*nJoined*/
);
+ void becomeElder();
+
// == Called in management threads.
virtual qpid::management::ManagementObject* GetManagementObject() const;
virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
@@ -265,6 +268,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
StoreStatus store;
ClusterMap map;
MemberSet elders;
+ bool elder;
size_t lastSize;
bool lastBroker;
sys::Thread updateThread;
@@ -272,7 +276,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
bool updateRetracted;
ErrorCheck error;
UpdateReceiver updateReceiver;
- PeriodicTimerImpl* timer;
+ ClusterTimer* timer;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
diff --git a/cpp/src/qpid/cluster/ClusterTimer.cpp b/cpp/src/qpid/cluster/ClusterTimer.cpp
new file mode 100644
index 0000000000..612758152f
--- /dev/null
+++ b/cpp/src/qpid/cluster/ClusterTimer.cpp
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Cluster.h"
+#include "ClusterTimer.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/ClusterTimerWakeupBody.h"
+#include "qpid/framing/ClusterTimerDropBody.h"
+
+namespace qpid {
+namespace cluster {
+
+using boost::intrusive_ptr;
+using std::max;
+using sys::Timer;
+using sys::TimerTask;
+
+
+ClusterTimer::ClusterTimer(Cluster& c) : cluster(c) {}
+
+ClusterTimer::~ClusterTimer() {}
+
+// Initialization or deliver thread.
+void ClusterTimer::add(intrusive_ptr<TimerTask> task)
+{
+ QPID_LOG(trace, "Adding cluster timer task " << task->getName());
+ Map::iterator i = map.find(task->getName());
+ if (i != map.end())
+ throw Exception(QPID_MSG("Task already exists with name " << task->getName()));
+ map[task->getName()] = task;
+ // Only the elder actually activates the task with the Timer base class.
+ if (cluster.isElder()) {
+ QPID_LOG(trace, "Elder activating cluster timer task " << task->getName());
+ Timer::add(task);
+ }
+}
+
+// Timer thread
+void ClusterTimer::fire(intrusive_ptr<TimerTask> t) {
+ // Elder mcasts wakeup on fire, task is not fired until deliverWakeup
+ if (cluster.isElder()) {
+ QPID_LOG(trace, "Sending cluster timer wakeup " << t->getName());
+ cluster.getMulticast().mcastControl(
+ framing::ClusterTimerWakeupBody(framing::ProtocolVersion(), t->getName()),
+ cluster.getId());
+ }
+ else
+ QPID_LOG(trace, "Cluster timer task fired, but not elder " << t->getName());
+}
+
+// Timer thread
+void ClusterTimer::drop(intrusive_ptr<TimerTask> t) {
+ // Elder mcasts drop, task is droped in deliverDrop
+ if (cluster.isElder()) {
+ QPID_LOG(trace, "Sending cluster timer drop " << t->getName());
+ cluster.getMulticast().mcastControl(
+ framing::ClusterTimerDropBody(framing::ProtocolVersion(), t->getName()),
+ cluster.getId());
+ }
+ else
+ QPID_LOG(trace, "Cluster timer task dropped, but not on elder " << t->getName());
+}
+
+// Deliver thread
+void ClusterTimer::deliverWakeup(const std::string& name) {
+ QPID_LOG(trace, "Cluster timer wakeup delivered for " << name);
+ Map::iterator i = map.find(name);
+ if (i == map.end())
+ throw Exception(QPID_MSG("Cluster timer wakeup non-existent task " << name));
+ else {
+ intrusive_ptr<TimerTask> t = i->second;
+ map.erase(i);
+ Timer::fire(t);
+ }
+}
+
+// Deliver thread
+void ClusterTimer::deliverDrop(const std::string& name) {
+ QPID_LOG(trace, "Cluster timer drop delivered for " << name);
+ Map::iterator i = map.find(name);
+ if (i == map.end())
+ throw Exception(QPID_MSG("Cluster timer drop non-existent task " << name));
+ else {
+ intrusive_ptr<TimerTask> t = i->second;
+ map.erase(i);
+ }
+}
+
+// Deliver thread
+void ClusterTimer::becomeElder() {
+ for (Map::iterator i = map.begin(); i != map.end(); ++i) {
+ Timer::add(i->second);
+ }
+}
+
+}}
diff --git a/cpp/src/qpid/cluster/ClusterTimer.h b/cpp/src/qpid/cluster/ClusterTimer.h
new file mode 100644
index 0000000000..395e505451
--- /dev/null
+++ b/cpp/src/qpid/cluster/ClusterTimer.h
@@ -0,0 +1,58 @@
+#ifndef QPID_CLUSTER_CLUSTERTIMER_H
+#define QPID_CLUSTER_CLUSTERTIMER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Timer.h"
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+class Cluster;
+
+class ClusterTimer : public sys::Timer {
+ public:
+ ClusterTimer(Cluster&);
+ ~ClusterTimer();
+
+ void add(boost::intrusive_ptr<sys::TimerTask> task);
+
+ void deliverWakeup(const std::string& name);
+ void deliverDrop(const std::string& name);
+ void becomeElder();
+
+ protected:
+ void fire(boost::intrusive_ptr<sys::TimerTask> task);
+ void drop(boost::intrusive_ptr<sys::TimerTask> task);
+
+ private:
+ typedef std::map<std::string, boost::intrusive_ptr<sys::TimerTask> > Map;
+ Cluster& cluster;
+ Map map;
+};
+
+
+}}
+
+
+#endif /*!QPID_CLUSTER_CLUSTERTIMER_H*/