diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.cpp | 110 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.h | 68 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/types.h | 7 |
7 files changed, 207 insertions, 14 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 3ced6263df..d6312e7b93 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -153,7 +153,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 834052; +const uint32_t Cluster::CLUSTER_VERSION = 835547; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -162,12 +162,19 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } + void initialStatus(bool active, bool persistent, const framing::FieldTable& props) { + cluster.initialStatus(member, active, persistent, props); + } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } - void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { cluster.updateOffer(member, updatee, id, version, l); } + void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { + cluster.updateOffer(member, updatee, id, version, l); + } void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } - void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } + void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { + cluster.errorCheck(member, type, frameSeq, l); + } void shutdown() { cluster.shutdown(member, l); } @@ -603,6 +610,10 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) makeOffer(id, l); } +void Cluster::initialStatus(const MemberId&, bool /*active*/, bool /*persistent*/, + const framing::FieldTable&) { + // FIXME aconway 2009-11-12: fill in. +} void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { if (map.ready(id, Url(url))) memberUpdate(l); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 286d7867c9..751a71867d 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -147,6 +147,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, uint32_t version, Lock&); void retractOffer(const MemberId& updater, uint64_t updatee, Lock&); + void initialStatus(const MemberId&, bool active, bool persistent, const framing::FieldTable& props); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index 5735a6335d..2e682a6f4a 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -33,14 +33,11 @@ #include <vector> #include <deque> #include <map> -#include <set> #include <iosfwd> namespace qpid { namespace cluster { -typedef std::set<MemberId> MemberSet; - /** * Map of established cluster members and joiners waiting for an update, * along with other cluster state that must be updated. diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 316b5386f1..49a814b848 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -217,12 +217,15 @@ MemberId Cpg::self() const { namespace { int byte(uint32_t value, int i) { return (value >> (i*8)) & 0xff; } } -ostream& operator <<(ostream& out, const MemberId& id) { - out << byte(id.first, 0) << "." - << byte(id.first, 1) << "." - << byte(id.first, 2) << "." - << byte(id.first, 3); - return out << ":" << id.second; +ostream& operator<<(ostream& out, const MemberId& id) { + if (id.first) { + out << byte(id.first, 0) << "." + << byte(id.first, 1) << "." + << byte(id.first, 2) << "." + << byte(id.first, 3) + << ":"; + } + return out << id.second; } ostream& operator<<(ostream& o, const ConnectionId& c) { diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp new file mode 100644 index 0000000000..8c55a66ed7 --- /dev/null +++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "InitialStatusMap.h" +#include <algorithm> +#include <boost/bind.hpp> + +using namespace std; +using namespace boost; + +namespace qpid { +namespace cluster { + +InitialStatusMap::InitialStatusMap(const MemberId& self_) + : self(self_), complete(), updateNeeded(), resendNeeded() +{ + map[self] = optional<Status>(); +} + +void InitialStatusMap::configChange(const MemberSet& members) { + resendNeeded = false; + if (firstConfig.empty()) firstConfig = members; + MemberSet::const_iterator i = members.begin(); + Map::iterator j = map.begin(); + while (i != members.end() || j != map.end()) { + if (i == members.end()) { // j not in members, member left + Map::iterator k = j++; + map.erase(k); + } + else if (j == map.end()) { // i not in map, member joined + resendNeeded = true; + map[*i] = optional<Status>(); + ++i; + } + else if (*i < j->first) { // i not in map, member joined + resendNeeded = true; + map[*i] = optional<Status>(); + ++i; + } + else if (*i > j->first) { // j not in members, member left + Map::iterator k = j++; + map.erase(k); + } + else { + i++; j++; + } + } + if (resendNeeded) { // Clear all status + for (Map::iterator i = map.begin(); i != map.end(); ++i) + i->second = optional<Status>(); + } +} + +void InitialStatusMap::received(const MemberId& m, const Status& s){ + map[m] = s; +} + +bool InitialStatusMap::notInitialized(const Map::value_type& v) { + return !v.second; +} + +bool InitialStatusMap::isActive(const Map::value_type& v) { + return v.second && v.second->getActive(); +} + +bool InitialStatusMap::isComplete() { + return find_if(map.begin(), map.end(), ¬Initialized) == map.end(); +} + +bool InitialStatusMap::isResendNeeded() { + bool ret = resendNeeded; + resendNeeded = false; + return ret; +} + +bool InitialStatusMap::isUpdateNeeded() { + assert(isComplete()); + // If there are any active members we need an update. + return find_if(map.begin(), map.end(), &isActive) != map.end(); +} + +MemberSet InitialStatusMap::getElders() { + assert(isComplete()); + MemberSet elders; + // Elders are from first config change, active or higher node-id. + for (MemberSet::iterator i = firstConfig.begin(); i != firstConfig.end(); ++i) { + if (map.find(*i) != map.end() && (map[*i]->getActive() || *i > self)) + elders.insert(*i); + } + return elders; +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h new file mode 100644 index 0000000000..d139722623 --- /dev/null +++ b/cpp/src/qpid/cluster/InitialStatusMap.h @@ -0,0 +1,68 @@ +#ifndef QPID_CLUSTER_INITIALSTATUSMAP_H +#define QPID_CLUSTER_INITIALSTATUSMAP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include <qpid/framing/ClusterInitialStatusBody.h> +#include <boost/optional.hpp> + +namespace qpid { +namespace cluster { + +/** + * Track status of cluster members during initialization. + */ +class InitialStatusMap +{ + public: + typedef framing::ClusterInitialStatusBody Status; + + InitialStatusMap(const MemberId& self); + /** Process a config change. @return true if we need to re-send our status */ + void configChange(const MemberSet& newConfig); + /** @return true if we need to re-send status */ + bool isResendNeeded(); + + /** Process received status */ + void received(const MemberId&, const Status& is); + + /**@return true if the map is complete. */ + bool isComplete(); + /**@pre isComplete. @return this node's elders */ + MemberSet getElders(); + /**@pre isComplete. @return True if we need an update. */ + bool isUpdateNeeded(); + + private: + typedef std::map<MemberId, boost::optional<Status> > Map; + static bool notInitialized(const Map::value_type&); + static bool isActive(const Map::value_type&); + void check(); + Map map; + MemberSet firstConfig; + MemberId self; + bool complete, updateNeeded, resendNeeded; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_INITIALSTATUSMAP_H*/ diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h index c19152e4d8..6777c9674c 100644 --- a/cpp/src/qpid/cluster/types.h +++ b/cpp/src/qpid/cluster/types.h @@ -29,6 +29,7 @@ #include <utility> #include <iosfwd> #include <string> +#include <set> extern "C" { @@ -52,8 +53,8 @@ enum EventType { DATA, CONTROL }; /** first=node-id, second=pid */ struct MemberId : std::pair<uint32_t, uint32_t> { - explicit MemberId(uint64_t n) : std::pair<uint32_t,uint32_t>( n >> 32, n & 0xffffffff) {} - explicit MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {} + MemberId(uint64_t n=0) : std::pair<uint32_t,uint32_t>( n >> 32, n & 0xffffffff) {} + MemberId(uint32_t node, uint32_t pid) : std::pair<uint32_t,uint32_t>(node, pid) {} MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {} MemberId(const std::string&); // Decode from string. uint32_t getNode() const { return first; } @@ -75,6 +76,8 @@ struct ConnectionId : public std::pair<MemberId, uint64_t> { uint64_t getNumber() const { return second; } }; +typedef std::set<MemberId> MemberSet; + std::ostream& operator<<(std::ostream&, const ConnectionId&); std::ostream& operator<<(std::ostream&, EventType); |
