summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/ClusterMap.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/ClusterMap.cpp')
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.cpp137
1 files changed, 91 insertions, 46 deletions
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index b5b71cd397..f3b5451afb 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -33,73 +33,118 @@ using namespace framing;
namespace cluster {
-ClusterMap::ClusterMap() {}
+namespace {
+void insertSet(ClusterMap::Set& set, const ClusterMap::Map::value_type& v) { set.insert(v.first); }
-MemberId ClusterMap::first() const {
- return (members.empty()) ? MemberId() : members.begin()->first;
+void insertMap(ClusterMap::Map& map, FieldTable::ValueMap::value_type vt) {
+ map.insert(ClusterMap::Map::value_type(vt.first, Url(vt.second->get<std::string>())));
}
-bool ClusterMap::left(const cpg_address* addrs, size_t nLeft) {
- bool changed=false;
- for (const cpg_address* a = addrs; a < addrs+nLeft; ++a)
- changed = members.erase(*a) || changed;
- if (dumper && !isMember(dumper))
- dumper = MemberId();
- QPID_LOG_IF(debug, changed, "Members left. " << *this);
- return changed;
+void assignMap(ClusterMap::Map& map, const FieldTable& ft) {
+ map.clear();
+ std::for_each(ft.begin(), ft.end(), boost::bind(&insertMap, boost::ref(map), _1));
}
-framing::ClusterUpdateBody ClusterMap::toControl() const {
- framing::ClusterUpdateBody b;
- for (Members::const_iterator i = members.begin(); i != members.end(); ++i)
- b.getMembers().setString(i->first.str(), i->second.str());
- b.setDumper(dumper);
- return b;
+void insertFieldTable(FieldTable& ft, const ClusterMap::Map::value_type& vt) {
+ return ft.setString(vt.first.str(), vt.second.str());
+}
+
+void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) {
+ ft.clear();
+ std::for_each(map.begin(), map.end(), boost::bind(&insertFieldTable, boost::ref(ft), _1));
+}
+}
+
+ClusterMap::ClusterMap() {}
+
+ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) {
+ alive.insert(id);
+ if (isMember)
+ members[id] = url;
+ else
+ newbies[id] = url;
}
-bool ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) {
- dumper = MemberId(dumper_);
- bool changed = false;
- framing:: FieldTable::ValueMap::const_iterator i;
- for (i = ftMembers.begin(); i != ftMembers.end(); ++i) {
- MemberId id(i->first);
- Url url(i->second->get<std::string>());
- changed = members.insert(Members::value_type(id, url)).second || changed;
+ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable& membersFt) {
+ assignMap(newbies, newbiesFt);
+ assignMap(members, membersFt);
+ std::for_each(newbies.begin(), newbies.end(), boost::bind(&insertSet, boost::ref(alive), _1));
+ std::for_each(members.begin(), members.end(), boost::bind(&insertSet, boost::ref(alive), _1));
+}
+
+void ClusterMap::configChange(
+ cpg_address *current, int nCurrent,
+ cpg_address *left, int nLeft,
+ cpg_address */*joined*/, int /*nJoined*/)
+{
+ cpg_address* a;
+ for (a = left; a != left+nLeft; ++a) {
+ members.erase(*a);
+ newbies.erase(*a);
}
- QPID_LOG_IF(debug, changed, "Update: " << *this);
- return changed;
+ alive.clear();
+ std::copy(current, current+nCurrent, std::inserter(alive, alive.end()));
+}
+
+Url ClusterMap::getUrl(const Map& map, const MemberId& id) {
+ Map::const_iterator i = map.find(id);
+ return i == map.end() ? Url() : i->second;
+}
+
+MemberId ClusterMap::firstNewbie() const {
+ return newbies.empty() ? MemberId() : newbies.begin()->first;
+}
+
+ClusterConnectionMembershipBody ClusterMap::asMethodBody() const {
+ framing::ClusterConnectionMembershipBody b;
+ assignFieldTable(b.getNewbies(), newbies);
+ assignFieldTable(b.getMembers(), members);
+ return b;
}
std::vector<Url> ClusterMap::memberUrls() const {
- std::vector<Url> result(size());
- std::transform(members.begin(), members.end(), result.begin(),
- boost::bind(&Members::value_type::second, _1));
- return result;
+ std::vector<Url> urls(members.size());
+ std::transform(members.begin(), members.end(), urls.begin(),
+ boost::bind(&Map::value_type::second, _1));
+ return urls;
+}
+
+std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) {
+ std::ostream_iterator<MemberId> oi(o);
+ std::transform(m.begin(), m.end(), oi, boost::bind(&ClusterMap::Map::value_type::first, _1));
+ return o;
}
std::ostream& operator<<(std::ostream& o, const ClusterMap& m) {
- o << "Broker members:";
- for (ClusterMap::Members::const_iterator i=m.members.begin(); i != m.members.end(); ++i) {
- o << " " << i->first;
- if (i->first == m.dumper) o << "(dumping)";
+ for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != m.alive.end(); ++i) {
+ o << *i;
+ if (m.isMember(*i)) o << "(member)";
+ if (m.isNewbie(*i)) o << "(newbie)";
+ o << " ";
}
return o;
}
-bool ClusterMap::sendUpdate(const MemberId& id) const {
- return dumper==id || (!dumper && first() == id);
+bool ClusterMap::dumpRequest(const MemberId& id, const std::string& url) {
+ if (isAlive(id)) {
+ newbies[id] = Url(url);
+ return true;
+ }
+ return false;
+}
+
+void ClusterMap::ready(const MemberId& id, const Url& url) {
+ if (isAlive(id)) members[id] = url;
}
-bool ClusterMap::ready(const MemberId& id, const Url& url) {
- bool changed = members.insert(Members::value_type(id,url)).second;
- if (id == dumper) {
- dumper = MemberId();
- QPID_LOG(info, id << " finished dump. " << *this);
- }
- else {
- QPID_LOG(info, id << " joined, url=" << url << ". " << *this);
+boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId& to) {
+ Map::iterator i = newbies.find(to);
+ if (isAlive(from) && i != newbies.end()) {
+ Url url= i->second;
+ newbies.erase(i); // No longer a potential dumpee.
+ return url;
}
- return changed;
+ return boost::none;
}
}} // namespace qpid::cluster