diff options
| author | Alan Conway <aconway@apache.org> | 2008-09-16 18:46:11 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-09-16 18:46:11 +0000 |
| commit | 8709822ffee38d9bf16f4cf43114bc450fc222eb (patch) | |
| tree | aff16233001f6770a397c588d80d367af343942e /cpp/src/qpid/cluster/Cluster.cpp | |
| parent | 2f43007baa7e97086ddb227ddc27b4b4b26bf53c (diff) | |
| download | qpid-python-8709822ffee38d9bf16f4cf43114bc450fc222eb.tar.gz | |
Fix race in cluster join protocol.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@696003 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 131 |
1 files changed, 72 insertions, 59 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 7fb2e5ad58..858542802c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -27,6 +27,7 @@ #include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterDumpRequestBody.h" #include "qpid/framing/ClusterUpdateBody.h" +#include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" @@ -53,8 +54,9 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {} bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); } - void dumpRequest(const std::string& u) { cluster.dumpRequest(member, u); } - void update(const FieldTable& members,bool dumping) { cluster.update(members, dumping); } + void update(const FieldTable& members, uint64_t dumping) { cluster.update(members, dumping); } + void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url); } + void ready(const std::string& url) { cluster.ready(member, url); } }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : @@ -76,7 +78,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); cpgDispatchHandle.startWatch(poller); cpg.join(name); - } Cluster::~Cluster() {} @@ -218,6 +219,19 @@ ostream& operator<<(ostream& o, const AddrList& a) { return o; } +void Cluster::dispatch(sys::DispatchHandle& h) { + cpg.dispatchAll(); + h.rewatch(); +} + +void Cluster::disconnect(sys::DispatchHandle& ) { + // FIXME aconway 2008-09-11: this should be logged as critical, + // when we provide admin option to shut down cluster and let + // members leave cleanly. + QPID_LOG(notice, self << " disconnected from cluster " << name.str()); + broker.shutdown(); +} + void Cluster::configChange( cpg_handle_t /*handle*/, cpg_name */*group*/, @@ -225,79 +239,78 @@ void Cluster::configChange( cpg_address *left, int nLeft, cpg_address */*joined*/, int nJoined) { + Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node. - QPID_LOG(notice, "Current cluster: " << AddrList(current, nCurrent)); - QPID_LOG_IF(notice, nLeft, "Left the cluster: " << AddrList(left, nLeft)); - if (find(left, left+nLeft, self) != left+nLeft) { - // We have left the group, this is the final config change. + QPID_LOG(info, "Current cluster: " << AddrList(current, nCurrent)); + QPID_LOG_IF(info, nLeft, "Left the cluster: " << AddrList(left, nLeft)); + + map.left(left, nLeft); + if (find(left, left+nLeft, self) != left+nLeft) { + // I have left the group, this is the final config change. QPID_LOG(notice, self << " left cluster " << name.str()); broker.shutdown(); + return; } - Mutex::ScopedLock l(lock); - map.configChange(current, nCurrent); - if (state == START && nCurrent == 1) { // First in cluster - assert(*current == self); - assert(map.empty()); - QPID_LOG(notice, self << " first in cluster."); - map.insert(self, url); - ready(); - } - else if (nJoined && self == map.first()) { // Send an update to new members. - mcastControl(map.toControl(), 0); + + if (state == START) { + if (nCurrent == 1 && *current == self) { // First in cluster. + // First in cluster + QPID_LOG(notice, self << " first in cluster."); + map.add(self, url); + ready(); + } + return; } -} -void Cluster::dispatch(sys::DispatchHandle& h) { - cpg.dispatchAll(); - h.rewatch(); -} + if (state == DISCARD && !map.dumper) // try another dump request. + mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0); -void Cluster::disconnect(sys::DispatchHandle& ) { - // FIXME aconway 2008-09-11: this should be logged as critical, - // when we provide admin option to shut down cluster and let - // members leave cleanly. - QPID_LOG(notice, self << " disconnected from cluster " << name.str()); - broker.shutdown(); + if (nJoined && map.sendUpdate(self)) // New members need update + mcastControl(map.toControl(), 0); } -void Cluster::update(const FieldTable& members, bool dumping) { +void Cluster::update(const FieldTable& members, uint64_t dumper) { Mutex::ScopedLock l(lock); - map.update(members, dumping); - QPID_LOG(info, "Cluster update:\n " << map); - if (state == START && dumping == false) { - state = DISCARD; + map.update(members, dumper); + QPID_LOG(debug, "Cluster update: " << map); + if (state == START) state = DISCARD; // Got first update. + if (state == DISCARD && !map.dumper) mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0); - } } -void Cluster::dumpRequest(const MemberId& m, const string& urlStr) { +void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) { Mutex::ScopedLock l(lock); - bool wasDumping = map.isDumping(); - map.setDumping(true); - if (!wasDumping) { - if (self == m) { // My turn - assert(state == DISCARD); - // FIXME aconway 2008-09-15: RECEIVE DUMP - // state = CATCHUP; - // stall(); - // When received - map.insert(self, url); - mcastControl(map.toControl(), 0); - ready(); - } - else if (state == READY && self == map.first()) { // Give the dump. - QPID_LOG(info, self << " dumping to " << url); - // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient. - // state = DUMPING; - // stall(); - (void)urlStr; - // When dump complete: - map.setDumping(false); - mcastControl(map.toControl(), 0); - } + if (map.dumper) return; // Dump already in progress, ignore. + map.dumper = map.first(); + if (dumpee == self && state == DISCARD) { // My turn to receive a dump. + QPID_LOG(info, self << " receiving state dump from " << map.dumper); + // FIXME aconway 2008-09-15: RECEIVE DUMP + // state = CATCHUP; + // stall(); + // When received + mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0); + ready(); + } + else if (map.dumper == self && state == READY) { // My turn to send the dump + QPID_LOG(info, self << " sending state dump to " << dumpee); + // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient. + // state = DUMPING; + // stall(); + (void)urlStr; + // When dump complete: + assert(map.dumper == self); + ClusterUpdateBody b = map.toControl(); + b.setDumper(0); + mcastControl(b, 0); + // NB: Don't modify my own map till self-delivery. } } +void Cluster::ready(const MemberId& member, const std::string& url) { + Mutex::ScopedLock l(lock); + map.add(member, Url(url)); +} + broker::Broker& Cluster::getBroker(){ return broker; } void Cluster::stall() { |
