summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-16 18:46:11 +0000
committerAlan Conway <aconway@apache.org>2008-09-16 18:46:11 +0000
commit8709822ffee38d9bf16f4cf43114bc450fc222eb (patch)
treeaff16233001f6770a397c588d80d367af343942e /cpp/src/qpid/cluster/Cluster.cpp
parent2f43007baa7e97086ddb227ddc27b4b4b26bf53c (diff)
downloadqpid-python-8709822ffee38d9bf16f4cf43114bc450fc222eb.tar.gz
Fix race in cluster join protocol.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@696003 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp131
1 files changed, 72 insertions, 59 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 7fb2e5ad58..858542802c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -27,6 +27,7 @@
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterDumpRequestBody.h"
#include "qpid/framing/ClusterUpdateBody.h"
+#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
@@ -53,8 +54,9 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
- void dumpRequest(const std::string& u) { cluster.dumpRequest(member, u); }
- void update(const FieldTable& members,bool dumping) { cluster.update(members, dumping); }
+ void update(const FieldTable& members, uint64_t dumping) { cluster.update(members, dumping); }
+ void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url); }
+ void ready(const std::string& url) { cluster.ready(member, url); }
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
@@ -76,7 +78,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
cpgDispatchHandle.startWatch(poller);
cpg.join(name);
-
}
Cluster::~Cluster() {}
@@ -218,6 +219,19 @@ ostream& operator<<(ostream& o, const AddrList& a) {
return o;
}
+void Cluster::dispatch(sys::DispatchHandle& h) {
+ cpg.dispatchAll();
+ h.rewatch();
+}
+
+void Cluster::disconnect(sys::DispatchHandle& ) {
+ // FIXME aconway 2008-09-11: this should be logged as critical,
+ // when we provide admin option to shut down cluster and let
+ // members leave cleanly.
+ QPID_LOG(notice, self << " disconnected from cluster " << name.str());
+ broker.shutdown();
+}
+
void Cluster::configChange(
cpg_handle_t /*handle*/,
cpg_name */*group*/,
@@ -225,79 +239,78 @@ void Cluster::configChange(
cpg_address *left, int nLeft,
cpg_address */*joined*/, int nJoined)
{
+ Mutex::ScopedLock l(lock);
// FIXME aconway 2008-09-15: use group terminology not cluster. Member not node.
- QPID_LOG(notice, "Current cluster: " << AddrList(current, nCurrent));
- QPID_LOG_IF(notice, nLeft, "Left the cluster: " << AddrList(left, nLeft));
- if (find(left, left+nLeft, self) != left+nLeft) {
- // We have left the group, this is the final config change.
+ QPID_LOG(info, "Current cluster: " << AddrList(current, nCurrent));
+ QPID_LOG_IF(info, nLeft, "Left the cluster: " << AddrList(left, nLeft));
+
+ map.left(left, nLeft);
+ if (find(left, left+nLeft, self) != left+nLeft) {
+ // I have left the group, this is the final config change.
QPID_LOG(notice, self << " left cluster " << name.str());
broker.shutdown();
+ return;
}
- Mutex::ScopedLock l(lock);
- map.configChange(current, nCurrent);
- if (state == START && nCurrent == 1) { // First in cluster
- assert(*current == self);
- assert(map.empty());
- QPID_LOG(notice, self << " first in cluster.");
- map.insert(self, url);
- ready();
- }
- else if (nJoined && self == map.first()) { // Send an update to new members.
- mcastControl(map.toControl(), 0);
+
+ if (state == START) {
+ if (nCurrent == 1 && *current == self) { // First in cluster.
+ // First in cluster
+ QPID_LOG(notice, self << " first in cluster.");
+ map.add(self, url);
+ ready();
+ }
+ return;
}
-}
-void Cluster::dispatch(sys::DispatchHandle& h) {
- cpg.dispatchAll();
- h.rewatch();
-}
+ if (state == DISCARD && !map.dumper) // try another dump request.
+ mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
-void Cluster::disconnect(sys::DispatchHandle& ) {
- // FIXME aconway 2008-09-11: this should be logged as critical,
- // when we provide admin option to shut down cluster and let
- // members leave cleanly.
- QPID_LOG(notice, self << " disconnected from cluster " << name.str());
- broker.shutdown();
+ if (nJoined && map.sendUpdate(self)) // New members need update
+ mcastControl(map.toControl(), 0);
}
-void Cluster::update(const FieldTable& members, bool dumping) {
+void Cluster::update(const FieldTable& members, uint64_t dumper) {
Mutex::ScopedLock l(lock);
- map.update(members, dumping);
- QPID_LOG(info, "Cluster update:\n " << map);
- if (state == START && dumping == false) {
- state = DISCARD;
+ map.update(members, dumper);
+ QPID_LOG(debug, "Cluster update: " << map);
+ if (state == START) state = DISCARD; // Got first update.
+ if (state == DISCARD && !map.dumper)
mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
- }
}
-void Cluster::dumpRequest(const MemberId& m, const string& urlStr) {
+void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) {
Mutex::ScopedLock l(lock);
- bool wasDumping = map.isDumping();
- map.setDumping(true);
- if (!wasDumping) {
- if (self == m) { // My turn
- assert(state == DISCARD);
- // FIXME aconway 2008-09-15: RECEIVE DUMP
- // state = CATCHUP;
- // stall();
- // When received
- map.insert(self, url);
- mcastControl(map.toControl(), 0);
- ready();
- }
- else if (state == READY && self == map.first()) { // Give the dump.
- QPID_LOG(info, self << " dumping to " << url);
- // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
- // state = DUMPING;
- // stall();
- (void)urlStr;
- // When dump complete:
- map.setDumping(false);
- mcastControl(map.toControl(), 0);
- }
+ if (map.dumper) return; // Dump already in progress, ignore.
+ map.dumper = map.first();
+ if (dumpee == self && state == DISCARD) { // My turn to receive a dump.
+ QPID_LOG(info, self << " receiving state dump from " << map.dumper);
+ // FIXME aconway 2008-09-15: RECEIVE DUMP
+ // state = CATCHUP;
+ // stall();
+ // When received
+ mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+ ready();
+ }
+ else if (map.dumper == self && state == READY) { // My turn to send the dump
+ QPID_LOG(info, self << " sending state dump to " << dumpee);
+ // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
+ // state = DUMPING;
+ // stall();
+ (void)urlStr;
+ // When dump complete:
+ assert(map.dumper == self);
+ ClusterUpdateBody b = map.toControl();
+ b.setDumper(0);
+ mcastControl(b, 0);
+ // NB: Don't modify my own map till self-delivery.
}
}
+void Cluster::ready(const MemberId& member, const std::string& url) {
+ Mutex::ScopedLock l(lock);
+ map.add(member, Url(url));
+}
+
broker::Broker& Cluster::getBroker(){ return broker; }
void Cluster::stall() {