summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-11-17 18:09:01 +0000
committerAlan Conway <aconway@apache.org>2009-11-17 18:09:01 +0000
commitef7728a725272b88c3cd2f81f81ee60ed00cde90 (patch)
tree5083904bb1324c93cf399c6c8b04ee4f0cb03549 /cpp/src/qpid
parent74c1740d54360bb4b091b5486c69f0f945d27abd (diff)
downloadqpid-python-ef7728a725272b88c3cd2f81f81ee60ed00cde90.tar.gz
cluster::InitialStatusMap and unit tests, support for improved cluster join protocol.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@881420 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp17
-rw-r--r--cpp/src/qpid/cluster/Cluster.h1
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.h3
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp15
-rw-r--r--cpp/src/qpid/cluster/InitialStatusMap.cpp110
-rw-r--r--cpp/src/qpid/cluster/InitialStatusMap.h68
-rw-r--r--cpp/src/qpid/cluster/types.h7
7 files changed, 207 insertions, 14 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 3ced6263df..d6312e7b93 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -153,7 +153,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 834052;
+const uint32_t Cluster::CLUSTER_VERSION = 835547;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -162,12 +162,19 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {}
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
+ void initialStatus(bool active, bool persistent, const framing::FieldTable& props) {
+ cluster.initialStatus(member, active, persistent, props);
+ }
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
- void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { cluster.updateOffer(member, updatee, id, version, l); }
+ void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) {
+ cluster.updateOffer(member, updatee, id, version, l);
+ }
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
- void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
+ void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
+ cluster.errorCheck(member, type, frameSeq, l);
+ }
void shutdown() { cluster.shutdown(member, l); }
@@ -603,6 +610,10 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l)
makeOffer(id, l);
}
+void Cluster::initialStatus(const MemberId&, bool /*active*/, bool /*persistent*/,
+ const framing::FieldTable&) {
+ // FIXME aconway 2009-11-12: fill in.
+}
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
if (map.ready(id, Url(url)))
memberUpdate(l);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 286d7867c9..751a71867d 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -147,6 +147,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&,
uint32_t version, Lock&);
void retractOffer(const MemberId& updater, uint64_t updatee, Lock&);
+ void initialStatus(const MemberId&, bool active, bool persistent, const framing::FieldTable& props);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h
index 5735a6335d..2e682a6f4a 100644
--- a/cpp/src/qpid/cluster/ClusterMap.h
+++ b/cpp/src/qpid/cluster/ClusterMap.h
@@ -33,14 +33,11 @@
#include <vector>
#include <deque>
#include <map>
-#include <set>
#include <iosfwd>
namespace qpid {
namespace cluster {
-typedef std::set<MemberId> MemberSet;
-
/**
* Map of established cluster members and joiners waiting for an update,
* along with other cluster state that must be updated.
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 316b5386f1..49a814b848 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -217,12 +217,15 @@ MemberId Cpg::self() const {
namespace { int byte(uint32_t value, int i) { return (value >> (i*8)) & 0xff; } }
-ostream& operator <<(ostream& out, const MemberId& id) {
- out << byte(id.first, 0) << "."
- << byte(id.first, 1) << "."
- << byte(id.first, 2) << "."
- << byte(id.first, 3);
- return out << ":" << id.second;
+ostream& operator<<(ostream& out, const MemberId& id) {
+ if (id.first) {
+ out << byte(id.first, 0) << "."
+ << byte(id.first, 1) << "."
+ << byte(id.first, 2) << "."
+ << byte(id.first, 3)
+ << ":";
+ }
+ return out << id.second;
}
ostream& operator<<(ostream& o, const ConnectionId& c) {
diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp
new file mode 100644
index 0000000000..8c55a66ed7
--- /dev/null
+++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "InitialStatusMap.h"
+#include <algorithm>
+#include <boost/bind.hpp>
+
+using namespace std;
+using namespace boost;
+
+namespace qpid {
+namespace cluster {
+
+InitialStatusMap::InitialStatusMap(const MemberId& self_)
+ : self(self_), complete(), updateNeeded(), resendNeeded()
+{
+ map[self] = optional<Status>();
+}
+
+void InitialStatusMap::configChange(const MemberSet& members) {
+ resendNeeded = false;
+ if (firstConfig.empty()) firstConfig = members;
+ MemberSet::const_iterator i = members.begin();
+ Map::iterator j = map.begin();
+ while (i != members.end() || j != map.end()) {
+ if (i == members.end()) { // j not in members, member left
+ Map::iterator k = j++;
+ map.erase(k);
+ }
+ else if (j == map.end()) { // i not in map, member joined
+ resendNeeded = true;
+ map[*i] = optional<Status>();
+ ++i;
+ }
+ else if (*i < j->first) { // i not in map, member joined
+ resendNeeded = true;
+ map[*i] = optional<Status>();
+ ++i;
+ }
+ else if (*i > j->first) { // j not in members, member left
+ Map::iterator k = j++;
+ map.erase(k);
+ }
+ else {
+ i++; j++;
+ }
+ }
+ if (resendNeeded) { // Clear all status
+ for (Map::iterator i = map.begin(); i != map.end(); ++i)
+ i->second = optional<Status>();
+ }
+}
+
+void InitialStatusMap::received(const MemberId& m, const Status& s){
+ map[m] = s;
+}
+
+bool InitialStatusMap::notInitialized(const Map::value_type& v) {
+ return !v.second;
+}
+
+bool InitialStatusMap::isActive(const Map::value_type& v) {
+ return v.second && v.second->getActive();
+}
+
+bool InitialStatusMap::isComplete() {
+ return find_if(map.begin(), map.end(), &notInitialized) == map.end();
+}
+
+bool InitialStatusMap::isResendNeeded() {
+ bool ret = resendNeeded;
+ resendNeeded = false;
+ return ret;
+}
+
+bool InitialStatusMap::isUpdateNeeded() {
+ assert(isComplete());
+ // If there are any active members we need an update.
+ return find_if(map.begin(), map.end(), &isActive) != map.end();
+}
+
+MemberSet InitialStatusMap::getElders() {
+ assert(isComplete());
+ MemberSet elders;
+ // Elders are from first config change, active or higher node-id.
+ for (MemberSet::iterator i = firstConfig.begin(); i != firstConfig.end(); ++i) {
+ if (map.find(*i) != map.end() && (map[*i]->getActive() || *i > self))
+ elders.insert(*i);
+ }
+ return elders;
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h
new file mode 100644
index 0000000000..d139722623
--- /dev/null
+++ b/cpp/src/qpid/cluster/InitialStatusMap.h
@@ -0,0 +1,68 @@
+#ifndef QPID_CLUSTER_INITIALSTATUSMAP_H
+#define QPID_CLUSTER_INITIALSTATUSMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include <qpid/framing/ClusterInitialStatusBody.h>
+#include <boost/optional.hpp>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Track status of cluster members during initialization.
+ */
+class InitialStatusMap
+{
+ public:
+ typedef framing::ClusterInitialStatusBody Status;
+
+ InitialStatusMap(const MemberId& self);
+ /** Process a config change. @return true if we need to re-send our status */
+ void configChange(const MemberSet& newConfig);
+ /** @return true if we need to re-send status */
+ bool isResendNeeded();
+
+ /** Process received status */
+ void received(const MemberId&, const Status& is);
+
+ /**@return true if the map is complete. */
+ bool isComplete();
+ /**@pre isComplete. @return this node's elders */
+ MemberSet getElders();
+ /**@pre isComplete. @return True if we need an update. */
+ bool isUpdateNeeded();
+
+ private:
+ typedef std::map<MemberId, boost::optional<Status> > Map;
+ static bool notInitialized(const Map::value_type&);
+ static bool isActive(const Map::value_type&);
+ void check();
+ Map map;
+ MemberSet firstConfig;
+ MemberId self;
+ bool complete, updateNeeded, resendNeeded;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_INITIALSTATUSMAP_H*/
diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h
index c19152e4d8..6777c9674c 100644
--- a/cpp/src/qpid/cluster/types.h
+++ b/cpp/src/qpid/cluster/types.h
@@ -29,6 +29,7 @@
#include <utility>
#include <iosfwd>
#include <string>
+#include <set>
extern "C" {
@@ -52,8 +53,8 @@ enum EventType { DATA, CONTROL };
/** first=node-id, second=pid */
struct MemberId : std::pair<uint32_t, uint32_t> {
- explicit MemberId(uint64_t n) : std::pair<uint32_t,uint32_t>( n >> 32, n & 0xffffffff) {}
- explicit MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {}
+ MemberId(uint64_t n=0) : std::pair<uint32_t,uint32_t>( n >> 32, n & 0xffffffff) {}
+ MemberId(uint32_t node, uint32_t pid) : std::pair<uint32_t,uint32_t>(node, pid) {}
MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {}
MemberId(const std::string&); // Decode from string.
uint32_t getNode() const { return first; }
@@ -75,6 +76,8 @@ struct ConnectionId : public std::pair<MemberId, uint64_t> {
uint64_t getNumber() const { return second; }
};
+typedef std::set<MemberId> MemberSet;
+
std::ostream& operator<<(std::ostream&, const ConnectionId&);
std::ostream& operator<<(std::ostream&, EventType);