diff options
| author | Alan Conway <aconway@apache.org> | 2010-01-29 22:59:09 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-01-29 22:59:09 +0000 |
| commit | a78bf7b9144ed3db8e798124595f48fc75231cce (patch) | |
| tree | b7284043fe639a2c6a880fc33836ce0b51d21b7e /cpp/src/qpid/cluster | |
| parent | 726b23f43478a85b961365e4de3a9302a261f6b3 (diff) | |
| download | qpid-python-a78bf7b9144ed3db8e798124595f48fc75231cce.tar.gz | |
Replace PeriodicTimer with ClusterTimer, which inherits from Timer.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@904656 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 48 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterTimer.cpp | 114 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterTimer.h | 58 |
4 files changed, 207 insertions, 23 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 0d0fb7bcee..7dd8c7e62c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -112,7 +112,7 @@ #include "qpid/cluster/RetractClient.h" #include "qpid/cluster/FailoverExchange.h" #include "qpid/cluster/UpdateExchange.h" -#include "qpid/cluster/PeriodicTimerImpl.h" +#include "qpid/cluster/ClusterTimer.h" #include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" @@ -137,7 +137,7 @@ #include "qpid/framing/ClusterUpdateRequestBody.h" #include "qpid/framing/ClusterConnectionAnnounceBody.h" #include "qpid/framing/ClusterErrorCheckBody.h" -#include "qpid/framing/ClusterPeriodicTimerBody.h" +#include "qpid/framing/ClusterTimerWakeupBody.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" @@ -179,7 +179,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 903171; +const uint32_t Cluster::CLUSTER_VERSION = 904565; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -209,9 +209,8 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } - void periodicTimer(const std::string& name) { - cluster.periodicTimer(member, name, l); - } + void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); } + void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); } void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } @@ -245,6 +244,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : state(INIT), initMap(self, settings.size), store(broker.getDataDir().getPath()), + elder(false), lastSize(0), lastBroker(false), updateRetracted(false), @@ -252,8 +252,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : { // We give ownership of the timer to the broker and keep a plain pointer. // This is OK as it means the timer has the same lifetime as the broker. - timer = new PeriodicTimerImpl(*this); - broker.setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer>(timer)); + timer = new ClusterTimer(*this); + broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer)); mAgent = broker.getManagementAgent(); if (mAgent != 0){ @@ -577,14 +577,13 @@ void Cluster::initMapCompleted(Lock& l) { initMap.checkConsistent(); elders = initMap.getElders(); QPID_LOG(debug, *this << " elders: " << elders); - if (!elders.empty()) { // I'm not the elder, I don't handle links & replication. + if (elders.empty()) + becomeElder(); + else { broker.getLinks().setPassive(true); broker.getQueueEvents().disable(); QPID_LOG(info, *this << " not active for links."); } - else { - QPID_LOG(info, this << " active for links."); - } setClusterId(initMap.getClusterId(), l); if (store.hasStore()) store.dirty(clusterId); @@ -636,14 +635,19 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& if (state >= CATCHUP && memberChange) { memberUpdate(l); - if (elders.empty()) { - // We are the oldest, reactive links if necessary - QPID_LOG(info, this << " becoming active for links."); - broker.getLinks().setPassive(false); - } + if (elders.empty()) becomeElder(); } } +void Cluster::becomeElder() { + if (elder) return; // We were already the elder. + // We are the oldest, reactive links if necessary + QPID_LOG(info, *this << " became the elder, active for links."); + elder = true; + broker.getLinks().setPassive(false); + timer->becomeElder(); +} + void Cluster::makeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; @@ -962,13 +966,17 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu error.respondNone(from, type, frameSeq); } -void Cluster::periodicTimer(const MemberId&, const std::string& name, Lock&) { - timer->deliver(name); +void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { + timer->deliverWakeup(name); +} + +void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { + timer->deliverDrop(name); } bool Cluster::isElder() const { Monitor::ScopedLock l(lock); - return state >= CATCHUP && elders.empty(); + return elder; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 08911081ea..977c873e29 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -63,7 +63,7 @@ namespace cluster { class Connection; class EventFrame; -class PeriodicTimerImpl; +class ClusterTimer; /** * Connection to the cluster @@ -164,7 +164,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); - void periodicTimer(const MemberId&, const std::string& name, Lock&); + void timerWakeup(const MemberId&, const std::string& name, Lock&); + void timerDrop(const MemberId&, const std::string& name, Lock&); void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); @@ -201,6 +202,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { const struct cpg_address */*joined*/, int /*nJoined*/ ); + void becomeElder(); + // == Called in management threads. virtual qpid::management::ManagementObject* GetManagementObject() const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); @@ -265,6 +268,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { StoreStatus store; ClusterMap map; MemberSet elders; + bool elder; size_t lastSize; bool lastBroker; sys::Thread updateThread; @@ -272,7 +276,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool updateRetracted; ErrorCheck error; UpdateReceiver updateReceiver; - PeriodicTimerImpl* timer; + ClusterTimer* timer; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; diff --git a/cpp/src/qpid/cluster/ClusterTimer.cpp b/cpp/src/qpid/cluster/ClusterTimer.cpp new file mode 100644 index 0000000000..612758152f --- /dev/null +++ b/cpp/src/qpid/cluster/ClusterTimer.cpp @@ -0,0 +1,114 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Cluster.h" +#include "ClusterTimer.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/ClusterTimerWakeupBody.h" +#include "qpid/framing/ClusterTimerDropBody.h" + +namespace qpid { +namespace cluster { + +using boost::intrusive_ptr; +using std::max; +using sys::Timer; +using sys::TimerTask; + + +ClusterTimer::ClusterTimer(Cluster& c) : cluster(c) {} + +ClusterTimer::~ClusterTimer() {} + +// Initialization or deliver thread. +void ClusterTimer::add(intrusive_ptr<TimerTask> task) +{ + QPID_LOG(trace, "Adding cluster timer task " << task->getName()); + Map::iterator i = map.find(task->getName()); + if (i != map.end()) + throw Exception(QPID_MSG("Task already exists with name " << task->getName())); + map[task->getName()] = task; + // Only the elder actually activates the task with the Timer base class. + if (cluster.isElder()) { + QPID_LOG(trace, "Elder activating cluster timer task " << task->getName()); + Timer::add(task); + } +} + +// Timer thread +void ClusterTimer::fire(intrusive_ptr<TimerTask> t) { + // Elder mcasts wakeup on fire, task is not fired until deliverWakeup + if (cluster.isElder()) { + QPID_LOG(trace, "Sending cluster timer wakeup " << t->getName()); + cluster.getMulticast().mcastControl( + framing::ClusterTimerWakeupBody(framing::ProtocolVersion(), t->getName()), + cluster.getId()); + } + else + QPID_LOG(trace, "Cluster timer task fired, but not elder " << t->getName()); +} + +// Timer thread +void ClusterTimer::drop(intrusive_ptr<TimerTask> t) { + // Elder mcasts drop, task is droped in deliverDrop + if (cluster.isElder()) { + QPID_LOG(trace, "Sending cluster timer drop " << t->getName()); + cluster.getMulticast().mcastControl( + framing::ClusterTimerDropBody(framing::ProtocolVersion(), t->getName()), + cluster.getId()); + } + else + QPID_LOG(trace, "Cluster timer task dropped, but not on elder " << t->getName()); +} + +// Deliver thread +void ClusterTimer::deliverWakeup(const std::string& name) { + QPID_LOG(trace, "Cluster timer wakeup delivered for " << name); + Map::iterator i = map.find(name); + if (i == map.end()) + throw Exception(QPID_MSG("Cluster timer wakeup non-existent task " << name)); + else { + intrusive_ptr<TimerTask> t = i->second; + map.erase(i); + Timer::fire(t); + } +} + +// Deliver thread +void ClusterTimer::deliverDrop(const std::string& name) { + QPID_LOG(trace, "Cluster timer drop delivered for " << name); + Map::iterator i = map.find(name); + if (i == map.end()) + throw Exception(QPID_MSG("Cluster timer drop non-existent task " << name)); + else { + intrusive_ptr<TimerTask> t = i->second; + map.erase(i); + } +} + +// Deliver thread +void ClusterTimer::becomeElder() { + for (Map::iterator i = map.begin(); i != map.end(); ++i) { + Timer::add(i->second); + } +} + +}} diff --git a/cpp/src/qpid/cluster/ClusterTimer.h b/cpp/src/qpid/cluster/ClusterTimer.h new file mode 100644 index 0000000000..395e505451 --- /dev/null +++ b/cpp/src/qpid/cluster/ClusterTimer.h @@ -0,0 +1,58 @@ +#ifndef QPID_CLUSTER_CLUSTERTIMER_H +#define QPID_CLUSTER_CLUSTERTIMER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Timer.h" +#include <map> + +namespace qpid { +namespace cluster { + +class Cluster; + +class ClusterTimer : public sys::Timer { + public: + ClusterTimer(Cluster&); + ~ClusterTimer(); + + void add(boost::intrusive_ptr<sys::TimerTask> task); + + void deliverWakeup(const std::string& name); + void deliverDrop(const std::string& name); + void becomeElder(); + + protected: + void fire(boost::intrusive_ptr<sys::TimerTask> task); + void drop(boost::intrusive_ptr<sys::TimerTask> task); + + private: + typedef std::map<std::string, boost::intrusive_ptr<sys::TimerTask> > Map; + Cluster& cluster; + Map map; +}; + + +}} + + +#endif /*!QPID_CLUSTER_CLUSTERTIMER_H*/ |
