diff options
| author | Alan Conway <aconway@apache.org> | 2008-09-16 18:46:11 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-09-16 18:46:11 +0000 |
| commit | 8709822ffee38d9bf16f4cf43114bc450fc222eb (patch) | |
| tree | aff16233001f6770a397c588d80d367af343942e /cpp | |
| parent | 2f43007baa7e97086ddb227ddc27b4b4b26bf53c (diff) | |
| download | qpid-python-8709822ffee38d9bf16f4cf43114bc450fc222eb.tar.gz | |
Fix race in cluster join protocol.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@696003 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 131 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 52 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.h | 34 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/types.h | 3 | ||||
| -rw-r--r-- | cpp/src/tests/cluster_test.cpp | 4 | ||||
| -rw-r--r-- | cpp/xml/cluster.xml | 9 |
7 files changed, 133 insertions, 115 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 7fb2e5ad58..858542802c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -27,6 +27,7 @@ #include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterDumpRequestBody.h" #include "qpid/framing/ClusterUpdateBody.h" +#include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" @@ -53,8 +54,9 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {} bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); } - void dumpRequest(const std::string& u) { cluster.dumpRequest(member, u); } - void update(const FieldTable& members,bool dumping) { cluster.update(members, dumping); } + void update(const FieldTable& members, uint64_t dumping) { cluster.update(members, dumping); } + void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url); } + void ready(const std::string& url) { cluster.ready(member, url); } }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : @@ -76,7 +78,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); cpgDispatchHandle.startWatch(poller); cpg.join(name); - } Cluster::~Cluster() {} @@ -218,6 +219,19 @@ ostream& operator<<(ostream& o, const AddrList& a) { return o; } +void Cluster::dispatch(sys::DispatchHandle& h) { + cpg.dispatchAll(); + h.rewatch(); +} + +void Cluster::disconnect(sys::DispatchHandle& ) { + // FIXME aconway 2008-09-11: this should be logged as critical, + // when we provide admin option to shut down cluster and let + // members leave cleanly. + QPID_LOG(notice, self << " disconnected from cluster " << name.str()); + broker.shutdown(); +} + void Cluster::configChange( cpg_handle_t /*handle*/, cpg_name */*group*/, @@ -225,79 +239,78 @@ void Cluster::configChange( cpg_address *left, int nLeft, cpg_address */*joined*/, int nJoined) { + Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node. - QPID_LOG(notice, "Current cluster: " << AddrList(current, nCurrent)); - QPID_LOG_IF(notice, nLeft, "Left the cluster: " << AddrList(left, nLeft)); - if (find(left, left+nLeft, self) != left+nLeft) { - // We have left the group, this is the final config change. + QPID_LOG(info, "Current cluster: " << AddrList(current, nCurrent)); + QPID_LOG_IF(info, nLeft, "Left the cluster: " << AddrList(left, nLeft)); + + map.left(left, nLeft); + if (find(left, left+nLeft, self) != left+nLeft) { + // I have left the group, this is the final config change. QPID_LOG(notice, self << " left cluster " << name.str()); broker.shutdown(); + return; } - Mutex::ScopedLock l(lock); - map.configChange(current, nCurrent); - if (state == START && nCurrent == 1) { // First in cluster - assert(*current == self); - assert(map.empty()); - QPID_LOG(notice, self << " first in cluster."); - map.insert(self, url); - ready(); - } - else if (nJoined && self == map.first()) { // Send an update to new members. - mcastControl(map.toControl(), 0); + + if (state == START) { + if (nCurrent == 1 && *current == self) { // First in cluster. + // First in cluster + QPID_LOG(notice, self << " first in cluster."); + map.add(self, url); + ready(); + } + return; } -} -void Cluster::dispatch(sys::DispatchHandle& h) { - cpg.dispatchAll(); - h.rewatch(); -} + if (state == DISCARD && !map.dumper) // try another dump request. + mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0); -void Cluster::disconnect(sys::DispatchHandle& ) { - // FIXME aconway 2008-09-11: this should be logged as critical, - // when we provide admin option to shut down cluster and let - // members leave cleanly. - QPID_LOG(notice, self << " disconnected from cluster " << name.str()); - broker.shutdown(); + if (nJoined && map.sendUpdate(self)) // New members need update + mcastControl(map.toControl(), 0); } -void Cluster::update(const FieldTable& members, bool dumping) { +void Cluster::update(const FieldTable& members, uint64_t dumper) { Mutex::ScopedLock l(lock); - map.update(members, dumping); - QPID_LOG(info, "Cluster update:\n " << map); - if (state == START && dumping == false) { - state = DISCARD; + map.update(members, dumper); + QPID_LOG(debug, "Cluster update: " << map); + if (state == START) state = DISCARD; // Got first update. + if (state == DISCARD && !map.dumper) mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0); - } } -void Cluster::dumpRequest(const MemberId& m, const string& urlStr) { +void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) { Mutex::ScopedLock l(lock); - bool wasDumping = map.isDumping(); - map.setDumping(true); - if (!wasDumping) { - if (self == m) { // My turn - assert(state == DISCARD); - // FIXME aconway 2008-09-15: RECEIVE DUMP - // state = CATCHUP; - // stall(); - // When received - map.insert(self, url); - mcastControl(map.toControl(), 0); - ready(); - } - else if (state == READY && self == map.first()) { // Give the dump. - QPID_LOG(info, self << " dumping to " << url); - // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient. - // state = DUMPING; - // stall(); - (void)urlStr; - // When dump complete: - map.setDumping(false); - mcastControl(map.toControl(), 0); - } + if (map.dumper) return; // Dump already in progress, ignore. + map.dumper = map.first(); + if (dumpee == self && state == DISCARD) { // My turn to receive a dump. + QPID_LOG(info, self << " receiving state dump from " << map.dumper); + // FIXME aconway 2008-09-15: RECEIVE DUMP + // state = CATCHUP; + // stall(); + // When received + mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0); + ready(); + } + else if (map.dumper == self && state == READY) { // My turn to send the dump + QPID_LOG(info, self << " sending state dump to " << dumpee); + // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient. + // state = DUMPING; + // stall(); + (void)urlStr; + // When dump complete: + assert(map.dumper == self); + ClusterUpdateBody b = map.toControl(); + b.setDumper(0); + mcastControl(b, 0); + // NB: Don't modify my own map till self-delivery. } } +void Cluster::ready(const MemberId& member, const std::string& url) { + Mutex::ScopedLock l(lock); + map.add(member, Url(url)); +} + broker::Broker& Cluster::getBroker(){ return broker; } void Cluster::stall() { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index b8527ae66b..e33cca8482 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -74,9 +74,11 @@ class Cluster : private Cpg::Handler /** Leave the cluster */ void leave(); - + + // Cluster controls. + void update(const framing::FieldTable& members, uint64_t dumping); void dumpRequest(const MemberId&, const std::string& url); - void update(const framing::FieldTable& members, bool dumping); + void ready(const MemberId&, const std::string& url); MemberId getSelf() const { return self; } @@ -91,12 +93,11 @@ class Cluster : private Cpg::Handler typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; typedef sys::PollableQueue<Event> EventQueue; enum State { - START, // Have not yet received first cluster update. + START, // Start state, no cluster update received yet. DISCARD, // Discard updates up to dump start point. - HAVE_DUMP, // Received state dump, waiting for catchup point. - CATCHUP, // Stalled at catchup point, waiting for dump. - DUMPING, // Stalled while sending a state dump. - READY // Normal processing. + CATCHUP, // Stalled at catchup point, waiting for dump. + DUMPING, // Stalled while sending a state dump. + READY // Normal processing. }; void connectionEvent(const Event&); diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index 63d0c786d2..51e360ad73 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -32,56 +32,58 @@ using namespace framing; namespace cluster { -ClusterMap::ClusterMap() : dumping(false) {} +ClusterMap::ClusterMap() {} -MemberId ClusterMap::first() { - return (empty()) ? MemberId() : begin()->first; +MemberId ClusterMap::first() const { + return (members.empty()) ? MemberId() : members.begin()->first; } -void ClusterMap::configChange(const cpg_address* addrs, size_t size) { - iterator i = begin(); - while (i != end()) { // Remove members that are no longer in addrs. - if (std::find(addrs, addrs+size, i->first) == addrs+size) - erase(i++); - else - ++i; - } +void ClusterMap::left(const cpg_address* addrs, size_t size) { + size_t (Members::*erase)(const MemberId&) = &Members::erase; + std::for_each(addrs, addrs+size, boost::bind(erase, &members, _1)); + if (dumper && !isMember(dumper)) + dumper = MemberId(); } framing::ClusterUpdateBody ClusterMap::toControl() const { framing::ClusterUpdateBody b; - for (const_iterator i = begin(); i != end(); ++i) + for (Members::const_iterator i = members.begin(); i != members.end(); ++i) b.getMembers().setString(i->first.str(), i->second.str()); - b.setDumping(dumping); + b.setDumper(dumper); return b; } -void ClusterMap::update(const FieldTable& ftMembers, bool dump) { - dumping = dump; +void ClusterMap::update(const FieldTable& ftMembers, uint64_t dumper_) { FieldTable::ValueMap::const_iterator i; for (i = ftMembers.begin(); i != ftMembers.end(); ++i) - (*this)[i->first] = Url(i->second->get<std::string>()); -} - -void ClusterMap::fromControl(const framing::ClusterUpdateBody& b) { - update(b.getMembers(), b.getDumping()); + members[i->first] = Url(i->second->get<std::string>()); + dumper = MemberId(dumper_); } std::vector<Url> ClusterMap::memberUrls() const { std::vector<Url> result(size()); - std::transform(begin(), end(), result.begin(), - boost::bind(&value_type::second, _1)); + std::transform(members.begin(), members.end(), result.begin(), + boost::bind(&Members::value_type::second, _1)); return result; } -std::ostream& operator<<(std::ostream& o, const ClusterMap::value_type& mv) { +std::ostream& operator<<(std::ostream& o, const ClusterMap::Members::value_type& mv) { return o << mv.first << "=" << mv.second; } std::ostream& operator<<(std::ostream& o, const ClusterMap& m) { - std::ostream_iterator<ClusterMap::value_type> im(o, "\n "); - std::copy(m.begin(), m.end(), im); + std::ostream_iterator<ClusterMap::Members::value_type> im(o, "\n "); + o << "dumper=" << m.dumper << ", members:\n "; + std::copy(m.members.begin(), m.members.end(), im); return o; } +bool ClusterMap::sendUpdate(const MemberId& id) const { + return dumper==id || (!dumper && first() == id); +} + +void ClusterMap::add(const MemberId& id, const Url& url) { + members[id] = url; +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index fce65f083d..c626c7688d 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -41,39 +41,39 @@ namespace cluster { * A dumper is an established member that is sending catch-up data. * A dumpee is an aspiring member that is receiving catch-up data. */ -class ClusterMap : public std::map<MemberId, Url> { +class ClusterMap { public: + typedef std::map<MemberId, Url> Members; + Members members; + MemberId dumper; + ClusterMap(); /** First member of the cluster in ID order, gets to perform one-off tasks. */ - MemberId first(); - - /** Update for CPG config change. */ - void configChange(const cpg_address* addrs, size_t size); + MemberId first() const; + /** Update for members leaving. */ + void left(const cpg_address* addrs, size_t size); - /** Convert map contents to a cluster control body. */ + /** Convert map contents to a cluster update body. */ framing::ClusterUpdateBody toControl() const; - /** Update with first member. */ - using std::map<MemberId, Url>::insert; - void insert(const MemberId& id, const Url& url) { insert(value_type(id,url)); } - void setDumping(bool d) { dumping = d; } + /** Add a new member. */ + void add(const MemberId& id, const Url& url); /** Apply update delivered from clsuter. */ - void update(const framing::FieldTable& members, bool dumping); - void fromControl(const framing::ClusterUpdateBody&); + void update(const framing::FieldTable& members, uint64_t dumper); - bool isMember(const MemberId& id) const { return find(id) != end(); } - bool isDumping() const { return dumping; } + bool isMember(const MemberId& id) const { return members.find(id) != members.end(); } + bool sendUpdate(const MemberId& id) const; // True if id should send an update. std::vector<Url> memberUrls() const; - + size_t size() const { return members.size(); } + private: - bool dumping; friend std::ostream& operator<<(std::ostream&, const ClusterMap&); - friend std::ostream& operator<<(std::ostream& o, const ClusterMap::value_type& mv); + friend std::ostream& operator<<(std::ostream& o, const ClusterMap::Members::value_type& mv); }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h index f48ba2db30..4fbe79e118 100644 --- a/cpp/src/qpid/cluster/types.h +++ b/cpp/src/qpid/cluster/types.h @@ -41,12 +41,13 @@ enum EventType { DATA, CONTROL }; /** first=node-id, second=pid */ struct MemberId : std::pair<uint32_t, uint32_t> { + explicit MemberId(uint64_t n) : std::pair<uint32_t,uint32_t>( n >> 32, n & 0xffffffff) {} explicit MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {} MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {} MemberId(const std::string&); // Decode from string. uint32_t getNode() const { return first; } uint32_t getPid() const { return second; } - operator bool() const { return first || second; } + operator uint64_t() const { return (uint64_t(first)<<32ull) + second; } // Encode as string, network byte order. std::string str() const; diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index c17dc99901..8dec23a09b 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -140,9 +140,7 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } - -// FIXME aconway 2008-09-12: finish the new join protocol. -QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchUpSharedState, 1) { +QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchupSharedState, 1) { ClusterFixture cluster(1); Client c0(cluster[0], "c0"); // Create some shared state. diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index 6dbfee109d..ba4e50d21e 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -27,15 +27,18 @@ o<?xml version="1.0"?> <class name = "cluster" code = "0x80" label="Qpid clustering extensions."> <doc>Qpid extension class to allow clustered brokers to communicate.</doc> - <control name="update" code="0x4" label="Cluster status update."> + <control name="update" code="0x1" label="Cluster status update."> <field name="members" type="map"/> <!-- member-id -> URL --> - <field name="dumping" type="boolean"/> <!-- currently dumping state to new member. --> + <field name="dumper" type="uint64"/> <!-- member currently dumping state. --> </control> - <control name = "dump-request" code="0x1" label="New meber requests brain dump"> + <control name = "dump-request" code="0x2" label="New meber requests brain dump"> <field name="url" type="str16" label="Url for brain dump."/> </control> + <control name="ready" code="0x3" label="New member is ready."> + <field name="url" type="str16" label="Url for brain dump."/> + </control> </class> <!-- TODO aconway 2008-09-10: support for un-attached connections. --> |
