summary | refs | log | tree | commit | diff
path: root/cpp
diff options
context:
space:
mode:
author: Alan Conway <aconway@apache.org> 2008-09-16 18:46:11 +0000
committer: Alan Conway <aconway@apache.org> 2008-09-16 18:46:11 +0000
commit: 8709822ffee38d9bf16f4cf43114bc450fc222eb (patch)
tree: aff16233001f6770a397c588d80d367af343942e /cpp
parent: 2f43007baa7e97086ddb227ddc27b4b4b26bf53c (diff)
download: qpid-python-8709822ffee38d9bf16f4cf43114bc450fc222eb.tar.gz
Fix race in cluster join protocol.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@696003 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- cpp/src/qpid/cluster/Cluster.cpp 131
-rw-r--r-- cpp/src/qpid/cluster/Cluster.h 15
-rw-r--r-- cpp/src/qpid/cluster/ClusterMap.cpp 52
-rw-r--r-- cpp/src/qpid/cluster/ClusterMap.h 34
-rw-r--r-- cpp/src/qpid/cluster/types.h 3
-rw-r--r-- cpp/src/tests/cluster_test.cpp 4
-rw-r--r-- cpp/xml/cluster.xml 9
7 files changed, 133 insertions, 115 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 7fb2e5ad58..858542802c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -27,6 +27,7 @@
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterDumpRequestBody.h"
#include "qpid/framing/ClusterUpdateBody.h"
+#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
@@ -53,8 +54,9 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
- void dumpRequest(const std::string& u) { cluster.dumpRequest(member, u); }
- void update(const FieldTable& members,bool dumping) { cluster.update(members, dumping); }
+ void update(const FieldTable& members, uint64_t dumping) { cluster.update(members, dumping); }
+ void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url); }
+ void ready(const std::string& url) { cluster.ready(member, url); }
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
@@ -76,7 +78,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
cpgDispatchHandle.startWatch(poller);
cpg.join(name);
-
}
Cluster::~Cluster() {}
@@ -218,6 +219,19 @@ ostream& operator<<(ostream& o, const AddrList& a) {
return o;
}
+void Cluster::dispatch(sys::DispatchHandle& h) {
+ cpg.dispatchAll();
+ h.rewatch();
+}
+
+void Cluster::disconnect(sys::DispatchHandle& ) {
+ // FIXME aconway 2008-09-11: this should be logged as critical,
+ // when we provide admin option to shut down cluster and let
+ // members leave cleanly.
+ QPID_LOG(notice, self << " disconnected from cluster " << name.str());
+ broker.shutdown();
+}
+
void Cluster::configChange(
cpg_handle_t /*handle*/,
cpg_name */*group*/,
@@ -225,79 +239,78 @@ void Cluster::configChange(
cpg_address *left, int nLeft,
cpg_address */*joined*/, int nJoined)
{
+ Mutex::ScopedLock l(lock);
// FIXME aconway 2008-09-15: use group terminology not cluster. Member not node.
- QPID_LOG(notice, "Current cluster: " << AddrList(current, nCurrent));
- QPID_LOG_IF(notice, nLeft, "Left the cluster: " << AddrList(left, nLeft));
- if (find(left, left+nLeft, self) != left+nLeft) {
- // We have left the group, this is the final config change.
+ QPID_LOG(info, "Current cluster: " << AddrList(current, nCurrent));
+ QPID_LOG_IF(info, nLeft, "Left the cluster: " << AddrList(left, nLeft));
+
+ map.left(left, nLeft);
+ if (find(left, left+nLeft, self) != left+nLeft) {
+ // I have left the group, this is the final config change.
QPID_LOG(notice, self << " left cluster " << name.str());
broker.shutdown();
+ return;
}
- Mutex::ScopedLock l(lock);
- map.configChange(current, nCurrent);
- if (state == START && nCurrent == 1) { // First in cluster
- assert(*current == self);
- assert(map.empty());
- QPID_LOG(notice, self << " first in cluster.");
- map.insert(self, url);
- ready();
- }
- else if (nJoined && self == map.first()) { // Send an update to new members.
- mcastControl(map.toControl(), 0);
+
+ if (state == START) {
+ if (nCurrent == 1 && *current == self) { // First in cluster.
+ // First in cluster
+ QPID_LOG(notice, self << " first in cluster.");
+ map.add(self, url);
+ ready();
+ }
+ return;
}
-}
-void Cluster::dispatch(sys::DispatchHandle& h) {
- cpg.dispatchAll();
- h.rewatch();
-}
+ if (state == DISCARD && !map.dumper) // try another dump request.
+ mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
-void Cluster::disconnect(sys::DispatchHandle& ) {
- // FIXME aconway 2008-09-11: this should be logged as critical,
- // when we provide admin option to shut down cluster and let
- // members leave cleanly.
- QPID_LOG(notice, self << " disconnected from cluster " << name.str());
- broker.shutdown();
+ if (nJoined && map.sendUpdate(self)) // New members need update
+ mcastControl(map.toControl(), 0);
}
-void Cluster::update(const FieldTable& members, bool dumping) {
+void Cluster::update(const FieldTable& members, uint64_t dumper) {
Mutex::ScopedLock l(lock);
- map.update(members, dumping);
- QPID_LOG(info, "Cluster update:\n " << map);
- if (state == START && dumping == false) {
- state = DISCARD;
+ map.update(members, dumper);
+ QPID_LOG(debug, "Cluster update: " << map);
+ if (state == START) state = DISCARD; // Got first update.
+ if (state == DISCARD && !map.dumper)
mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
- }
}
-void Cluster::dumpRequest(const MemberId& m, const string& urlStr) {
+void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) {
Mutex::ScopedLock l(lock);
- bool wasDumping = map.isDumping();
- map.setDumping(true);
- if (!wasDumping) {
- if (self == m) { // My turn
- assert(state == DISCARD);
- // FIXME aconway 2008-09-15: RECEIVE DUMP
- // state = CATCHUP;
- // stall();
- // When received
- map.insert(self, url);
- mcastControl(map.toControl(), 0);
- ready();
- }
- else if (state == READY && self == map.first()) { // Give the dump.
- QPID_LOG(info, self << " dumping to " << url);
- // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
- // state = DUMPING;
- // stall();
- (void)urlStr;
- // When dump complete:
- map.setDumping(false);
- mcastControl(map.toControl(), 0);
- }
+ if (map.dumper) return; // Dump already in progress, ignore.
+ map.dumper = map.first();
+ if (dumpee == self && state == DISCARD) { // My turn to receive a dump.
+ QPID_LOG(info, self << " receiving state dump from " << map.dumper);
+ // FIXME aconway 2008-09-15: RECEIVE DUMP
+ // state = CATCHUP;
+ // stall();
+ // When received
+ mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+ ready();
+ }
+ else if (map.dumper == self && state == READY) { // My turn to send the dump
+ QPID_LOG(info, self << " sending state dump to " << dumpee);
+ // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
+ // state = DUMPING;
+ // stall();
+ (void)urlStr;
+ // When dump complete:
+ assert(map.dumper == self);
+ ClusterUpdateBody b = map.toControl();
+ b.setDumper(0);
+ mcastControl(b, 0);
+ // NB: Don't modify my own map till self-delivery.
}
}
+void Cluster::ready(const MemberId& member, const std::string& url) {
+ Mutex::ScopedLock l(lock);
+ map.add(member, Url(url));
+}
+
broker::Broker& Cluster::getBroker(){ return broker; }
void Cluster::stall() {
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index b8527ae66b..e33cca8482 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -74,9 +74,11 @@ class Cluster : private Cpg::Handler
/** Leave the cluster */
void leave();
-
+
+ // Cluster controls.
+ void update(const framing::FieldTable& members, uint64_t dumping);
void dumpRequest(const MemberId&, const std::string& url);
- void update(const framing::FieldTable& members, bool dumping);
+ void ready(const MemberId&, const std::string& url);
MemberId getSelf() const { return self; }
@@ -91,12 +93,11 @@ class Cluster : private Cpg::Handler
typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
typedef sys::PollableQueue<Event> EventQueue;
enum State {
- START, // Have not yet received first cluster update.
+ START, // Start state, no cluster update received yet.
DISCARD, // Discard updates up to dump start point.
- HAVE_DUMP, // Received state dump, waiting for catchup point.
- CATCHUP, // Stalled at catchup point, waiting for dump.
- DUMPING, // Stalled while sending a state dump.
- READY // Normal processing.
+ CATCHUP, // Stalled at catchup point, waiting for dump.
+ DUMPING, // Stalled while sending a state dump.
+ READY // Normal processing.
};
void connectionEvent(const Event&);
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index 63d0c786d2..51e360ad73 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -32,56 +32,58 @@ using namespace framing;
namespace cluster {
-ClusterMap::ClusterMap() : dumping(false) {}
+ClusterMap::ClusterMap() {}
-MemberId ClusterMap::first() {
- return (empty()) ? MemberId() : begin()->first;
+MemberId ClusterMap::first() const {
+ return (members.empty()) ? MemberId() : members.begin()->first;
}
-void ClusterMap::configChange(const cpg_address* addrs, size_t size) {
- iterator i = begin();
- while (i != end()) { // Remove members that are no longer in addrs.
- if (std::find(addrs, addrs+size, i->first) == addrs+size)
- erase(i++);
- else
- ++i;
- }
+void ClusterMap::left(const cpg_address* addrs, size_t size) {
+ size_t (Members::*erase)(const MemberId&) = &Members::erase;
+ std::for_each(addrs, addrs+size, boost::bind(erase, &members, _1));
+ if (dumper && !isMember(dumper))
+ dumper = MemberId();
}
framing::ClusterUpdateBody ClusterMap::toControl() const {
framing::ClusterUpdateBody b;
- for (const_iterator i = begin(); i != end(); ++i)
+ for (Members::const_iterator i = members.begin(); i != members.end(); ++i)
b.getMembers().setString(i->first.str(), i->second.str());
- b.setDumping(dumping);
+ b.setDumper(dumper);
return b;
}
-void ClusterMap::update(const FieldTable& ftMembers, bool dump) {
- dumping = dump;
+void ClusterMap::update(const FieldTable& ftMembers, uint64_t dumper_) {
FieldTable::ValueMap::const_iterator i;
for (i = ftMembers.begin(); i != ftMembers.end(); ++i)
- (*this)[i->first] = Url(i->second->get<std::string>());
-}
-
-void ClusterMap::fromControl(const framing::ClusterUpdateBody& b) {
- update(b.getMembers(), b.getDumping());
+ members[i->first] = Url(i->second->get<std::string>());
+ dumper = MemberId(dumper_);
}
std::vector<Url> ClusterMap::memberUrls() const {
std::vector<Url> result(size());
- std::transform(begin(), end(), result.begin(),
- boost::bind(&value_type::second, _1));
+ std::transform(members.begin(), members.end(), result.begin(),
+ boost::bind(&Members::value_type::second, _1));
return result;
}
-std::ostream& operator<<(std::ostream& o, const ClusterMap::value_type& mv) {
+std::ostream& operator<<(std::ostream& o, const ClusterMap::Members::value_type& mv) {
return o << mv.first << "=" << mv.second;
}
std::ostream& operator<<(std::ostream& o, const ClusterMap& m) {
- std::ostream_iterator<ClusterMap::value_type> im(o, "\n ");
- std::copy(m.begin(), m.end(), im);
+ std::ostream_iterator<ClusterMap::Members::value_type> im(o, "\n ");
+ o << "dumper=" << m.dumper << ", members:\n ";
+ std::copy(m.members.begin(), m.members.end(), im);
return o;
}
+bool ClusterMap::sendUpdate(const MemberId& id) const {
+ return dumper==id || (!dumper && first() == id);
+}
+
+void ClusterMap::add(const MemberId& id, const Url& url) {
+ members[id] = url;
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h
index fce65f083d..c626c7688d 100644
--- a/cpp/src/qpid/cluster/ClusterMap.h
+++ b/cpp/src/qpid/cluster/ClusterMap.h
@@ -41,39 +41,39 @@ namespace cluster {
* A dumper is an established member that is sending catch-up data.
* A dumpee is an aspiring member that is receiving catch-up data.
*/
-class ClusterMap : public std::map<MemberId, Url> {
+class ClusterMap {
public:
+ typedef std::map<MemberId, Url> Members;
+ Members members;
+ MemberId dumper;
+
ClusterMap();
/** First member of the cluster in ID order, gets to perform one-off tasks. */
- MemberId first();
-
- /** Update for CPG config change. */
- void configChange(const cpg_address* addrs, size_t size);
+ MemberId first() const;
+ /** Update for members leaving. */
+ void left(const cpg_address* addrs, size_t size);
- /** Convert map contents to a cluster control body. */
+ /** Convert map contents to a cluster update body. */
framing::ClusterUpdateBody toControl() const;
- /** Update with first member. */
- using std::map<MemberId, Url>::insert;
- void insert(const MemberId& id, const Url& url) { insert(value_type(id,url)); }
- void setDumping(bool d) { dumping = d; }
+ /** Add a new member. */
+ void add(const MemberId& id, const Url& url);
/** Apply update delivered from clsuter. */
- void update(const framing::FieldTable& members, bool dumping);
- void fromControl(const framing::ClusterUpdateBody&);
+ void update(const framing::FieldTable& members, uint64_t dumper);
- bool isMember(const MemberId& id) const { return find(id) != end(); }
- bool isDumping() const { return dumping; }
+ bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
+ bool sendUpdate(const MemberId& id) const; // True if id should send an update.
std::vector<Url> memberUrls() const;
-
+ size_t size() const { return members.size(); }
+
private:
- bool dumping;
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
- friend std::ostream& operator<<(std::ostream& o, const ClusterMap::value_type& mv);
+ friend std::ostream& operator<<(std::ostream& o, const ClusterMap::Members::value_type& mv);
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h
index f48ba2db30..4fbe79e118 100644
--- a/cpp/src/qpid/cluster/types.h
+++ b/cpp/src/qpid/cluster/types.h
@@ -41,12 +41,13 @@ enum EventType { DATA, CONTROL };
/** first=node-id, second=pid */
struct MemberId : std::pair<uint32_t, uint32_t> {
+ explicit MemberId(uint64_t n) : std::pair<uint32_t,uint32_t>( n >> 32, n & 0xffffffff) {}
explicit MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {}
MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {}
MemberId(const std::string&); // Decode from string.
uint32_t getNode() const { return first; }
uint32_t getPid() const { return second; }
- operator bool() const { return first || second; }
+ operator uint64_t() const { return (uint64_t(first)<<32ull) + second; }
// Encode as string, network byte order.
std::string str() const;
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index c17dc99901..8dec23a09b 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -140,9 +140,7 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
-
-// FIXME aconway 2008-09-12: finish the new join protocol.
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchUpSharedState, 1) {
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchupSharedState, 1) {
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
// Create some shared state.
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index 6dbfee109d..ba4e50d21e 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -27,15 +27,18 @@ o<?xml version="1.0"?>
<class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
<doc>Qpid extension class to allow clustered brokers to communicate.</doc>
- <control name="update" code="0x4" label="Cluster status update.">
+ <control name="update" code="0x1" label="Cluster status update.">
<field name="members" type="map"/> <!-- member-id -> URL -->
- <field name="dumping" type="boolean"/> <!-- currently dumping state to new member. -->
+ <field name="dumper" type="uint64"/> <!-- member currently dumping state. -->
</control>
- <control name = "dump-request" code="0x1" label="New meber requests brain dump">
+ <control name = "dump-request" code="0x2" label="New meber requests brain dump">
<field name="url" type="str16" label="Url for brain dump."/>
</control>
+ <control name="ready" code="0x3" label="New member is ready.">
+ <field name="url" type="str16" label="Url for brain dump."/>
+ </control>
</class>
<!-- TODO aconway 2008-09-10: support for un-attached connections. -->