diff options
Diffstat (limited to 'cpp/src/qpid/cluster/ClusterMap.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 137 |
1 files changed, 91 insertions, 46 deletions
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index b5b71cd397..f3b5451afb 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -33,73 +33,118 @@ using namespace framing; namespace cluster { -ClusterMap::ClusterMap() {} +namespace { +void insertSet(ClusterMap::Set& set, const ClusterMap::Map::value_type& v) { set.insert(v.first); } -MemberId ClusterMap::first() const { - return (members.empty()) ? MemberId() : members.begin()->first; +void insertMap(ClusterMap::Map& map, FieldTable::ValueMap::value_type vt) { + map.insert(ClusterMap::Map::value_type(vt.first, Url(vt.second->get<std::string>()))); } -bool ClusterMap::left(const cpg_address* addrs, size_t nLeft) { - bool changed=false; - for (const cpg_address* a = addrs; a < addrs+nLeft; ++a) - changed = members.erase(*a) || changed; - if (dumper && !isMember(dumper)) - dumper = MemberId(); - QPID_LOG_IF(debug, changed, "Members left. " << *this); - return changed; +void assignMap(ClusterMap::Map& map, const FieldTable& ft) { + map.clear(); + std::for_each(ft.begin(), ft.end(), boost::bind(&insertMap, boost::ref(map), _1)); } -framing::ClusterUpdateBody ClusterMap::toControl() const { - framing::ClusterUpdateBody b; - for (Members::const_iterator i = members.begin(); i != members.end(); ++i) - b.getMembers().setString(i->first.str(), i->second.str()); - b.setDumper(dumper); - return b; +void insertFieldTable(FieldTable& ft, const ClusterMap::Map::value_type& vt) { + return ft.setString(vt.first.str(), vt.second.str()); +} + +void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) { + ft.clear(); + std::for_each(map.begin(), map.end(), boost::bind(&insertFieldTable, boost::ref(ft), _1)); +} +} + +ClusterMap::ClusterMap() {} + +ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) { + alive.insert(id); + if (isMember) + members[id] = url; + else + newbies[id] = url; } -bool ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) { - dumper = MemberId(dumper_); - bool changed = false; - framing:: FieldTable::ValueMap::const_iterator i; - for (i = ftMembers.begin(); i != ftMembers.end(); ++i) { - MemberId id(i->first); - Url url(i->second->get<std::string>()); - changed = members.insert(Members::value_type(id, url)).second || changed; +ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable& membersFt) { + assignMap(newbies, newbiesFt); + assignMap(members, membersFt); + std::for_each(newbies.begin(), newbies.end(), boost::bind(&insertSet, boost::ref(alive), _1)); + std::for_each(members.begin(), members.end(), boost::bind(&insertSet, boost::ref(alive), _1)); +} + +void ClusterMap::configChange( + cpg_address *current, int nCurrent, + cpg_address *left, int nLeft, + cpg_address */*joined*/, int /*nJoined*/) +{ + cpg_address* a; + for (a = left; a != left+nLeft; ++a) { + members.erase(*a); + newbies.erase(*a); } - QPID_LOG_IF(debug, changed, "Update: " << *this); - return changed; + alive.clear(); + std::copy(current, current+nCurrent, std::inserter(alive, alive.end())); +} + +Url ClusterMap::getUrl(const Map& map, const MemberId& id) { + Map::const_iterator i = map.find(id); + return i == map.end() ? Url() : i->second; +} + +MemberId ClusterMap::firstNewbie() const { + return newbies.empty() ? MemberId() : newbies.begin()->first; +} + +ClusterConnectionMembershipBody ClusterMap::asMethodBody() const { + framing::ClusterConnectionMembershipBody b; + assignFieldTable(b.getNewbies(), newbies); + assignFieldTable(b.getMembers(), members); + return b; } std::vector<Url> ClusterMap::memberUrls() const { - std::vector<Url> result(size()); - std::transform(members.begin(), members.end(), result.begin(), - boost::bind(&Members::value_type::second, _1)); - return result; + std::vector<Url> urls(members.size()); + std::transform(members.begin(), members.end(), urls.begin(), + boost::bind(&Map::value_type::second, _1)); + return urls; +} + +std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) { + std::ostream_iterator<MemberId> oi(o); + std::transform(m.begin(), m.end(), oi, boost::bind(&ClusterMap::Map::value_type::first, _1)); + return o; } std::ostream& operator<<(std::ostream& o, const ClusterMap& m) { - o << "Broker members:"; - for (ClusterMap::Members::const_iterator i=m.members.begin(); i != m.members.end(); ++i) { - o << " " << i->first; - if (i->first == m.dumper) o << "(dumping)"; + for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != m.alive.end(); ++i) { + o << *i; + if (m.isMember(*i)) o << "(member)"; + if (m.isNewbie(*i)) o << "(newbie)"; + o << " "; } return o; } -bool ClusterMap::sendUpdate(const MemberId& id) const { - return dumper==id || (!dumper && first() == id); +bool ClusterMap::dumpRequest(const MemberId& id, const std::string& url) { + if (isAlive(id)) { + newbies[id] = Url(url); + return true; + } + return false; +} + +void ClusterMap::ready(const MemberId& id, const Url& url) { + if (isAlive(id)) members[id] = url; } -bool ClusterMap::ready(const MemberId& id, const Url& url) { - bool changed = members.insert(Members::value_type(id,url)).second; - if (id == dumper) { - dumper = MemberId(); - QPID_LOG(info, id << " finished dump. " << *this); - } - else { - QPID_LOG(info, id << " joined, url=" << url << ". " << *this); +boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId& to) { + Map::iterator i = newbies.find(to); + if (isAlive(from) && i != newbies.end()) { + Url url= i->second; + newbies.erase(i); // No longer a potential dumpee. + return url; } - return changed; + return boost::none; } }} // namespace qpid::cluster |
