diff options
Diffstat (limited to 'cpp/src')
| -rw-r--r-- | cpp/src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | cpp/src/cluster.cmake | 2 | ||||
| -rw-r--r-- | cpp/src/cluster.mk | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/PeriodicTimerImpl.cpp | 78 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/PeriodicTimerImpl.h | 72 |
7 files changed, 176 insertions, 6 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index a32a7a99e1..fd77e34619 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -573,6 +573,7 @@ set (qpidcommon_SOURCES qpid/sys/Runnable.cpp qpid/sys/Shlib.cpp qpid/sys/Timer.cpp + qpid/sys/PeriodicTimerImpl.cpp ) add_library (qpidcommon SHARED ${qpidcommon_SOURCES}) diff --git a/cpp/src/cluster.cmake b/cpp/src/cluster.cmake index 04f74264be..41b048382b 100644 --- a/cpp/src/cluster.cmake +++ b/cpp/src/cluster.cmake @@ -129,6 +129,8 @@ if (BUILD_CLUSTER) qpid/cluster/MemberSet.h qpid/cluster/MemberSet.cpp qpid/cluster/types.h + qpid/cluster/PeriodicTimerImpl.h + qpid/cluster/PeriodicTimerImpl.cpp qpid/cluster/StoreStatus.h qpid/cluster/StoreStatus.cpp ) diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 081889130e..db2c20f5eb 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -88,6 +88,8 @@ cluster_la_SOURCES = \ qpid/cluster/MemberSet.h \ qpid/cluster/MemberSet.cpp \ qpid/cluster/types.h \ + qpid/cluster/PeriodicTimerImpl.h \ + qpid/cluster/PeriodicTimerImpl.cpp \ qpid/cluster/StoreStatus.h \ qpid/cluster/StoreStatus.cpp diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 779c162f9a..44b95b75b8 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -112,6 +112,7 @@ #include "qpid/cluster/RetractClient.h" #include "qpid/cluster/FailoverExchange.h" #include "qpid/cluster/UpdateExchange.h" +#include "qpid/cluster/PeriodicTimerImpl.h" #include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" @@ -136,6 +137,7 @@ #include "qpid/framing/ClusterUpdateRequestBody.h" #include "qpid/framing/ClusterConnectionAnnounceBody.h" #include "qpid/framing/ClusterErrorCheckBody.h" +#include "qpid/framing/ClusterPeriodicTimerBody.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" @@ -177,7 +179,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 896973; +const uint32_t Cluster::CLUSTER_VERSION = 903171; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -207,6 +209,9 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } + void periodicTimer(const std::string& name) { + cluster.periodicTimer(member, name, l); + } void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } @@ -245,6 +250,10 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : updateRetracted(false), error(*this) { + // FIXME aconway 2010-01-26: must be done before management registers with timer. + broker.setPeriodicTimer( + std::auto_ptr<sys::PeriodicTimer>(new PeriodicTimerImpl(*this))); + mAgent = broker.getManagementAgent(); if (mAgent != 0){ _qmf::Package packageInit(mAgent); @@ -952,5 +961,13 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu error.respondNone(from, type, frameSeq); } +void Cluster::periodicTimer(const MemberId&, const std::string& , Lock&) { + // FIXME aconway 2010-01-26: +} + +bool Cluster::isElder() const { + Mutex::ScopedLock l(lock); + return elders.empty(); +} }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index ae3d667359..39c440723f 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -117,6 +117,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { UpdateReceiver& getUpdateReceiver() { return updateReceiver; } + bool isElder() const; + private: typedef sys::Monitor::ScopedLock Lock; @@ -161,6 +163,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); + void periodicTimer(const MemberId&, const std::string& name, Lock&); void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); @@ -241,11 +244,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Remaining members are protected by lock. - - // TODO aconway 2009-03-06: Most of these members are also only used in - // deliverFrameQueue thread or during stall. Review and separate members - // that require a lock, drop lock when not needed. - mutable sys::Monitor lock; diff --git a/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp b/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp new file mode 100644 index 0000000000..0d6cac8cc9 --- /dev/null +++ b/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "PeriodicTimerImpl.h" +#include "Cluster.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/framing/ClusterPeriodicTimerBody.h" + +namespace qpid { +namespace cluster { + +PeriodicTimerImpl::PeriodicTimerImpl(Cluster& c) : cluster(c) {} + +PeriodicTimerImpl::TaskEntry::TaskEntry( + Cluster& c, const Task& t, sys::Duration d, const std::string& n) + : TimerTask(d), cluster(c), timer(c.getBroker().getTimer()), + task(t), name(n), inFlight(false) +{} + +void PeriodicTimerImpl::TaskEntry::fire() { + sys::Mutex::ScopedLock l(lock); + // Only the elder mcasts. + // Don't mcast another if we haven't yet received the last one. + if (cluster.isElder() && !inFlight) { + inFlight = true; + cluster.getMulticast().mcastControl( + framing::ClusterPeriodicTimerBody(framing::ProtocolVersion(), name), + cluster.getId()); + } + setupNextFire(); + timer.add(this); +} + +void PeriodicTimerImpl::TaskEntry::deliver() { + task(); + sys::Mutex::ScopedLock l(lock); + inFlight = false; +} + + +void PeriodicTimerImpl::add( + const Task& task, sys::Duration period, const std::string& name) +{ + sys::Mutex::ScopedLock l(lock); + if (map.find(name) != map.end()) + throw Exception(QPID_MSG("Cluster timer task name added twice: " << name)); + map[name] = new TaskEntry(cluster, task, period, name); +} + +void PeriodicTimerImpl::deliver(const std::string& name) { + Map::iterator i; + { + sys::Mutex::ScopedLock l(lock); + i = map.find(name); + if (i == map.end()) + throw Exception(QPID_MSG("Cluster timer unknown task: " << name)); + } + i->second->deliver(); +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/PeriodicTimerImpl.h b/cpp/src/qpid/cluster/PeriodicTimerImpl.h new file mode 100644 index 0000000000..cb0034bcef --- /dev/null +++ b/cpp/src/qpid/cluster/PeriodicTimerImpl.h @@ -0,0 +1,72 @@ +#ifndef QPID_CLUSTER_PERIODICTIMERIMPL_H +#define QPID_CLUSTER_PERIODICTIMERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/PeriodicTimer.h" +#include "qpid/sys/Mutex.h" +#include <map> + +namespace qpid { +namespace cluster { + +class Cluster; + +/** + * Cluster implementation of PeriodicTimer. + * + * All members run a periodic task, elder mcasts periodic-timer control. + * Actual task is executed on delivery of periodic-timer. + */ +class PeriodicTimerImpl : public sys::PeriodicTimer +{ + public: + PeriodicTimerImpl(Cluster& cluster); + void add(const Task& task, sys::Duration period, const std::string& taskName); + void deliver(const std::string& name); + + private: + + class TaskEntry : public sys::TimerTask { + public: + TaskEntry(Cluster&, const Task&, sys::Duration period, const std::string& name); + void fire(); + void deliver(); + private: + sys::Mutex lock; + Cluster& cluster; + sys::Timer& timer; + Task task; + std::string name; + bool inFlight; + }; + + typedef std::map<std::string, boost::intrusive_ptr<TaskEntry> > Map; + struct TaskImpl; + + sys::Mutex lock; + Map map; + Cluster& cluster; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_PERIODICTIMER_H*/ |
